[hornetq-commits] JBoss hornetq SVN: r8145 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 26 22:24:41 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-26 22:24:40 -0400 (Mon, 26 Oct 2009)
New Revision: 8145

Added:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Backup of my current changes to the branch

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -85,6 +85,11 @@
    int getAlignment() throws Exception;
 
    void perfBlast(int pages) throws Exception;
+   
+   /** Read the entire content of the journal and copy it to another Journal */
+   void copyTo(Journal destJournal) throws Exception;
 
+   /** This method will flush everything and make a hard sync on the journal. Use it with caution. (on tests mainly) */
+   void flush() throws Exception;
 
 }

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/SequentialFileFactory.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -61,8 +61,7 @@
     */
    void createDirs() throws Exception;
    
-   // used on tests only
-   void testFlush();
+   void flushBuffers();
 
 
 }

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/TestableJournal.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -35,8 +35,6 @@
 
    String debug() throws Exception;
 
-   void debugWait() throws Exception;
-
    int getFileSize();
 
    int getMinFiles();

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -102,7 +102,7 @@
       }
    }
 
-   public void testFlush()
+   public void flushBuffers()
    {
       timedBuffer.flush();
    }

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -64,7 +64,7 @@
    {
    }
    
-   public void testFlush()
+   public void flushBuffers()
    {
    }
 

Added: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	                        (rev 0)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Set;
+
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A JournalCopier
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCopier extends AbstractJournalUpdateTask
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(JournalCopier.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final Set<Long> pendingTransactions;
+
+   private final Journal journalTo;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param fileFactory
+    * @param journalFrom the journal being read
+    * @param recordsSnapshot
+    * @param pendingTransactionsSnapshot IDs of the transactions considered pending by this copier
+    */
+   public JournalCopier(SequentialFileFactory fileFactory,
+                        JournalImpl journalFrom,
+                        Journal journalTo,
+                        Set<Long> recordsSnapshot,
+                        Set<Long> pendingTransactionsSnapshot)
+   {
+      super(fileFactory, journalFrom, recordsSnapshot, -1);
+      this.pendingTransactions = pendingTransactionsSnapshot;
+      this.journalTo = journalTo;
+   }
+
+   // Public --------------------------------------------------------
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
+    */
+
+   public void onReadAddRecord(final RecordInfo info) throws Exception
+   {
+      if (lookupRecord(info.id))
+      {
+         journalTo.appendAddRecord(info.id, info.userRecordType, info.data, false);
+      }
+   }
+
+   public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.contains(transactionID))
+      {
+         journalTo.appendAddRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
+      }
+      else
+      {
+         // Will try it as a regular record; onReadAddRecord will check (through lookupRecord) whether this is a live record or not
+         onReadAddRecord(info);
+      }
+   }
+
+   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+   {
+
+      if (pendingTransactions.contains(transactionID))
+      {
+         // Sanity check, this should never happen
+         log.warn("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+                  " for an already committed transaction during compacting");
+      }
+   }
+
+   public void onReadDeleteRecord(final long recordID) throws Exception
+   {
+      // Nothing to be done here, we don't copy deleted records
+   }
+
+   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.contains(transactionID))
+      {
+         journalTo.appendDeleteRecordTransactional(transactionID, info.id, info.data);
+      }
+      // else.. nothing to be done
+   }
+
+   public void markAsDataFile(final JournalFile file)
+   {
+      // nothing to be done here
+   }
+
+   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+   {
+      if (pendingTransactions.contains(transactionID))
+      {
+         journalTo.appendPrepareRecord(transactionID, extraData, false);
+      }
+   }
+
+   public void onReadRollbackRecord(final long transactionID) throws Exception
+   {
+      if (pendingTransactions.contains(transactionID))
+      {
+         // Sanity check, this should never happen
+         log.warn("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+                  " for an already rolled back transaction during compacting");
+      }
+   }
+
+   public void onReadUpdateRecord(final RecordInfo info) throws Exception
+   {
+      if (lookupRecord(info.id))
+      {
+         journalTo.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+      }
+   }
+
+   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.contains(transactionID))
+      {
+         journalTo.appendUpdateRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
+      }
+      else
+      {
+         onReadUpdateRecord(info);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -44,6 +44,7 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
@@ -173,7 +174,7 @@
 
    private final float compactPercentage;
 
-   private final int compactMinFiles;
+   private volatile int compactMinFiles;
 
    private final SequentialFileFactory fileFactory;
 
@@ -209,8 +210,8 @@
    // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
    private final ReentrantLock lockAppend = new ReentrantLock();
 
-   /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
-   private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
+   /** Regular operations only take the read lock; the write lock freezes the journal for the short time needed to take and update snapshots */
+   private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
 
    private volatile JournalFile currentFile;
 
@@ -852,7 +853,7 @@
 
       IOCallback callback = null;
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       try
       {
@@ -878,7 +879,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -901,7 +902,7 @@
 
       IOCallback callback = null;
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       try
       {
@@ -947,7 +948,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -963,7 +964,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       IOCallback callback = null;
 
@@ -1012,7 +1013,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -1037,7 +1038,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       try
       {
@@ -1064,7 +1065,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
    }
 
@@ -1086,7 +1087,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       try
       {
@@ -1113,7 +1114,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
    }
 
@@ -1129,7 +1130,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       try
       {
@@ -1155,7 +1156,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
    }
 
@@ -1163,7 +1164,7 @@
    {
       appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
     */
@@ -1172,8 +1173,6 @@
       appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
    }
 
-
-
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -1194,7 +1193,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       JournalTransaction tx = getTransactionInfo(txID);
 
@@ -1226,7 +1225,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       // We should wait this outside of the lock, to increase throughput
@@ -1257,7 +1256,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       JournalTransaction tx = transactions.remove(txID);
 
@@ -1294,7 +1293,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       if (sync)
@@ -1311,7 +1310,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
 
       JournalTransaction tx = null;
 
@@ -1343,7 +1342,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -1432,6 +1431,105 @@
       return maxID;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+    */
+   public void copyTo(Journal journal) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         return;
+      }
+
+      int originalCompact = this.compactMinFiles;
+      boolean originalAutoReclaim = this.autoReclaim;
+
+      compactMinFiles = 0;
+      autoReclaim = false;
+
+      flushExecutor();
+
+      // Wait for the compactor and cleanup to finish in case they are running
+      while (!compactorRunning.compareAndSet(false, true))
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+         compactorExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               latch.countDown();
+            }
+         });
+         latch.await();
+      }
+
+      JournalCopier copier = null;
+
+      try
+      {
+
+         // begin ********************************************
+
+         List<JournalFile> dataFilesToProcess = null;
+
+         // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+         globalLock.writeLock().lock();
+
+         try
+         {
+
+            // Take the snapshots and replace the structures
+
+            dataFilesToProcess = getSnapshotFilesToProcess();
+
+            if (dataFilesToProcess.size() == 0)
+            {
+               return;
+            }
+
+            dataFiles.clear();
+
+            HashSet<Long> txSet = new HashSet<Long>();
+
+            for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+            {
+               txSet.add(entry.getKey());
+            }
+
+            copier = new JournalCopier(fileFactory, this, journal, records.keySet(), txSet);
+         }
+         finally
+         {
+            globalLock.writeLock().unlock();
+         }
+
+         Collections.sort(dataFilesToProcess, new JournalFileComparator());
+
+         // This is where most of the work is done, taking most of the time of the copy routine.
+         // Notice there are no locks while this is being done.
+
+         // Read the files, and use the JournalCopier to append the records that are still live (or belong to
+         // pending transactions) to the destination journal
+         for (final JournalFile file : dataFilesToProcess)
+         {
+            readJournalFile(fileFactory, file, copier);
+         }
+
+         compactor.flush();
+
+      }
+      finally
+      {
+         this.compactMinFiles = originalCompact;
+         this.autoReclaim = originalAutoReclaim;
+         compactorRunning.set(false);
+
+         // since we disabled Reclaiming during the copy, we will do a check on everything as soon as the backup is
+         // done
+         scheduleReclaim();
+      }
+   }
+
    /**
     * 
     *  Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
@@ -1442,20 +1540,21 @@
 
       if (compactor != null)
       {
-         throw new IllegalStateException("There is pending compacting operation");
+         throw new IllegalStateException("There is a pending compacting operation");
       }
 
-      ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+      List<JournalFile> dataFilesToProcess = null;
 
       boolean previousReclaimValue = autoReclaim;
 
+      globalLock.readLock().lock();
+
       try
       {
          log.debug("Starting compacting operation on journal");
 
-         // We need to guarantee that the journal is frozen for this short time
-         // We don't freeze the journal as we compact, only for the short time where we replace records
-         compactingLock.writeLock().lock();
+         // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+         globalLock.writeLock().lock();
          try
          {
             if (state != STATE_LOADED)
@@ -1465,34 +1564,25 @@
 
             autoReclaim = false;
 
-            // We need to move to the next file, as we need a clear start for negatives and positives counts
-            moveNextFile(true);
-
             // Take the snapshots and replace the structures
 
-            dataFilesToProcess.addAll(dataFiles);
+            dataFilesToProcess = getSnapshotFilesToProcess();
 
-            for (JournalFile file : pendingCloseFiles)
+            if (dataFilesToProcess.size() == 0)
             {
-               file.getFile().close();
+               return;
             }
 
-            dataFilesToProcess.addAll(pendingCloseFiles);
-            pendingCloseFiles.clear();
-
             dataFiles.clear();
 
-            if (dataFilesToProcess.size() == 0)
-            {
-               return;
-            }
+            List<Pair<Long, JournalTransaction>> pendingTransactions = getSnapshoPendingTransactions();
 
             compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
 
-            for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+            for (Pair<Long, JournalTransaction> tx : pendingTransactions)
             {
-               compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
-               entry.getValue().setCompacting();
+               compactor.addPendingTransaction(tx.a, tx.b.getPositiveArray());
+               tx.b.setCompacting();
             }
 
             // We will calculate the new records during compacting, what will take the position the records will take
@@ -1501,7 +1591,7 @@
          }
          finally
          {
-            compactingLock.writeLock().unlock();
+            globalLock.writeLock().unlock();
          }
 
          Collections.sort(dataFilesToProcess, new JournalFileComparator());
@@ -1529,7 +1619,7 @@
 
          SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
 
-         compactingLock.writeLock().lock();
+         globalLock.writeLock().lock();
          try
          {
             // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
@@ -1584,7 +1674,7 @@
          }
          finally
          {
-            compactingLock.writeLock().unlock();
+            globalLock.writeLock().unlock();
          }
 
          // At this point the journal is unlocked. We keep renaming files while the journal is already operational
@@ -1596,6 +1686,8 @@
       }
       finally
       {
+         globalLock.readLock().unlock();
+
          // An Exception was probably thrown, and the compactor was not cleared
          if (compactor != null)
          {
@@ -1614,6 +1706,40 @@
 
    }
 
+   /**
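+    * @return a new list holding one (transaction ID, transaction) pair for every entry currently in the transactions map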
+    */
+   private List<Pair<Long, JournalTransaction>> getSnapshoPendingTransactions()
+   {
+      List<Pair<Long, JournalTransaction>> pendingTransactions = new ArrayList<Pair<Long, JournalTransaction>>();
+
+      for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+      {
+         pendingTransactions.add(new Pair<Long, JournalTransaction>(entry.getKey(), entry.getValue()));
+      }
+      return pendingTransactions;
+   }
+
+   /**
+    * Requires full lock (WriteLock)
+    * @return a new list with the data files, taken after moving to the next file and closing the pending-close files
+    */
+   private List<JournalFile> getSnapshotFilesToProcess() throws Exception
+   {
+      // We need to move to the next file, as we need a clear start for negatives and positives counts
+      moveNextFile(true);
+
+      List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
+      for (JournalFile file : pendingCloseFiles)
+      {
+         this.closeFile(file, true);
+      }
+
+      dataFilesToProcess.addAll(dataFiles);
+
+      return dataFilesToProcess;
+   }
+
    /** 
     * <p>Load data accordingly to the record layouts</p>
     * 
@@ -2034,8 +2160,10 @@
 
             // Remove the transactionInfo
             transactions.remove(transaction.transactionID);
-            
-            loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
+
+            loadManager.failedTransaction(transaction.transactionID,
+                                          transaction.recordInfos,
+                                          transaction.recordsToDelete);
          }
          else
          {
@@ -2062,7 +2190,7 @@
    public boolean checkReclaimStatus() throws Exception
    {
       // We can't start reclaim while compacting is working
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
       try
       {
          reclaimer.scan(getDataFiles());
@@ -2144,7 +2272,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
 
       return false;
@@ -2165,12 +2293,15 @@
          return;
       }
 
-      compactingLock.readLock().lock();
+      // Journal, Say cheese... I need to take a snapshot from your transactions and records now, freeze please!
+      globalLock.readLock().lock();
 
       try
       {
          JournalCleaner cleaner = null;
          ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
+
+         // getting the lockAppend as the counters are being changed
          lockAppend.lock();
 
          try
@@ -2229,7 +2360,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
          log.debug("Clean up on file " + file + " done");
       }
 
@@ -2263,7 +2394,7 @@
             return;
          }
 
-         // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+         // We can't use the main executor for the compacting... or we would deadlock because of file open and creation
          // operations (that will use the executor)
          compactorExecutor.execute(new Runnable()
          {
@@ -2347,21 +2478,25 @@
       return builder.toString();
    }
 
-   /** Method for use on testcases.
-    *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
-   public void debugWait() throws Exception
+   public void flush() throws Exception
    {
-      fileFactory.testFlush();
+      fileFactory.flushBuffers();
 
       for (JournalTransaction tx : transactions.values())
       {
          tx.waitCallbacks();
       }
 
+      flushExecutor();
+   }
+
+   /**
+    * @throws InterruptedException
+    */
+   private void flushExecutor() throws InterruptedException
+   {
       if (filesExecutor != null && !filesExecutor.isShutdown())
       {
-         // Send something to the closingExecutor, just to make sure we went
-         // until its end
          final CountDownLatch latch = new CountDownLatch(1);
 
          filesExecutor.execute(new Runnable()
@@ -2374,7 +2509,6 @@
 
          latch.await();
       }
-
    }
 
    public int getDataFilesCount()
@@ -2430,7 +2564,7 @@
    // In some tests we need to force the journal to move to a next file
    public void forceMoveNextFile() throws Exception
    {
-      compactingLock.readLock().lock();
+      globalLock.readLock().lock();
       try
       {
          lockAppend.lock();
@@ -2441,7 +2575,7 @@
             {
                checkReclaimStatus();
             }
-            debugWait();
+            flush();
          }
          finally
          {
@@ -2450,7 +2584,7 @@
       }
       finally
       {
-         compactingLock.readLock().unlock();
+         globalLock.readLock().unlock();
       }
    }
 
@@ -2854,7 +2988,7 @@
             currentFile.getFile().write(bb, sync);
          }
 
-         return currentFile;         
+         return currentFile;
       }
       finally
       {
@@ -3115,7 +3249,7 @@
       {
          public void run()
          {
-            compactingLock.readLock().lock();
+            globalLock.readLock().lock();
             try
             {
                // The file could be closed by compacting. On this case we need to check if the close still pending
@@ -3135,7 +3269,7 @@
             }
             finally
             {
-               compactingLock.readLock().unlock();
+               globalLock.readLock().unlock();
             }
          }
       };
@@ -3355,7 +3489,7 @@
    {
 
       private static NullEncoding instance = new NullEncoding();
-      
+
       public static NullEncoding getInstance()
       {
          return instance;

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -397,6 +397,23 @@
       return localJournal.isStarted();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+    */
+   public void copyTo(Journal destJournal) throws Exception
+   {
+      // This would be a nonsense operation. Only the real journal can copyTo
+      throw new IllegalStateException("Operation Not Implemeted!");
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#flush()
+    */
+   public void flush() throws Exception
+   {
+      localJournal.flush();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -16,6 +16,8 @@
 
 import java.io.File;
 
+import junit.framework.TestSuite;
+
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
@@ -30,6 +32,13 @@
 public class AIOJournalCompactTest extends NIOJournalCompactTest
 {
 
+   
+   public static TestSuite suite()
+   {
+      return createAIOTestSuite(AIOJournalCompactTest.class);
+   }
+   
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -16,7 +16,8 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import junit.framework.TestSuite;
+
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
@@ -31,20 +32,17 @@
  */
 public class AIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
 {
-
+   
+   public static TestSuite suite()
+   {
+      return createAIOTestSuite(AIOSequentialFileFactoryTest.class);
+   }
+   
    @Override
    protected void setUp() throws Exception
    {
       super.setUp();
 
-      if (!AsynchronousFileImpl.isLoaded())
-      {
-         fail(String.format("libAIO is not loaded on %s %s %s",
-                            System.getProperty("os.name"),
-                            System.getProperty("os.arch"),
-                            System.getProperty("os.version")));
-      }
-
       File file = new File(getTestDir());
 
       deleteDirectory(file);

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/journal/RealAIOJournalImplTest.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -15,6 +15,8 @@
 
 import java.io.File;
 
+import junit.framework.TestSuite;
+
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -38,6 +40,12 @@
 public class RealAIOJournalImplTest extends JournalImplTestUnit
 {
    private static final Logger log = Logger.getLogger(RealAIOJournalImplTest.class);
+   
+   public static TestSuite suite()
+   {
+      // Ignore tests if AIO is not installed
+      return createAIOTestSuite(RealAIOJournalImplTest.class);
+   }
 
    @Override
    protected void setUp() throws Exception

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -699,5 +699,19 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
+       */
+      public void copyTo(Journal destJournal) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#flush()
+       */
+      public void flush() throws Exception
+      {
+      }
+
    }
 }

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -236,7 +236,7 @@
 
             if (transactionSize == 0)
             {
-               journal.debugWait();
+               journal.flush();
             }
          }
          catch (Exception e)

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -280,7 +280,7 @@
 
       journalImpl.checkReclaimStatus();
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(2, factory.listFiles("tt").size());
 
@@ -297,7 +297,7 @@
 
       // as the request to a new file is asynchronous, we need to make sure the
       // async requests are done
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(3, factory.listFiles("tt").size());
 
@@ -306,7 +306,7 @@
          journalImpl.appendDeleteRecord(i, false);
       }
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       setupAndLoadJournal(JOURNAL_SIZE, 100);
 
@@ -326,7 +326,7 @@
 
       journalImpl.checkReclaimStatus();
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(2, factory.listFiles("tt").size());
 
@@ -341,7 +341,7 @@
 
       // as the request to a new file is asynchronous, we need to make sure the
       // async requests are done
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(2, factory.listFiles("tt").size());
 
@@ -354,7 +354,7 @@
 
       journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(3, factory.listFiles("tt").size());
 
@@ -368,7 +368,7 @@
 
       log.debug(journalImpl.debug());
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       log.debug("Final:--> " + journalImpl.debug());
 
@@ -432,7 +432,7 @@
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(12, factory.listFiles("tt").size());
 
@@ -486,7 +486,7 @@
 
       journalImpl.appendCommitRecord(1l, false);
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       assertEquals(12, factory.listFiles("tt").size());
 
@@ -542,7 +542,7 @@
 
       journalImpl.appendCommitRecord(1l, false);
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       setupAndLoadJournal(JOURNAL_SIZE, 100);
 
@@ -872,7 +872,7 @@
 
       journalImpl.appendPrepareRecord(1, xid, false);
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       setupAndLoadJournal(JOURNAL_SIZE, 1);
 
@@ -899,7 +899,7 @@
 
       journalImpl.appendCommitRecord(1l, false);
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       setupAndLoadJournal(JOURNAL_SIZE, 1);
 
@@ -923,7 +923,7 @@
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       SimpleEncoding xid1 = new SimpleEncoding(10, (byte)1);
 
@@ -1157,7 +1157,7 @@
       setupAndLoadJournal(JOURNAL_SIZE, 0);
 
       journalImpl.forceMoveNextFile();
-      journalImpl.debugWait();
+      journalImpl.flush();
       journalImpl.checkReclaimStatus();
 
       assertEquals(0, transactions.size());
@@ -1243,11 +1243,11 @@
 
       assertEquals(2, finishedOK.intValue());
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.debugWait();
+      journalImpl.flush();
 
       journalImpl.checkReclaimStatus();
 

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -103,9 +103,9 @@
 
    protected void checkAndReclaimFiles() throws Exception
    {
-      journal.debugWait();
+      journal.flush();
       journal.checkReclaimStatus();
-      journal.debugWait();
+      journal.flush();
    }
 
    protected abstract SequentialFileFactory getFileFactory() throws Exception;
@@ -218,7 +218,7 @@
          records.add(new RecordInfo(element, (byte)0, record, false));
       }
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void update(final long... arguments) throws Exception
@@ -232,7 +232,7 @@
          records.add(new RecordInfo(element, (byte)0, updateRecord, true));
       }
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void delete(final long... arguments) throws Exception
@@ -244,7 +244,7 @@
          removeRecordsForID(element);
       }
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void addTx(final long txID, final long... arguments) throws Exception
@@ -263,7 +263,7 @@
 
       }
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void updateTx(final long txID, final long... arguments) throws Exception
@@ -278,7 +278,7 @@
 
          tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
       }
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void deleteTx(final long txID, final long... arguments) throws Exception
@@ -292,7 +292,7 @@
          tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
       }
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void prepare(final long txID, final EncodingSupport xid) throws Exception
@@ -313,7 +313,7 @@
 
       tx.prepared = true;
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void commit(final long txID) throws Exception
@@ -329,7 +329,7 @@
 
       commitTx(txID);
 
-      journal.debugWait();
+      journal.flush();
    }
 
    protected void rollback(final long txID) throws Exception
@@ -343,7 +343,7 @@
 
       journal.appendRollbackRecord(txID, sync);
 
-      journal.debugWait();
+      journal.flush();
    }
 
    private void commitTx(final long txID)

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -985,7 +985,7 @@
 
       deleteTx(1, 1); // in file 1
 
-      journal.debugWait();
+      journal.flush();
 
       System.out.println("journal tmp :" + journal.debug());
 
@@ -1002,7 +1002,7 @@
 
       addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
 
-      journal.debugWait();
+      journal.flush();
 
       System.out.println("journal tmp2 :" + journal.debug());
 
@@ -1875,7 +1875,7 @@
 
       journal.forceMoveNextFile();
 
-      journal.debugWait();
+      journal.flush();
 
       addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
 

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -693,7 +693,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFileFactory#testFlush()
     */
-   public void testFlush()
+   public void flushBuffers()
    {
    }
 

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-10-27 01:52:00 UTC (rev 8144)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-10-27 02:24:40 UTC (rev 8145)
@@ -41,12 +41,14 @@
 import javax.transaction.xa.Xid;
 
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -189,6 +191,23 @@
       return str.toString();
    }
    
+   protected static TestSuite createAIOTestSuite(Class<?> clazz)
+   {
+      TestSuite suite = new TestSuite(clazz.getName() + " testsuite");
+      
+      if (AIOSequentialFileFactory.isSupported())
+      {
+         suite.addTestSuite(clazz);
+      }
+      else
+      {
+         // Output written to System.out ends up in the JUnit report
+         System.out.println("Test " + clazz.getName() + " ignored as AIO is not available");
+      }
+
+      return suite;
+   }
+   
    public static String dumpBytes(byte[] bytes)
    {
       StringBuffer buff = new StringBuffer();



More information about the hornetq-commits mailing list