[hornetq-commits] JBoss hornetq SVN: r8152 - in branches/Clebert_Sync: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 27 17:13:04 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-27 17:13:03 -0400 (Tue, 27 Oct 2009)
New Revision: 8152

Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup again...

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 21:13:03 UTC (rev 8152)
@@ -18,9 +18,7 @@
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.DataConstants;
 
 /**
  * This will read records 
@@ -35,13 +33,12 @@
 
    private static final Logger log = Logger.getLogger(JournalCopier.class);
 
-   
    /** enable some trace at development. */
    private static final boolean DEV_TRACE = true;
-   
+
    private static final boolean isTraceEnabled = log.isTraceEnabled();
-   
-   private static void trace(String msg)
+
+   private static void trace(final String msg)
    {
       System.out.println("JournalCopier::" + msg);
    }
@@ -52,6 +49,9 @@
 
    private final Journal journalTo;
 
+   /** Proxy mode means everything will be copied over without any evaluations, such as the record-snapshot and pending-transaction lookups. */
+   private boolean proxyMode = false;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -62,14 +62,14 @@
     * @param recordsSnapshot
     * @param nextOrderingID
     */
-   public JournalCopier(SequentialFileFactory fileFactory,
-                        JournalImpl journalFrom,
-                        Journal journalTo,
-                        Set<Long> recordsSnapshot,
-                        Set<Long> pendingTransactionsSnapshot)
+   public JournalCopier(final SequentialFileFactory fileFactory,
+                        final JournalImpl journalFrom,
+                        final Journal journalTo,
+                        final Set<Long> recordsSnapshot,
+                        final Set<Long> pendingTransactionsSnapshot)
    {
       super(fileFactory, journalFrom, recordsSnapshot, -1);
-      this.pendingTransactions = pendingTransactionsSnapshot;
+      pendingTransactions = pendingTransactionsSnapshot;
       this.journalTo = journalTo;
    }
 
@@ -81,7 +81,7 @@
 
    public void onReadAddRecord(final RecordInfo info) throws Exception
    {
-      if (lookupRecord(info.id))
+      if (proxyMode || lookupRecord(info.id))
       {
          if (DEV_TRACE)
          {
@@ -100,7 +100,7 @@
 
    public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (proxyMode || pendingTransactions.contains(transactionID))
       {
          if (DEV_TRACE)
          {
@@ -118,8 +118,12 @@
    public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
    {
 
-      if (pendingTransactions.contains(transactionID))
+      if (proxyMode)
       {
+         journalTo.appendCommitRecord(transactionID, false);
+      }
+      else if (pendingTransactions.contains(transactionID))
+      {
          // Sanity check, this should never happen
          log.warn("Inconsistency during compacting: CommitRecord ID = " + transactionID +
                   " for an already committed transaction during compacting");
@@ -128,12 +132,17 @@
 
    public void onReadDeleteRecord(final long recordID) throws Exception
    {
+      if (proxyMode)
+      {
+         journalTo.appendDeleteRecord(recordID, false);
+      }
+      // else....
       // Nothing to be done here, we don't copy deleted records
    }
 
    public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (proxyMode || pendingTransactions.contains(transactionID))
       {
          journalTo.appendDeleteRecordTransactional(transactionID, info.id, info.data);
       }
@@ -147,7 +156,7 @@
 
    public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (proxyMode || pendingTransactions.contains(transactionID))
       {
          journalTo.appendPrepareRecord(transactionID, extraData, false);
       }
@@ -155,6 +164,10 @@
 
    public void onReadRollbackRecord(final long transactionID) throws Exception
    {
+      if (proxyMode)
+      {
+         journalTo.appendRollbackRecord(transactionID, false);
+      }
       if (pendingTransactions.contains(transactionID))
       {
          // Sanity check, this should never happen
@@ -165,7 +178,7 @@
 
    public void onReadUpdateRecord(final RecordInfo info) throws Exception
    {
-      if (lookupRecord(info.id))
+      if (proxyMode || lookupRecord(info.id))
       {
          journalTo.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
       }
@@ -173,7 +186,7 @@
 
    public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (proxyMode || pendingTransactions.contains(transactionID))
       {
          journalTo.appendUpdateRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
       }
@@ -183,6 +196,11 @@
       }
    }
 
+   public void setProxyMode(final boolean proxyMode)
+   {
+      this.proxyMode = proxyMode;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 21:13:03 UTC (rev 8152)
@@ -1476,8 +1476,6 @@
       compactMinFiles = 0;
       autoReclaim = false;
 
-      flushExecutor();
-
       // Wait the compactor and cleanup to finish case they are running
       // This will also set the compactorRunning, as clean up and compact can't happen at the same time
       while (!compactorRunning.compareAndSet(false, true))
@@ -1517,8 +1515,6 @@
                return;
             }
 
-            dataFiles.clear();
-
             HashSet<Long> txSet = new HashSet<Long>();
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1535,6 +1531,9 @@
 
          Collections.sort(dataFilesToProcess, new JournalFileComparator());
 
+         // Need to make sure everything is out of executors and on the disk before backing it up
+         flush();
+
          // This is where most of the work is done, taking most of the time of the compacting routine.
          // Notice there are no locks while this is being done.
 
@@ -1544,9 +1543,48 @@
          {
             readJournalFile(fileFactory, file, copier);
          }
+         
+         
+         // Final Freeze.... Sending the left over files (including the next file)
+         globalLock.writeLock().lock();
+         
+         try
+         {
+            // Need to make sure everything is out of executors and on the disk before backing it up
+            flush();
+            
+            List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
+            Collections.sort(newDataFilesToProcess, new JournalFileComparator());
+            
+            Iterator<JournalFile> newDataIterator = newDataFilesToProcess.iterator();
+            for (JournalFile alreadyProcessed : dataFilesToProcess)
+            {
+               JournalFile newFile = newDataIterator.next();
+               
+               if (newFile.getFileID() != alreadyProcessed.getFileID())
+               {
+                  log.warn("Processed FileID " + alreadyProcessed.getFileID() + " inconsistent with previous processed " + newFile.getFileID());
+               }
+            }
 
-         copier.flush();
+            while (newDataIterator.hasNext())
+            {
+               JournalFile newFile = newDataIterator.next();
+               
+               log.info("processing " + newFile.getFileID());
+               
+               readJournalFile(fileFactory, newFile, copier);
 
+            }
+            
+         }
+         finally
+         {
+            globalLock.writeLock().unlock();
+         }
+         
+
+
       }
       finally
       {
@@ -1756,6 +1794,8 @@
     */
    private List<JournalFile> getSnapshotFilesToProcess() throws Exception
    {
+      flush();
+      
       // We need to move to the next file, as we need a clear start for negatives and positives counts
       moveNextFile(true);
 

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-27 21:13:03 UTC (rev 8152)
@@ -14,6 +14,11 @@
 package org.hornetq.tests.unit.core.journal.impl;
 
 import java.io.File;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -22,6 +27,7 @@
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.utils.VariableLatch;
 
 /**
  * A CopyJournalTest
@@ -47,7 +53,7 @@
    
    public void testSimpleCopy() throws Exception
    {
-      setup(10, 10 * 1024, true);
+      setup(2, 100 * 1024, false);
       createJournal();
       startJournal();
       load();
@@ -59,26 +65,70 @@
          addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
       }
       addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
-      
+      for (int i = 0 ; i < 10; i++)
+      {
+         addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+      }
+    
       File destDir = new File(getTestDir()+"/dest");
       
       destDir.mkdirs();
       
-      SequentialFileFactory nioFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
+      SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
       
-      Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix, fileExtension, 1);
+      Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
       destJournal.start();
       destJournal.loadInternalOnly();
       
-      journal.copyTo(destJournal);
+      CountDownLatch locked = new CountDownLatch(1);
       
-      journal.flush();
+      JournalHandler handler = new JournalHandler(destJournal, locked, 5);
       
-      destJournal.flush();
+      final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
       
+      Thread copier = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               journal.copyTo(proxyJournal);
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+            
+         }
+      };
       
+      copier.start();
       
-      System.exit(1);
+      assertTrue(locked.await(10, TimeUnit.SECONDS));
+      
+      sequence.set(5000);
+      
+      for (int i = 0 ; i < 10 ; i++)
+      {
+         addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+         journal.forceMoveNextFile();
+      }
+      
+      
+      
+      handler.unlock();
+      
+      copier.join();
+      
+      stopJournal();
+      
+      destJournal.stop();
+      
+      this.fileFactory = newFactory;
+      
+      startJournal();
+      
+      loadAndCheck(true);
    }
 
    @Override
@@ -93,7 +143,7 @@
       return new AIOSequentialFileFactory(getTestDir(),
                                           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
                                           1000000,
-                                          true,
+                                          false,
                                           false      
       );
    }
@@ -101,7 +151,59 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
+   
+   
+   /** This handler will block once N calls have gone through, until the handler is opened again by calling unlock() */
+   protected class JournalHandler implements InvocationHandler
+   {
+      
+      
+      final VariableLatch valve = new VariableLatch();
+      
+      final CountDownLatch locked;
+      
+      final int executionsBeforeLock;
+      
+      int executions = 0;
+      
+      
+      private final Journal target;
+      
+      public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+      {
+         this.target = journal;
+         this.locked = locked;
+         this.executionsBeforeLock = executionsBeforeLock;
+      }
+      
+      private void lock()
+      {
+         valve.up();
+      }
+      
+      public void unlock()
+      {
+         valve.down();
+      }
+      
 
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+      {
+         System.out.println("Invoked " + method.getName());
+         if (executions ++ == executionsBeforeLock)
+         {
+            lock();
+            locked.countDown();
+         }
+         if (!valve.waitCompletion(10000))
+         {
+            throw new IllegalStateException("Timeout waiting for open valve");
+         }
+         return method.invoke(target, args);
+      }
+      
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list