[hornetq-commits] JBoss hornetq SVN: r8153 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 27 19:48:10 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-27 19:48:09 -0400 (Tue, 27 Oct 2009)
New Revision: 8153

Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup changes

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 23:48:09 UTC (rev 8153)
@@ -36,7 +36,7 @@
    /** enable some trace at development. */
    private static final boolean DEV_TRACE = true;
 
-   private static final boolean isTraceEnabled = log.isTraceEnabled();
+   private static final boolean isTrace = log.isTraceEnabled();
 
    private static void trace(final String msg)
    {
@@ -49,7 +49,8 @@
 
    private final Journal journalTo;
 
-   /** Proxy mode means, everything will be copied over without any evaluations such as */
+   /** Proxy mode means, everything will be copied over without any evaluations.
+    *  This is useful at the end of copying when everything needs to be copied. */
    private boolean proxyMode = false;
 
    // Static --------------------------------------------------------

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 23:48:09 UTC (rev 8153)
@@ -1553,6 +1553,8 @@
             // Need to make sure everything is out of executors and on the disk before backing it up
             flush();
             
+            copier.setProxyMode(true);
+            
             List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
             Collections.sort(newDataFilesToProcess, new JournalFileComparator());
             

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-27 23:48:09 UTC (rev 8153)
@@ -184,7 +184,7 @@
 
       SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
 
-      Journal localBindings = new JournalImpl(1024 * 1024,
+      JournalImpl localBindings = new JournalImpl(1024 * 1024,
                                               2,
                                               config.getJournalCompactMinFiles(),
                                               config.getJournalCompactPercentage(),
@@ -252,7 +252,7 @@
          this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
       }
 
-      Journal localMessage = new JournalImpl(config.getJournalFileSize(),
+      JournalImpl localMessage = new JournalImpl(config.getJournalFileSize(),
                                              config.getJournalMinFiles(),
                                              config.getJournalCompactMinFiles(),
                                              config.getJournalCompactPercentage(),

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-27 23:48:09 UTC (rev 8153)
@@ -17,6 +17,7 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -44,50 +45,67 @@
    // Attributes ----------------------------------------------------
 
    final AtomicInteger sequence = new AtomicInteger(0);
-   
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
+
    public void testSimpleCopy() throws Exception
    {
       setup(2, 100 * 1024, false);
       createJournal();
       startJournal();
       load();
-      
-      
 
-      for (int i = 0 ; i < 10; i++)
+      ArrayList<Long> transactions = new ArrayList<Long>();
+
+      ArrayList<Long> toDelete = new ArrayList<Long>();
+
+      for (int i = 0; i < 40; i++)
       {
-         addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+         long iDelete = sequence.incrementAndGet();
+
+         toDelete.add(iDelete);
+
+         addWithSize(1024, iDelete);
       }
-      addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
-      for (int i = 0 ; i < 10; i++)
+
+      long tx = sequence.incrementAndGet();
+      transactions.add(tx);
+      addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+      for (int i = 0; i < 10; i++)
       {
-         addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+         addWithSize(1024,
+                     sequence.incrementAndGet(),
+                     sequence.incrementAndGet(),
+                     sequence.incrementAndGet(),
+                     sequence.incrementAndGet());
       }
-    
-      File destDir = new File(getTestDir()+"/dest");
-      
+
+      File destDir = new File(getTestDir() + "/dest");
+
       destDir.mkdirs();
-      
+
       SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
-      
+
       Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
       destJournal.start();
       destJournal.loadInternalOnly();
-      
+
       CountDownLatch locked = new CountDownLatch(1);
-      
+
       JournalHandler handler = new JournalHandler(destJournal, locked, 5);
-      
-      final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
-      
+
+      final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),
+                                                                   new Class[] { Journal.class },
+                                                                   handler);
+
       Thread copier = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -98,36 +116,50 @@
             {
                e.printStackTrace();
             }
-            
+
          }
       };
-      
+
       copier.start();
-      
+
       assertTrue(locked.await(10, TimeUnit.SECONDS));
-      
+
       sequence.set(5000);
-      
-      for (int i = 0 ; i < 10 ; i++)
+
+      for (int i = 0; i < 10; i++)
       {
-         addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+         tx = sequence.incrementAndGet();
+         transactions.add(tx);
+         addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
          journal.forceMoveNextFile();
       }
-      
-      
-      
+
+      for (Long txToCommit : transactions)
+      {
+         commit(txToCommit);
+      }
+
+      for (Long iDelete : toDelete)
+      {
+         delete(iDelete);
+      }
+
       handler.unlock();
-      
+
       copier.join();
-      
+
       stopJournal();
-      
+
       destJournal.stop();
-      
-      this.fileFactory = newFactory;
-      
+
+      fileFactory = newFactory;
+
+      createJournal();
+
       startJournal();
-      
+
       loadAndCheck(true);
    }
 
@@ -144,53 +176,47 @@
                                           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
                                           1000000,
                                           false,
-                                          false      
-      );
+                                          false);
    }
 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
-   
+
    /** This handler will lock after N calls, until the Handler is opened again */
    protected class JournalHandler implements InvocationHandler
    {
-      
-      
+
       final VariableLatch valve = new VariableLatch();
-      
+
       final CountDownLatch locked;
-      
+
       final int executionsBeforeLock;
-      
+
       int executions = 0;
-      
-      
+
       private final Journal target;
-      
-      public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+
+      public JournalHandler(final Journal journal, final CountDownLatch locked, final int executionsBeforeLock)
       {
-         this.target = journal;
+         target = journal;
          this.locked = locked;
          this.executionsBeforeLock = executionsBeforeLock;
       }
-      
+
       private void lock()
       {
          valve.up();
       }
-      
+
       public void unlock()
       {
          valve.down();
       }
-      
 
-      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+      public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
       {
-         System.out.println("Invoked " + method.getName());
-         if (executions ++ == executionsBeforeLock)
+         if (executions++ == executionsBeforeLock)
          {
             lock();
             locked.countDown();
@@ -201,7 +227,7 @@
          }
          return method.invoke(target, args);
       }
-      
+
    }
 
    // Private -------------------------------------------------------



More information about the hornetq-commits mailing list