[jboss-cvs] JBoss Messaging SVN: r5887 - in branches/BRANCH_JBMESSAGING_1427: src/main/org/jboss/messaging/core/journal and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 17 21:07:44 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-02-17 21:07:44 -0500 (Tue, 17 Feb 2009)
New Revision: 5887

Added:
   branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
Modified:
   branches/BRANCH_JBMESSAGING_1427/.classpath
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/Journal.java
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
   branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
JBMESSAGING-1427 - Branch creation (upload)

Modified: branches/BRANCH_JBMESSAGING_1427/.classpath
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/.classpath	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/.classpath	2009-02-18 02:07:44 UTC (rev 5887)
@@ -64,6 +64,6 @@
 	<classpathentry kind="lib" path="thirdparty/netty/lib/netty-3.1.0.ALPHA2.jar" sourcepath="thirdparty/netty/lib/netty-3.1.0.ALPHA2-sources.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M4.jar"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12-1.5.2.jar"/>
-	<classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.4.3.jar"/>
+	<classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.5.2.jar"/>
 	<classpathentry kind="output" path="eclipse-output"/>
 </classpath>

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/Journal.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/Journal.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -83,7 +83,7 @@
 
    // Load
 
-   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+   void load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
 
    int getAlignment() throws Exception;
 

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -22,6 +22,9 @@
 
 package org.jboss.messaging.core.journal;
 
+import org.jboss.messaging.core.journal.impl.JournalFile;
+
+
 /**
  * 
  * A TestableJournal
@@ -32,13 +35,17 @@
  */
 public interface TestableJournal extends Journal
 {
-   void checkAndReclaimFiles() throws Exception;
+   int checkAndReclaimFiles() throws Exception;
 
    int getDataFilesCount();
 
    int getFreeFilesCount();
 
    int getOpenedFilesCount();
+   
+   void cleanup(int fileID) throws Exception;
+   
+   JournalFile getJournalFile(int fileID);
 
    int getIDMapSize();
 
@@ -63,7 +70,7 @@
    /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager 
     *  instead of load(List,List)
     */
-   long load(LoadManager reloadManager) throws Exception;
+   void load(LoadManager reloadManager) throws Exception;
 
    void forceMoveNextFile() throws Exception;
 

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -31,6 +31,7 @@
  * TODO combine this with JournalFileImpl
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public interface JournalFile
@@ -39,6 +40,30 @@
 
    void incNegCount(JournalFile file);
 
+   void decNegCount(JournalFile file);
+
+   /** Total of the negative counts held by other files against this file */
+   int getTotalNegCount();
+
+   /** Set by the Reclaimer */
+   void setTotalNegCount(int total);
+
+   /** 
+    * 
+    * Registers a problematic record, one that would cause a linked-list effect between two files.
+    * This information is needed later in order to clean up the deleted records.
+    * 
+    * To avoid a linked-list effect, we physically remove deleted records from other files, when cleaning up
+    * */
+   void addCleanupInfo(long id, JournalFile deleteFile);
+
+   /**
+    * Returns the file registered for a problematic record through {@link #addCleanupInfo(long, JournalFile)}
+    * @param id the record id
+    * @return the file registered for that id, or null if the id was never registered
+    */
+   JournalFile getCleanupInfo(long id);
+
    int getPosCount();
 
    void incPosCount();
@@ -46,7 +71,18 @@
    void decPosCount();
 
    void setCanReclaim(boolean canDelete);
+   
 
+   /**
+    * Marks whether this file is preventing another file from being reclaimed because of pending deletes.
+    *  */
+   void setLinkedDependency(boolean hasDependencies);
+   
+   /**
+    * @see JournalFile#setLinkedDependency(boolean) 
+    *  */
+   boolean isLinkedDependency();
+
    boolean isCanReclaim();
 
    void extendOffset(final int delta);

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -22,13 +22,16 @@
 
 package org.jboss.messaging.core.journal.impl;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.Pair;
 
 /**
  * 
@@ -42,7 +45,7 @@
 {
    private static final Logger log = Logger.getLogger(JournalFileImpl.class);
 
-   private final SequentialFile file;
+   private SequentialFile file;
 
    private final int orderingID;
 
@@ -50,10 +53,18 @@
 
    private final AtomicInteger posCount = new AtomicInteger(0);
 
+   private int totalNegCount;
+
    private boolean canReclaim;
+   
+   private boolean linkedDependency;
 
+
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
 
+   // When removing an ID on cleanup, we need to know where the delete is coming from
+   private final ConcurrentMap<Long, JournalFile> cleanupIDs = new ConcurrentHashMap<Long, JournalFile>();
+
    public JournalFileImpl(final SequentialFile file, final int orderingID)
    {
       this.file = file;
@@ -61,6 +72,35 @@
       this.orderingID = orderingID;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
+    */
+   public void addCleanupInfo(final long id, final JournalFile deleteFile)
+   {
+      cleanupIDs.put(id, deleteFile);
+   }
+
+   public JournalFile getCleanupInfo(final long id)
+   {
+      return cleanupIDs.get(id);
+   }
+
+   /**
+    * @return the totalNegCount
+    */
+   public int getTotalNegCount()
+   {
+      return totalNegCount;
+   }
+
+   /**
+    * @param totalNegCount the totalNegCount to set
+    */
+   public void setTotalNegCount(final int totalNegCount)
+   {
+      this.totalNegCount = totalNegCount;
+   }
+
    public int getPosCount()
    {
       return posCount.intValue();
@@ -81,6 +121,11 @@
       getOrCreateNegCount(file).incrementAndGet();
    }
 
+   public void decNegCount(final JournalFile file)
+   {
+      getOrCreateNegCount(file).decrementAndGet();
+   }
+
    public int getNegCount(final JournalFile file)
    {
       AtomicInteger count = negCounts.get(file);
@@ -130,6 +175,23 @@
       return file;
    }
 
+   
+   /**
+    * @return the linkedDependency
+    */
+   public boolean isLinkedDependency()
+   {
+      return linkedDependency;
+   }
+
+   /**
+    * @param linkedDependency the linkedDependency to set
+    */
+   public void setLinkedDependency(boolean linkedDependency)
+   {
+      this.linkedDependency = linkedDependency;
+   }
+   
    @Override
    public String toString()
    {
@@ -170,4 +232,5 @@
       return count;
    }
 
+
 }

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -45,6 +45,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -89,6 +90,9 @@
 
    private static final int STATE_LOADED = 2;
 
+   // TODO: Should we make this configurable?
+   private static final int MAX_LINKED_JOURNAL_FILES = 10;
+
    // Static --------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -157,6 +161,20 @@
 
    public static final byte ROLLBACK_RECORD = 19;
 
+   // Used by cleanup
+   public static final byte CLEANED_ADD_RECORD = 20;
+
+   // Used by cleanup
+   public static final byte CLEANED_ADD_RECORD_TX = 21;
+
+   // Used by cleanup
+   public static final byte CLEANED_UPDATE_RECORD = 22;
+
+   // Used by cleanup
+   public static final byte CLEANED_UPDATE_RECORD_TX = 23;
+
+   public static final byte LAST_RECORD_ID = CLEANED_UPDATE_RECORD_TX;
+
    public static final byte FILL_CHARACTER = (byte)'J';
 
    // Attributes ----------------------------------------------------
@@ -296,20 +314,8 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int recordLength = record.getEncodeSize();
+      ByteBufferWrapper bb = generateAddRecord(true, -1, id, recordType, record);
 
-      int size = SIZE_ADD_RECORD + recordLength;
-
-      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
-      bb.putByte(ADD_RECORD);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(id);
-      bb.putInt(recordLength);
-      bb.putByte(recordType);
-      record.encode(bb);
-      bb.putInt(size);
-
       try
       {
          JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -348,18 +354,8 @@
          throw new IllegalStateException("Cannot find add info " + id);
       }
 
-      int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+      ByteBufferWrapper bb = generateUpdateRecord(true, -1, id, recordType, record);
 
-      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
-      bb.putByte(UPDATE_RECORD);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(id);
-      bb.putInt(record.getEncodeSize());
-      bb.putByte(recordType);
-      record.encode(bb);
-      bb.putInt(size);
-
       try
       {
          JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -406,7 +402,7 @@
       {
          JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
 
-         posFiles.addDelete(usedFile);
+         posFiles.addDelete(id, usedFile);
       }
       finally
       {
@@ -436,22 +432,9 @@
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
-      
-      int recordLength = record.getEncodeSize();
 
-      int size = SIZE_ADD_RECORD_TX + recordLength;
+      ByteBufferWrapper bb = generateAddTransactionalRecord(true, -1, txID, id, recordType, record);
 
-      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
-      bb.putByte(ADD_RECORD_TX);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(txID);
-      bb.putLong(id);
-      bb.putInt(recordLength);
-      bb.putByte(recordType);
-      record.encode(bb);
-      bb.putInt(size);
-
       try
       {
          JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -491,19 +474,8 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+      ByteBufferWrapper bb = generateUpdateRecordTransactional(true, -1, txID, id, recordType, record);
 
-      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
-      bb.putByte(UPDATE_RECORD_TX);
-      bb.putInt(-1); // skip ID part
-      bb.putLong(txID);
-      bb.putLong(id);
-      bb.putInt(record.getEncodeSize());
-      bb.putByte(recordType);
-      record.encode(bb);
-      bb.putInt(size);
-
       try
       {
          JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -779,33 +751,221 @@
 
    }
 
+   public void cleanup(final JournalFile journalFile) throws Exception
+   {
+
+      final int fileID = journalFile.getOrderingID();
+
+      final SequentialFile sf = journalFile.getFile();
+
+      sf.open(maxAIO);
+
+      try
+      {
+         readJournalFile(journalFile, new JournalReader()
+         {
+
+            public void addRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
+            {
+               JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+               if (cleanupFile != null)
+               {
+                  if (trace)
+                  {
+                     trace("Cleaning addRecord id = " + recordInfo.id);
+                  }
+
+                  ByteBufferWrapper buffer = generateAddRecord(false,
+                                                               fileID,
+                                                               recordInfo.id,
+                                                               recordInfo.userRecordType,
+                                                               new ByteArrayEncoding(recordInfo.data));
+
+                  buffer.rewind();
+
+                  sf.position(recordPos);
+                  sf.write(buffer.getBuffer(), false);
+
+                  // Eliminating the dependency between cleanupFile and journalFile
+
+                  cleanupFile.decNegCount(journalFile);
+                  journalFile.decPosCount();
+               }
+
+            }
+
+            public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+            {
+               if (trace)
+               {
+                  trace("Ignored already cleaned TXrecord " + recordInfo.id + ", transactionID = " + transactionID);
+               }
+            }
+
+            public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
+            {
+               if (trace)
+               {
+                  trace("Ignoring already cleaned record " + recordInfo.id);
+               }
+            }
+
+            public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
+            {
+               JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+               if (cleanupFile != null)
+               {
+                  if (trace)
+                  {
+                     trace("Cleaning addRecordTX record id = " + recordInfo.id + " transactionID = " + transactionID);
+                  }
+
+                  ByteBufferWrapper bb = generateAddTransactionalRecord(false,
+                                                                        fileID,
+                                                                        transactionID,
+                                                                        recordInfo.id,
+                                                                        recordInfo.userRecordType,
+                                                                        new ByteArrayEncoding(recordInfo.data));
+
+                  bb.rewind();
+
+                  sf.position(recordPos);
+                  sf.write(bb.getBuffer(), false);
+
+                  // Eliminating the dependency between cleanupFile and journalFile
+
+                  cleanupFile.decNegCount(journalFile);
+                  journalFile.decPosCount();
+               }
+            }
+
+            public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
+            {
+            }
+
+            public void deleteRecord(final int recordPos, final long recordID)
+            {
+            }
+
+            public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+            {
+            }
+
+            public void markAsDataFile()
+            {
+            }
+
+            public void prepareRecord(final int recordPos,
+                                      final long transactionID,
+                                      final byte[] extraData,
+                                      final byte[] summaryData)
+            {
+            }
+
+            public void rollbackRecord(final int recordPos, final long transactionID)
+            {
+            }
+
+            public void updateRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
+            {
+               JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+               if (cleanupFile != null)
+               {
+                  if (trace)
+                  {
+                     trace("Cleaning updateRecord = " + recordInfo);
+                  }
+
+                  ByteBufferWrapper bb = generateUpdateRecord(false,
+                                                              fileID,
+                                                              recordInfo.id,
+                                                              recordInfo.userRecordType,
+                                                              new ByteArrayEncoding(recordInfo.data));
+
+                  bb.rewind();
+
+                  sf.position(recordPos);
+                  sf.write(bb.getBuffer(), false);
+
+                  // Eliminating the dependency between cleanupFile and journalFile
+
+                  cleanupFile.decNegCount(journalFile);
+                  journalFile.decPosCount();
+               }
+            }
+
+            public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
+            {
+               JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+               if (cleanupFile != null)
+               {
+                  if (trace)
+                  {
+                     trace("Cleaning updateRecord = " + recordInfo);
+                  }
+
+                  ByteBufferWrapper bb = generateUpdateRecordTransactional(false,
+                                                                           fileID,
+                                                                           transactionID,
+                                                                           recordInfo.id,
+                                                                           recordInfo.userRecordType,
+                                                                           new ByteArrayEncoding(recordInfo.data));
+                  bb.rewind();
+
+                  sf.position(recordPos);
+                  sf.write(bb.getBuffer(), false);
+
+                  // Eliminating the dependency between cleanupFile and journalFile
+
+                  cleanupFile.decNegCount(journalFile);
+                  journalFile.decPosCount();
+               }
+            }
+
+            public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
+            {
+
+            }
+
+            public void cleanedUpdateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+            {
+            }
+
+         });
+      }
+      finally
+      {
+         sf.close();
+      }
+   }
+
    /**
     * @see JournalImpl#load(LoadManager)
     */
-   public synchronized long load(final List<RecordInfo> committedRecords,
+   public synchronized void load(final List<RecordInfo> committedRecords,
                                  final List<PreparedTransactionInfo> preparedTransactions) throws Exception
    {
       final Set<Long> recordsToDelete = new HashSet<Long>();
       final List<RecordInfo> records = new ArrayList<RecordInfo>();
 
-      long maxID = load(new LoadManager()
+      load(new LoadManager()
       {
-         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
          {
             preparedTransactions.add(preparedTransaction);
          }
 
-         public void addRecord(RecordInfo info)
+         public void addRecord(final RecordInfo info)
          {
             records.add(info);
          }
 
-         public void updateRecord(RecordInfo info)
+         public void updateRecord(final RecordInfo info)
          {
             records.add(info);
          }
 
-         public void deleteRecord(long id)
+         public void deleteRecord(final long id)
          {
             recordsToDelete.add(id);
          }
@@ -819,7 +979,7 @@
          }
       }
 
-      return maxID;
+      return;
    }
 
    /** 
@@ -857,265 +1017,124 @@
     * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p> 
     * 
     * */
-   public synchronized long load(final LoadManager loadManager) throws Exception
+   public synchronized void load(final LoadManager loadManager) throws Exception
    {
       if (state != STATE_STARTED)
       {
          throw new IllegalStateException("Journal must be in started state");
       }
 
-      Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+      final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
 
-      List<JournalFile> orderedFiles = orderFiles();
+      final List<JournalFile> orderedFiles = orderFiles();
 
       int lastDataPos = SIZE_HEADER;
 
-      long maxID = -1;
+      for (JournalFile loopFile : orderedFiles)
+      {
+         final AtomicBoolean hasData = new AtomicBoolean(false);
+         final JournalFile file = loopFile;
 
-      for (JournalFile file : orderedFiles)
-      {
          file.getFile().open(1);
 
-         ByteBuffer bb = fileFactory.newBuffer(fileSize);
-
-         int bytesRead = file.getFile().read(bb);
-
-         if (bytesRead != fileSize)
+         if (trace)
          {
-            // FIXME - We should extract everything we can from this file
-            // and then we shouldn't ever reuse this file on reclaiming (instead
-            // reclaim on different size files would aways throw the file away)
-            // rather than throw ISE!
-            // We don't want to leave the user with an unusable system
-            throw new IllegalStateException("File is wrong size " + bytesRead +
-                                            " expected " +
-                                            fileSize +
-                                            " : " +
-                                            file.getFile().getFileName());
+            trace("loading file " + file);
          }
 
-         // First long is the ordering timestamp, we just jump its position
-         bb.position(SIZE_HEADER);
-
-         boolean hasData = false;
-
-         while (bb.hasRemaining())
+         try
          {
-            final int pos = bb.position();
 
-            byte recordType = bb.get();
-
-            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+            // We pass a callback (JournalReader) here, as the same read method is used by both load and cleanup.
+            // That way a single method handles the journal file data-format in both cases.
+            final int fileLastPos = readJournalFile(file, new JournalReader()
             {
-               // I - We scan for any valid record on the file. If a hole
-               // happened on the middle of the file we keep looking until all
-               // the possibilities are gone
-               continue;
-            }
 
-            if (bb.position() + SIZE_INT > fileSize)
-            {
-               // II - Ignore this record, lets keep looking
-               continue;
-            }
+               public void addRecord(final int recordPos, final RecordInfo info)
+               {
+                  if (trace)
+                  {
+                     trace("AddRecord: " + info);
+                  }
+                  loadManager.addRecord(info);
 
-            // III - Every record has the file-id.
-            // This is what supports us from not re-filling the whole file
-            int readFileId = bb.getInt();
+                  posFilesMap.put(info.id, new PosFiles(file));
 
-            // IV - This record is from a previous file-usage. The file was
-            // reused and we need to ignore this record
-            if (readFileId != file.getOrderingID())
-            {
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
+                  hasData.set(true);
 
-               bb.position(pos + 1);
-
-               continue;
-            }
-
-            long transactionID = 0;
-
-            if (isTransaction(recordType))
-            {
-               if (bb.position() + SIZE_LONG > fileSize)
-               {
-                  continue;
                }
 
-               transactionID = bb.getLong();
-            }
-
-            long recordID = 0;
-
-            if (!isCompleteTransaction(recordType))
-            {
-               if (bb.position() + SIZE_LONG > fileSize)
+               public void updateRecord(final int recordPos, final RecordInfo recordInfo)
                {
-                  continue;
-               }
+                  if (trace)
+                  {
+                     trace("UpdateRecord: " + recordInfo);
+                  }
 
-               recordID = bb.getLong();
+                  loadManager.updateRecord(recordInfo);
 
-               maxID = Math.max(maxID, recordID);
-            }
+                  hasData.set(true);
 
-            // We use the size of the record to validate the health of the
-            // record.
-            // (V) We verify the size of the record
+                  PosFiles posFiles = posFilesMap.get(recordInfo.id);
 
-            // The variable record portion used on Updates and Appends
-            int variableSize = 0;
+                  if (posFiles != null)
+                  {
+                     // It's legal for this to be null. The file(s) with the insert may
+                     // have been deleted
+                     // just leaving some updates in this file
 
-            // Used to hold extra data on transaction prepares
-            int preparedTransactionExtraDataSize = 0;
-
-            byte userRecordType = 0;
-
-            byte record[] = null;
-
-            if (isContainsBody(recordType))
-            {
-               if (bb.position() + SIZE_INT > fileSize)
-               {
-                  continue;
+                     posFiles.addUpdateFile(file);
+                  }
                }
 
-               variableSize = bb.getInt();
-
-               if (bb.position() + variableSize > fileSize)
+               public void deleteRecord(final int recordPos, final long recordID)
                {
-                  log.warn("Record at position " + pos +
-                           " file:" +
-                           file.getFile().getFileName() +
-                           " is corrupted and it is being ignored");
-                  continue;
-               }
+                  if (trace)
+                  {
+                     trace("DeleteRecord " + recordID);
+                  }
+                  loadManager.deleteRecord(recordID);
 
-               if (recordType != DELETE_RECORD_TX)
-               {
-                  userRecordType = bb.get();
-               }
+                  hasData.set(true);
 
-               record = new byte[variableSize];
+                  PosFiles posFiles = posFilesMap.remove(recordID);
 
-               bb.get(record);
-            }
+                  if (posFiles != null)
+                  {
+                     posFiles.addDelete(recordID, file);
+                  }
 
-            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
-            {
-               if (recordType == PREPARE_RECORD)
-               {
-                  // Add the variable size required for preparedTransactions
-                  preparedTransactionExtraDataSize = bb.getInt();
                }
-               // Both commit and record contain the recordSummary, and this is
-               // used to calculate the record-size on both record-types
-               variableSize += bb.getInt() * SIZE_INT * 2;
-            }
 
-            int recordSize = getRecordSize(recordType);
-
-            // VI - this is completing V, We will validate the size at the end
-            // of the record,
-            // But we avoid buffer overflows by damaged data
-            if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
-            {
-               // Avoid a buffer overflow caused by damaged data... continue
-               // scanning for more records...
-               log.warn("Record at position " + pos +
-                        " file:" +
-                        file.getFile().getFileName() +
-                        " is corrupted and it is being ignored");
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
-
-               continue;
-            }
-
-            int oldPos = bb.position();
-
-            bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
-
-            int checkSize = bb.getInt();
-
-            // VII - The checkSize at the end has to match with the size
-            // informed at the beggining.
-            // This is like testing a hash for the record. (We could replace the
-            // checkSize by some sort of calculated hash)
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-            {
-               log.warn("Record at position " + pos +
-                        " file:" +
-                        file.getFile().getFileName() +
-                        " is corrupted and it is being ignored");
-
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
-
-               bb.position(pos + SIZE_BYTE);
-
-               continue;
-            }
-
-            bb.position(oldPos);
-
-            // At this point everything is checked. So we relax and just load
-            // the data now.
-
-            switch (recordType)
-            {
-               case ADD_RECORD:
+               public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
                {
-                  loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  if (trace)
+                  {
+                     trace("cleanedAddRecordTX: " + recordInfo);
+                  }
 
-                  posFilesMap.put(recordID, new PosFiles(file));
+                  JournalTransaction tnp = transactionInfos.get(transactionID);
 
-                  hasData = true;
-
-                  break;
-               }
-               case UPDATE_RECORD:
-               {
-                  loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
-
-                  hasData = true;
-
-                  PosFiles posFiles = posFilesMap.get(recordID);
-
-                  if (posFiles != null)
+                  if (tnp == null)
                   {
-                     // It's legal for this to be null. The file(s) with the may
-                     // have been deleted
-                     // just leaving some updates in this file
+                     tnp = new JournalTransaction();
 
-                     posFiles.addUpdateFile(file);
+                     transactionInfos.put(transactionID, tnp);
                   }
 
-                  break;
+                  tnp.addSummaryOnly(file);
                }
-               case DELETE_RECORD:
+
+               public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
                {
-                  loadManager.deleteRecord(recordID);
 
-                  hasData = true;
-
-                  PosFiles posFiles = posFilesMap.remove(recordID);
-
-                  if (posFiles != null)
+                  if (trace)
                   {
-                     posFiles.addDelete(file);
+                     trace((recordInfo.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + recordInfo +
+                           ", txid = " +
+                           transactionID);
                   }
 
-                  break;
-               }
-               case ADD_RECORD_TX:
-               case UPDATE_RECORD_TX:
-               {
                   TransactionHolder tx = transactions.get(transactionID);
 
                   if (tx == null)
@@ -1125,7 +1144,7 @@
                      transactions.put(transactionID, tx);
                   }
 
-                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
+                  tx.recordInfos.add(recordInfo);
 
                   JournalTransaction tnp = transactionInfos.get(transactionID);
 
@@ -1136,14 +1155,24 @@
                      transactionInfos.put(transactionID, tnp);
                   }
 
-                  tnp.addPositive(file, recordID);
+                  tnp.addPositive(file, recordInfo.id);
 
-                  hasData = true;
+                  hasData.set(true);
+               }
 
-                  break;
+               public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+               {
+                  // There is no difference here, so using the same method
+                  addRecordTX(recordPos, transactionID, recordInfo);
                }
-               case DELETE_RECORD_TX:
+
+               public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
                {
+                  if (trace)
+                  {
+                     trace("deleteRecordTX " + recordInfo + ", txid = " + transactionID);
+                  }
+
                   TransactionHolder tx = transactions.get(transactionID);
 
                   if (tx == null)
@@ -1153,7 +1182,7 @@
                      transactions.put(transactionID, tx);
                   }
 
-                  tx.recordsToDelete.add(new RecordInfo(recordID, (byte)0, record, true));
+                  tx.recordsToDelete.add(recordInfo);
 
                   JournalTransaction tnp = transactionInfos.get(transactionID);
 
@@ -1164,14 +1193,21 @@
                      transactionInfos.put(transactionID, tnp);
                   }
 
-                  tnp.addNegative(file, recordID);
+                  tnp.addNegative(file, recordInfo.id);
 
-                  hasData = true;
+                  hasData.set(true);
+               }
 
-                  break;
-               }
-               case PREPARE_RECORD:
+               public void prepareRecord(final int recordPos,
+                                         final long transactionID,
+                                         final byte[] extraData,
+                                         final byte[] summaryData)
                {
+                  if (trace)
+                  {
+                     trace("prepareRecordTX: txid = " + transactionID);
+                  }
+
                   TransactionHolder tx = transactions.get(transactionID);
 
                   if (tx == null)
@@ -1182,12 +1218,9 @@
                      transactions.put(transactionID, tx);
                   }
 
-                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
-
-                  bb.get(extraData);
-
                   // Pair <FileID, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
+                                                                                              ByteBuffer.wrap(summaryData));
 
                   tx.prepared = true;
 
@@ -1210,22 +1243,28 @@
                   }
                   else
                   {
-                     log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
+                     log.warn("Prepared transaction " + transactionID +
+                              " wasn't considered completed, it will be ignored");
                      tx.invalid = true;
                   }
 
-                  hasData = true;
+                  hasData.set(true);
+               }
 
-                  break;
-               }
-               case COMMIT_RECORD:
+               public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
                {
+                  if (trace)
+                  {
+                     trace("commitRecord: txid = " + transactionID);
+                  }
+
                   TransactionHolder tx = transactions.remove(transactionID);
 
                   // We need to read it even if transaction was not found, or
                   // the reading checks would fail
                   // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
+                                                                                              ByteBuffer.wrap(summaryData));
 
                   // The commit could be alone on its own journal-file and the
                   // whole transaction body was reclaimed but not the
@@ -1274,13 +1313,18 @@
                         journalTransaction.forget();
                      }
 
-                     hasData = true;
+                     hasData.set(true);
                   }
 
-                  break;
                }
-               case ROLLBACK_RECORD:
+
+               public void rollbackRecord(final int recordPos, final long transactionID)
                {
+                  if (trace)
+                  {
+                     trace("rollbackRecord: txid = " + transactionID);
+                  }
+
                   TransactionHolder tx = transactions.remove(transactionID);
 
                   // The rollback could be alone on its own journal-file and the
@@ -1300,43 +1344,79 @@
                      // Rollbacks.. We will ignore the data anyway.
                      tnp.rollback(file);
 
-                     hasData = true;
+                     hasData.set(true);
                   }
 
-                  break;
                }
-               default:
+
+               public void markAsDataFile()
                {
-                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                                                  " is corrupt, invalid record type " +
-                                                  recordType);
+                  if (trace)
+                  {
+                     trace("markAsDataFile");
+                  }
+
+                  hasData.set(true);
                }
-            }
 
-            checkSize = bb.getInt();
+               public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
+               {
+                  if (trace)
+                  {
+                     trace("cleanedAddRecord: " + recordInfo);
+                  }
 
-            // This is a sanity check about the loading code itself.
-            // If this checkSize doesn't match, it means the reading method is
-            // not doing what it was supposed to do
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+               }
+
+               public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
+               {
+                  if (trace)
+                  {
+                     trace("cleanedUpdateRecord: " + recordInfo);
+                  }
+               }
+
+               public void cleanedUpdateRecordTX(final int recordPos,
+                                                 final long transactionID,
+                                                 final RecordInfo recordInfo)
+               {
+                  if (trace)
+                  {
+                     trace("cleanedUpdateRecordTx: " + recordInfo + ", txID = " + transactionID);
+                  }
+
+                  JournalTransaction tnp = transactionInfos.get(transactionID);
+
+                  if (tnp == null)
+                  {
+                     tnp = new JournalTransaction();
+
+                     transactionInfos.put(transactionID, tnp);
+                  }
+
+                  tnp.addSummaryOnly(file);
+
+               }
+
+            });
+
+            if (hasData.get())
             {
-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+               lastDataPos = fileLastPos;
+               dataFiles.add(file);
             }
+            else
+            {
+               // Empty dataFiles with no data
+               freeFiles.add(file);
+            }
 
-            lastDataPos = bb.position();
          }
-
-         file.getFile().close();
-
-         if (hasData)
+         finally
          {
-            dataFiles.add(file);
+            file.getFile().close();
          }
-         else
-         {
-            // Empty dataFiles with no data
-            freeFiles.add(file);
-         }
+
       }
 
       // Create any more files we need
@@ -1424,7 +1504,7 @@
 
       checkAndReclaimFiles();
 
-      return maxID;
+      return;
    }
 
    public int getAlignment() throws Exception
@@ -1432,9 +1512,31 @@
       return fileFactory.getAlignment();
    }
 
-   // TestableJournal implementation
-   // --------------------------------------------------------------
+   // TestableJournal implementation --------------------------------------------------------------
 
+   public JournalFile getJournalFile(final int fileID)
+   {
+      for (JournalFile file : dataFiles)
+      {
+         if (file.getOrderingID() == fileID)
+         {
+            return file;
+         }
+      }
+
+      return null;
+   }
+
+   public void cleanup(final int fileID) throws Exception
+   {
+      JournalFile file = getJournalFile(fileID);
+
+      if (file != null)
+      {
+         cleanup(file);
+      }
+   }
+
    public void setAutoReclaim(final boolean autoReclaim)
    {
       this.autoReclaim = autoReclaim;
@@ -1456,8 +1558,12 @@
          builder.append("DataFile:" + file +
                         " posCounter = " +
                         file.getPosCount() +
+                        " totalNegative = " +
+                        file.getTotalNegCount() +
                         " reclaimStatus = " +
                         file.isCanReclaim() +
+                        " linkedDependency = " +
+                        file.isLinkedDependency() +
                         "\n");
          if (file instanceof JournalFileImpl)
          {
@@ -1518,10 +1624,19 @@
 
    }
 
-   public void checkAndReclaimFiles() throws Exception
+   public int checkAndReclaimFiles() throws Exception
    {
-      checkReclaimStatus();
+      Set<JournalFile> empty = Collections.emptySet();
+      return checkAndReclaimFiles(empty, false);
+   }
 
+   /** 
+    * When we clean up a file, we reschedule a check, but we can't clean up the same file again, or we may get into an infinite loop
+    * */
+   private int checkAndReclaimFiles(final Set<JournalFile> cleanedFiles, final boolean performCleanup) throws Exception
+   {
+      int secondCriterionCount = checkReclaimStatus();
+
       for (JournalFile file : dataFiles)
       {
          if (file.isCanReclaim())
@@ -1552,6 +1667,71 @@
             }
          }
       }
+
+      if (performCleanup)
+      {
+         if (secondCriterionCount > MAX_LINKED_JOURNAL_FILES)
+         {
+            JournalFile cleanupFile = null;
+            
+            // Will look for the first file that has many dependencies.
+            // This is because a single file could be holding all the records
+            for (JournalFile file : dataFiles)
+            {
+               if (file.isLinkedDependency())
+               {
+                  if (cleanedFiles.contains(file))
+                  {
+                     if (trace)
+                     {
+                        trace("File " + file + " was already cleaned, getting next one");
+                     }
+                     
+                     // However, in some exceptional cases, the file could still have a dependency after being cleaned (commits on different files, for instance)
+                     // Because of that, on the subsequent scheduled cleanups (if any), we need to ensure we don't reprocess the same file or we would
+                     // be in an infinite loop
+                     continue;
+                  }
+
+                  cleanupFile = file;
+                  break;
+               }
+            }
+
+            if (cleanupFile != null)
+            {
+               log.info("System has too many linked files on deletes(" + secondCriterionCount +
+               "), performing a cleanup on " + cleanupFile);
+
+               if (trace)
+               {
+                  trace("Cleaning up " + cleanupFile);
+               }
+
+               cleanedFiles.add(cleanupFile);
+               cleanup(cleanupFile);
+
+               filesExecutor.execute(new Runnable()
+               {
+                  public void run()
+                  {
+                     try
+                     {
+                        checkAndReclaimFiles(cleanedFiles, true);
+                     }
+                     catch (Exception e)
+                     {
+                        log.error(e.getMessage(), e);
+                     }
+                  }
+               });
+
+            }
+
+         }
+      }
+
+      return secondCriterionCount;
    }
 
    public int getDataFilesCount()
@@ -1639,7 +1819,7 @@
    public synchronized void stop() throws Exception
    {
       trace("Stopping the journal");
-      
+
       if (state == STATE_STOPPED)
       {
          throw new IllegalStateException("Journal is already stopped");
@@ -1684,17 +1864,168 @@
       }
    }
 
-   // Public
-   // -----------------------------------------------------------------------------
+   // Public --------------------------------------------------------------------------------
 
-   // Private
-   // -----------------------------------------------------------------------------
+   // Private -------------------------------------------------------------------------------
 
-   private void checkReclaimStatus() throws Exception
+   
+   // Private Methods that encapsulate data format generation -------------------------------
+   
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @return
+    */
+   private ByteBufferWrapper generateAddRecord(final boolean addRecord,
+                                               final int fileID,
+                                               final long id,
+                                               final byte userRecordType,
+                                               final EncodingSupport record)
    {
+      int recordLength = record.getEncodeSize();
+
+      int size = SIZE_ADD_RECORD + recordLength;
+
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+      if (addRecord)
+      {
+         bb.putByte(ADD_RECORD);
+      }
+      else
+      {
+         bb.putByte(CLEANED_ADD_RECORD);
+      }
+
+      bb.putInt(fileID);
+      bb.putLong(id);
+      bb.putInt(recordLength);
+      bb.putByte(userRecordType);
+      record.encode(bb);
+      bb.putInt(size);
+
+      return bb;
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @return
+    */
+   private ByteBufferWrapper generateUpdateRecord(final boolean isUpdateRecord,
+                                                  final int fileID,
+                                                  final long id,
+                                                  final byte recordType,
+                                                  final EncodingSupport record)
+   {
+      int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+      if (isUpdateRecord)
+      {
+         bb.putByte(UPDATE_RECORD);
+      }
+      else
+      {
+         bb.putByte(CLEANED_UPDATE_RECORD);
+      }
+      bb.putInt(fileID); // skip ID part
+      bb.putLong(id);
+      bb.putInt(record.getEncodeSize());
+      bb.putByte(recordType);
+      record.encode(bb);
+      bb.putInt(size);
+      return bb;
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @return
+    */
+   private ByteBufferWrapper generateAddTransactionalRecord(final boolean addRecord,
+                                                            final int fileID,
+                                                            final long txID,
+                                                            final long id,
+                                                            final byte recordType,
+                                                            final EncodingSupport record)
+   {
+      int recordLength = record.getEncodeSize();
+
+      int size = SIZE_ADD_RECORD_TX + recordLength;
+
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+      if (addRecord)
+      {
+         bb.putByte(ADD_RECORD_TX);
+      }
+      else
+      {
+         bb.putByte(CLEANED_ADD_RECORD_TX);
+      }
+      bb.putInt(fileID); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);
+      bb.putInt(recordLength);
+      bb.putByte(recordType);
+      record.encode(bb);
+      bb.putInt(size);
+      return bb;
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @return
+    */
+   private ByteBufferWrapper generateUpdateRecordTransactional(final boolean isUpdateRecord,
+                                                               final int fileID,
+                                                               final long txID,
+                                                               final long id,
+                                                               final byte recordType,
+                                                               final EncodingSupport record)
+   {
+      int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+
+      ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+      if (isUpdateRecord)
+      {
+         bb.putByte(UPDATE_RECORD_TX);
+      }
+      else
+      {
+         bb.putByte(CLEANED_UPDATE_RECORD_TX);
+      }
+
+      bb.putInt(fileID); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);
+      bb.putInt(record.getEncodeSize());
+      bb.putByte(recordType);
+      record.encode(bb);
+      bb.putInt(size);
+      return bb;
+   }
+
+   // -------------------------------------------------------------------------------------------------------
+   
+   /**
+    * @return the number of linkedFiles
+    */
+   private int checkReclaimStatus() throws Exception
+   {
       JournalFile[] files = new JournalFile[dataFiles.size()];
 
-      reclaimer.scan(dataFiles.toArray(files));
+      return reclaimer.scan(dataFiles.toArray(files));
    }
 
    // Discard the old JournalFile and set it with a new ID
@@ -1702,6 +2033,17 @@
    {
       int newOrderingID = generateOrderingID();
 
+      return reinitializeFile(file, newOrderingID);
+   }
+
+   /**
+    * @param file
+    * @param newOrderingID
+    * @return
+    * @throws Exception
+    */
+   private JournalFile reinitializeFile(final JournalFile file, final int newOrderingID) throws Exception
+   {
       SequentialFile sf = file.getFile();
 
       sf.open(1);
@@ -1871,6 +2213,8 @@
    {
       return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
              recordType == DELETE_RECORD_TX ||
+             recordType == CLEANED_ADD_RECORD_TX ||
+             recordType == CLEANED_UPDATE_RECORD_TX ||
              isCompleteTransaction(recordType);
    }
 
@@ -1881,7 +2225,9 @@
 
    private boolean isContainsBody(final byte recordType)
    {
-      return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX;
+      return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX ||
+             recordType >= CLEANED_ADD_RECORD &&
+             recordType <= CLEANED_UPDATE_RECORD_TX;
    }
 
    private int getRecordSize(final byte recordType)
@@ -1891,15 +2237,19 @@
       switch (recordType)
       {
          case ADD_RECORD:
+         case CLEANED_ADD_RECORD:
             recordSize = SIZE_ADD_RECORD;
             break;
          case UPDATE_RECORD:
+         case CLEANED_UPDATE_RECORD:
             recordSize = SIZE_UPDATE_RECORD;
             break;
          case ADD_RECORD_TX:
+         case CLEANED_ADD_RECORD_TX:
             recordSize = SIZE_ADD_RECORD_TX;
             break;
          case UPDATE_RECORD_TX:
+         case CLEANED_UPDATE_RECORD_TX:
             recordSize = SIZE_UPDATE_RECORD_TX;
             break;
          case DELETE_RECORD:
@@ -2162,7 +2512,7 @@
             {
                try
                {
-                  checkAndReclaimFiles();
+                  checkAndReclaimFiles(new HashSet<JournalFile>(), true);
                }
                catch (Exception e)
                {
@@ -2192,6 +2542,17 @@
     * */
    private void pushOpenedFile() throws Exception
    {
+      JournalFile nextOpenedFile = openNewFile();
+
+      openedFiles.offer(nextOpenedFile);
+   }
+
+   /**
+    * @return
+    * @throws Exception
+    */
+   private JournalFile openNewFile() throws Exception
+   {
       JournalFile nextOpenedFile = null;
       try
       {
@@ -2209,8 +2570,7 @@
       {
          openFile(nextOpenedFile);
       }
-
-      openedFiles.offer(nextOpenedFile);
+      return nextOpenedFile;
    }
 
    private void closeFile(final JournalFile file)
@@ -2284,6 +2644,310 @@
       }
    }
 
+   /**
+    * This method encapsulates the Journal file reading, used by both compact and cleanup.
+    * 
+    * Instead of duplicating the file-reading logic, this single method is shared by both loading and cleanup.
+    * 
+    * @param file
+    * @param reader
+    * @return the buffer position just after the last successfully loaded record (SIZE_HEADER if none was loaded)
+    * @throws Exception
+    */
+   private int readJournalFile(final JournalFile file, final JournalReader reader) throws Exception
+   {
+      ByteBuffer bb = fileFactory.newBuffer(fileSize);
+
+      file.getFile().read(bb);
+
+      int lastDataPos = SIZE_HEADER;
+
+      // First long is the ordering timestamp, we just jump its position
+      bb.position(SIZE_HEADER);
+
+      while (bb.hasRemaining())
+      {
+         final int recordPos = bb.position();
+
+         byte recordType = bb.get();
+
+         if (recordType < ADD_RECORD || recordType > LAST_RECORD_ID)
+         {
+            // I - We scan for any valid record on the file. If a hole
+            // happened in the middle of the file we keep looking until all
+            // the possibilities are gone
+            continue;
+         }
+
+         if (bb.position() + SIZE_INT > fileSize)
+         {
+            // II - Ignore this record, lets keep looking
+            continue;
+         }
+
+         // III - Every record has the file-id.
+         // This is what lets us avoid re-filling the whole file
+         int readFileId = bb.getInt();
+
+         // IV - This record is from a previous file-usage. The file was
+         // reused and we need to ignore this record
+         if (readFileId != file.getOrderingID())
+         {
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile();
+
+            bb.position(recordPos + 1);
+
+            continue;
+         }
+
+         long transactionID = 0;
+
+         if (isTransaction(recordType))
+         {
+            if (bb.position() + SIZE_LONG > fileSize)
+            {
+               continue;
+            }
+
+            transactionID = bb.getLong();
+         }
+
+         long recordID = 0;
+
+         if (!isCompleteTransaction(recordType))
+         {
+            if (bb.position() + SIZE_LONG > fileSize)
+            {
+               continue;
+            }
+
+            recordID = bb.getLong();
+         }
+
+         // We use the size of the record to validate the health of the
+         // record.
+         // (V) We verify the size of the record
+
+         // The variable record portion used on Updates and Appends
+         int variableSize = 0;
+
+         // Used to hold extra data on transaction prepares
+         int preparedTransactionExtraDataSize = 0;
+
+         byte userRecordType = 0;
+
+         byte record[] = null;
+
+         if (isContainsBody(recordType))
+         {
+            if (bb.position() + SIZE_INT > fileSize)
+            {
+               continue;
+            }
+
+            variableSize = bb.getInt();
+
+            if (bb.position() + variableSize > fileSize)
+            {
+               log.warn("Record at position " + recordPos +
+                        " file:" +
+                        file.getFile().getFileName() +
+                        " is corrupted and it is being ignored");
+               continue;
+            }
+
+            if (recordType != DELETE_RECORD_TX)
+            {
+               userRecordType = bb.get();
+            }
+
+            record = new byte[variableSize];
+
+            bb.get(record);
+         }
+
+         if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+         {
+            if (recordType == PREPARE_RECORD)
+            {
+               // Add the variable size required for preparedTransactions
+               preparedTransactionExtraDataSize = bb.getInt();
+            }
+            // Both commit and prepare records contain the recordSummary, and this is
+            // used to calculate the record-size on both record-types
+            variableSize += bb.getInt() * SIZE_INT * 2;
+         }
+
+         int recordSize = getRecordSize(recordType);
+
+         // VI - this is completing V, We will validate the size at the end
+         // of the record,
+         // But we avoid buffer overflows by damaged data
+         if (recordPos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
+         {
+            // Avoid a buffer overflow caused by damaged data... continue
+            // scanning for more records...
+            log.warn("Record at position " + recordPos +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " is corrupted and it is being ignored");
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile();
+
+            continue;
+         }
+
+         int oldPos = bb.position();
+
+         bb.position(recordPos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+         int checkSize = bb.getInt();
+
+         // VII - The checkSize at the end has to match with the size
+         // informed at the beginning.
+         // This is like testing a hash for the record. (We could replace the
+         // checkSize by some sort of calculated hash)
+         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+         {
+            log.warn("Record at position " + recordPos +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " is corrupted and it is being ignored");
+
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile();
+
+            bb.position(recordPos + SIZE_BYTE);
+
+            continue;
+         }
+
+         bb.position(oldPos);
+
+         // At this point everything is checked. So we relax and just load
+         // the data now.
+
+         switch (recordType)
+         {
+            case ADD_RECORD:
+            {
+               reader.addRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
+               break;
+            }
+            case CLEANED_ADD_RECORD:
+            {
+               reader.cleanedAddRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
+               break;
+            }
+            case UPDATE_RECORD:
+            {
+               reader.updateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
+
+               break;
+            }
+            case CLEANED_UPDATE_RECORD:
+            {
+               reader.cleanedUpdateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
+
+               break;
+            }
+            case DELETE_RECORD:
+            {
+               reader.deleteRecord(recordPos, recordID);
+
+               break;
+            }
+
+            case CLEANED_ADD_RECORD_TX:
+            {
+               reader.cleanedAddRecordTX(recordPos, transactionID, new RecordInfo(recordID,
+                                                                                  userRecordType,
+                                                                                  record,
+                                                                                  false));
+               break;
+            }
+
+            case ADD_RECORD_TX:
+            {
+               reader.addRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, false));
+               break;
+            }
+            case UPDATE_RECORD_TX:
+            {
+               reader.updateRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, true));
+               break;
+            }
+            case CLEANED_UPDATE_RECORD_TX:
+            {
+               reader.cleanedUpdateRecordTX(recordPos, transactionID, new RecordInfo(recordID,
+                                                                                     userRecordType,
+                                                                                     record,
+                                                                                     true));
+               break;
+            }
+            case DELETE_RECORD_TX:
+            {
+               reader.deleteRecordTX(recordPos, transactionID, new RecordInfo(recordID, (byte)0, record, true));
+               break;
+            }
+            case PREPARE_RECORD:
+            {
+               byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+               bb.get(extraData);
+
+               byte[] summaryData = new byte[variableSize];
+               bb.get(summaryData);
+
+               reader.prepareRecord(recordPos, transactionID, extraData, summaryData);
+
+               break;
+            }
+            case COMMIT_RECORD:
+            {
+
+               byte[] summaryData = new byte[variableSize];
+               bb.get(summaryData);
+
+               reader.commitRecord(recordPos, transactionID, summaryData);
+
+               break;
+            }
+            case ROLLBACK_RECORD:
+            {
+               reader.rollbackRecord(recordPos, transactionID);
+               break;
+            }
+            default:
+            {
+               throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                                               " is corrupt, invalid record type " +
+                                               recordType);
+            }
+
+         }
+
+         checkSize = bb.getInt();
+
+         // This is a sanity check about the loading code itself.
+         // If this checkSize doesn't match, it means the reading method is
+         // not doing what it was supposed to do
+         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+         {
+            throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+         }
+
+         lastDataPos = bb.position();
+      }
+
+      return lastDataPos;
+
+   }
+
    public ByteBuffer newBuffer(final int size)
    {
       return buffersControl.newBuffer(size);
@@ -2359,18 +3023,38 @@
          updateFile.incPosCount();
       }
 
-      void addDelete(final JournalFile file)
+      void addDelete(final long id, final JournalFile deleteFile)
       {
-         file.incNegCount(addFile);
+         if (addFile != deleteFile)
+         {
+            addFile.addCleanupInfo(id, deleteFile);
+         }
 
+         deleteFile.incNegCount(addFile);
+
          if (updateFiles != null)
          {
-            for (JournalFile jf : updateFiles)
+            for (JournalFile updateF : updateFiles)
             {
-               file.incNegCount(jf);
+               if (addFile != updateF)
+               {
+
+                  // cleanup dependency between updateFile and deleteFile
+
+                  // Say you have this scenario: (A=Add, U=Update, D=Delete)
+                  // File1: A1
+                  // File2: U1
+                  // File3: D1
+
+                  // I need to cleanup the counter between D1 and A1, and the counter between D1 and U1
+                  updateF.addCleanupInfo(id, deleteFile);
+               }
+
+               deleteFile.incNegCount(updateF);
             }
          }
       }
+
    }
 
    /** Class that will control buffer-reuse */
@@ -2469,6 +3153,12 @@
          return numberOfElementsPerFile;
       }
 
+      /**
+       * Increments the counter kept for {@code file} without taking a record id.
+       * Used after cleanup records.
+       */
+      public void addSummaryOnly(final JournalFile file)
+      {
+         getCounter(file).incrementAndGet();
+      }
+
       public void addPositive(final JournalFile file, final long id)
       {
          getCounter(file).incrementAndGet();
@@ -2526,7 +3216,7 @@
 
                if (posFiles != null)
                {
-                  posFiles.addDelete(n.a);
+                  posFiles.addDelete(n.b, n.a);
                }
             }
          }
@@ -2608,8 +3298,7 @@
       }
 
    }
-   
-   
+
    private class ByteArrayEncoding implements EncodingSupport
    {
 
@@ -2654,5 +3343,95 @@
 
    }
 
+   /**
+    * Abstraction used to read journal files.
+    * <p>
+    * Each method is a callback for one kind of journal record: the file-reading loop
+    * switches on the record type and calls the matching method with the record position
+    * and whatever was decoded from the record.
+    * <p>
+    * NOTE(review): {@code recordPos} is presumably the position of the record inside the
+    * file being read - confirm against the reading loop.
+    * NOTE(review): the {@code cleaned*} callbacks presumably correspond to records that
+    * were rewritten by journal cleanup - confirm against the implementations.
+    */
+   private static interface JournalReader
+   {
+      /** Callback for an add record. NOTE(review): presumably the non-transactional form - confirm. */
+      void addRecord(int recordPos, RecordInfo info) throws Exception;
 
+      /**
+       * Callback for a cleaned update record that belongs to a transaction.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction the record belongs to
+       * @param recordInfo the decoded record
+       */
+      void cleanedUpdateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a cleaned update record.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param recordInfo the decoded record
+       */
+      void cleanedUpdateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a cleaned add record that belongs to a transaction.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction the record belongs to
+       * @param recordInfo the decoded record
+       */
+      void cleanedAddRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a cleaned add record.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param recordInfo the decoded record
+       */
+      void cleanedAddRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for an update record.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param recordInfo the decoded record
+       * @throws Exception 
+       */
+      void updateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a delete record.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param recordID id of the record being deleted
+       */
+      void deleteRecord(int recordPos, long recordID) throws Exception;
+
+      /**
+       * Callback for an add record written inside a transaction.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction the record belongs to
+       * @param recordInfo the decoded record
+       * @throws Exception 
+       */
+      void addRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for an update record written inside a transaction.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction the record belongs to
+       * @param recordInfo the decoded record
+       * @throws Exception 
+       */
+      void updateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a delete record written inside a transaction (DELETE_RECORD_TX).
+       * The reading loop wraps the record id and the record bytes in a {@link RecordInfo}.
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction the record belongs to
+       * @param recordInfo the decoded record
+       */
+      void deleteRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * Callback for a prepare record (PREPARE_RECORD).
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction being prepared
+       * @param extraData the extra-data bytes read from the record
+       * @param summaryData the summary bytes read from the record after the extra data
+       */
+      void prepareRecord(int recordPos, long transactionID, byte[] extraData, byte[] summaryData) throws Exception;
+
+      /**
+       * Callback for a commit record (COMMIT_RECORD).
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction being committed
+       * @param summaryData the summary bytes read from the record
+       */
+      void commitRecord(int recordPos, long transactionID, byte[] summaryData) throws Exception;
+
+      /**
+       * Callback for a rollback record (ROLLBACK_RECORD).
+       * @param recordPos position of the record (see the note on the interface)
+       * @param transactionID id of the transaction being rolled back
+       */
+      void rollbackRecord(int recordPos, long transactionID) throws Exception;
+
+      /**
+       * NOTE(review): no caller is visible in this part of the file; presumably flags the
+       * file being read as one that holds data - confirm against the reading loop.
+       */
+      void markAsDataFile();
+   }
+
 }

Modified: branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -44,6 +44,7 @@
  * <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming">http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming</a></p>
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public class Reclaimer
@@ -57,14 +58,19 @@
       log.trace(message);
    }
 
-   public void scan(final JournalFile[] files)
+   /** Returns the number of files that are kept from being reclaimed by the second criterion, i.e. by a dependency on another file (linked-list effect) */
+   public int scan(final JournalFile[] files)
    {
+      int secondCriterionCount = 0;
+
       for (int i = 0; i < files.length; i++)
       {
-         // First we evaluate criterion 1)
+         // First we evaluate criterion 1) (Which is simple reference counting)
 
          JournalFile currentFile = files[i];
 
+         currentFile.setLinkedDependency(false);
+
          int posCount = currentFile.getPosCount();
 
          int totNeg = 0;
@@ -88,10 +94,14 @@
          }
 
          currentFile.setCanReclaim(true);
+         
+         // This attribute would be helpful on calculating % usage of a file..
+         // And it is also useful for debugging
+         currentFile.setTotalNegCount(totNeg);
 
          if (posCount <= totNeg)
          {
-            // Now we evaluate criterion 2)
+            // Now we evaluate criterion 2) (This file shouldn't have delete records on non reclaimable files)
 
             for (int j = 0; j <= i; j++)
             {
@@ -114,6 +124,11 @@
 
                      currentFile.setCanReclaim(false);
 
+                     // This file is holding currentFile from being reclaimed, hence we set it as a linked dependency
+                     file.setLinkedDependency(true);
+
+                     secondCriterionCount++;
+
                      break;
                   }
                }
@@ -124,5 +139,7 @@
             currentFile.setCanReclaim(false);
          }
       }
+
+      return secondCriterionCount;
    }
 }

Added: branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java	                        (rev 0)
+++ branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.xa.BasicXaTest;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test for journal cleanup on a running server.
+ * <p>
+ * One message is left unconsumed on queue {@code a1} while a large number of messages is
+ * produced to and consumed from queue {@code a2}. The test then checks that the number of
+ * journal data files shrinks, and that it drops to zero once the {@code a1} message has
+ * been consumed as well.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Feb 13, 2009 6:55:48 PM
+ *
+ *
+ */
+public class JournalCleanupIntegrationTest extends ServiceTestBase
+{
+   // The logger category must be this test class (it used to be BasicXaTest by copy-paste)
+   private static final Logger log = Logger.getLogger(JournalCleanupIntegrationTest.class);
+
+   private final Map<String, AddressSettings> addressSettings = new HashMap<String, AddressSettings>();
+
+   private MessagingService messagingService;
+
+   private ClientSessionFactory sessionFactory;
+
+   private Configuration configuration;
+
+   private final SimpleString a1 = new SimpleString("a1");
+
+   private final SimpleString a2 = new SimpleString("a2");
+
+   /**
+    * Starts a server without security and with 2 minimum journal files, then creates
+    * the queues {@code a1} and {@code a2}.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      clearData();
+      addressSettings.clear();
+      configuration = createDefaultConfig();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(getPageDir());
+
+      messagingService = createService(true, configuration, addressSettings);
+
+      // start the server
+      messagingService.start();
+
+      sessionFactory = createInVMFactory();
+
+      ClientSession clientSession = sessionFactory.createSession(true, false, false);
+      try
+      {
+         clientSession.createQueue(a1, a1, null, true, true);
+         clientSession.createQueue(a2, a2, null, true, true);
+      }
+      finally
+      {
+         // Do not leak the session if a queue could not be created
+         clientSession.close();
+      }
+   }
+
+   /** Stops the server if it is still running. */
+   @Override
+   protected void tearDown() throws Exception
+   {
+
+      if (messagingService != null && messagingService.isStarted())
+      {
+         messagingService.stop();
+      }
+
+      super.tearDown();
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Sends one message to {@code a1} that stays unconsumed, pushes 6000 messages of 1 KiB
+    * through {@code a2}, and expects the asynchronous cleanup to bring the journal down
+    * to at most 5 data files. After the {@code a1} message is consumed and the files are
+    * reclaimed, no data file may remain.
+    */
+   public void testAutoCleanup() throws Exception
+   {
+      ClientSession clientSession = sessionFactory.createSession(false, true, true);
+
+      ClientSession sessionConsumer = sessionFactory.createSession(null, null, false, true, true, false, 0);
+
+      final int NUMBER_OF_MESSAGES = 6000;
+
+      CountDownLatch latch = new CountDownLatch(NUMBER_OF_MESSAGES);
+
+      try
+      {
+         ClientProducer prod = clientSession.createProducer(a1);
+         prod.send(createTextMessage(clientSession, "hello"));
+         prod.close();
+
+         ClientConsumer cons = sessionConsumer.createConsumer(a2);
+
+         sessionConsumer.start();
+
+         LocalHandler handler = new LocalHandler(latch);
+
+         cons.setMessageHandler(handler);
+
+         prod = clientSession.createProducer(a2);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+         {
+            prod.send(createBytesMessage(clientSession, new byte[1024], true));
+         }
+
+         boolean allConsumed = latch.await(10, TimeUnit.SECONDS);
+
+         // A failure inside the handler is the more useful error, so report it first
+         Exception handlerFailure = handler.e;
+         if (handlerFailure != null)
+         {
+            throw handlerFailure;
+         }
+
+         assertTrue("Timed out waiting for the consumer, " + latch.getCount() + " messages still missing",
+                    allConsumed);
+
+         JournalStorageManager storage = (JournalStorageManager)messagingService.getServer().getStorageManager();
+         JournalImpl journal = (JournalImpl)storage.getMessageJournal();
+
+         // The cleanup is asynchronous, we keep trying the condition until a timeout of 15 seconds
+         long deadline = System.currentTimeMillis() + 15000;
+         while (deadline > System.currentTimeMillis())
+         {
+            // Wait the current task to finish before we test the condition again
+            journal.debugWait();
+
+            if (journal.getDataFilesCount() <= 5)
+            {
+               break;
+            }
+         }
+
+         int dataFiles = journal.getDataFilesCount();
+         assertTrue("DataFilesCount supposed to be at most 5, but it was " + dataFiles, dataFiles <= 5);
+
+         cons.close();
+         cons = sessionConsumer.createConsumer(a1);
+
+         ClientMessage mess = cons.receive(1000);
+
+         assertNotNull(mess);
+         mess.acknowledge();
+         sessionConsumer.commit();
+
+         journal.forceMoveNextFile();
+         journal.checkAndReclaimFiles();
+
+         assertEquals(0, journal.getDataFilesCount());
+      }
+      finally
+      {
+         // Close both sessions even if closing the first one fails
+         try
+         {
+            clientSession.close();
+         }
+         finally
+         {
+            sessionConsumer.close();
+         }
+      }
+
+   }
+
+   /**
+    * Acknowledges every received message and counts the latch down. The first
+    * acknowledge failure is kept in {@link #e} so the test thread can rethrow it.
+    */
+   class LocalHandler implements MessageHandler
+   {
+      // Written by the delivery thread and read by the test thread, hence volatile
+      volatile Exception e;
+
+      final CountDownLatch latch;
+
+      LocalHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+            latch.countDown();
+         }
+         catch (MessagingException ex)
+         {
+            this.e = ex;
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -3019,6 +3019,552 @@
    }
 
 
+   /**
+    * Non-transactional adds, updates and deletes spread over several journal files,
+    * followed by {@code journal.cleanup(1)}. After every reload no data file is
+    * expected to remain.
+    */
+   public void testCleanupNonTransactional() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      add(1, 2, 3, 4, 5, 6, 7, 8, 9);
+      add(50);
+      delete(50);
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      journal.forceMoveNextFile();
+
+      update(1, 2, 3, 4, 5, 6, 7, 8);
+
+      journal.forceMoveNextFile();
+
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      delete(1, 2, 3, 4, 5, 6, 7, 8);
+
+      add(10, 11, 12, 13, 14, 15);
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      journal.forceMoveNextFile();
+
+      log.debug("Data = " + journal.getDataFilesCount());
+      delete(10, 11, 12, 13, 14, 15);
+
+      log.debug("Data = " + journal.getDataFilesCount());
+      journal.checkAndReclaimFiles();
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      log.debug("Before forceMoveNextFile:\n" + journal.debug());
+
+      journal.forceMoveNextFile();
+
+      log.debug("After forceMoveNextFile:\n" + journal.debug());
+
+      journal.checkAndReclaimFiles();
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      log.debug("Debug on Journal before stopJournal - \n" + debugJournal());
+
+      journal.cleanup(1);
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+      add(99);
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+      delete(99);
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+   }
+
+   /**
+    * A transaction whose adds span two files, followed by non-transactional updates and
+    * deletes in later files, then {@code journal.cleanup(1)} and {@code journal.cleanup(2)}.
+    * The journal must reload consistently afterwards ({@code loadAndCheck}) and keep
+    * working for further operations.
+    */
+   public void testCleanupTransactional() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      // File 1
+      {
+         addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         log.debug("Data = " + journal.getDataFilesCount());
+
+         // 9 positives
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 1:\n" + journal.debug());
+
+      // File 2
+      {
+         addTx(1, 10, 11, 12, 13, 14, 15);
+         commit(1);
+         add(16);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 2:\n" + journal.debug());
+
+      // File 3
+      {
+         delete(16);
+         update(1, 2, 3, 4, 5, 6, 7, 8);
+         delete(10, 11, 12, 13, 14, 15);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 3:\n" + journal.debug());
+
+      // File 4
+      {
+         delete(1, 2, 3, 4, 5, 6, 7, 8);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 4:\n" + journal.debug());
+
+      journal.cleanup(1);
+
+      log.debug("After Cleanup 1:\n" + journal.debug());
+
+      journal.cleanup(2);
+
+      journal.checkAndReclaimFiles();
+
+      // This state is after cleanup(2) plus a reclaim (the old banner said "Cleanup 1")
+      log.debug("After Cleanup 2 and Reclaim:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      log.debug("After reload:\n" + journal.debug());
+
+      // NOTE(review): this assertion was left disabled by the author; confirm the expected count
+      // assertEquals(2, journal.getDataFilesCount());
+
+      delete(9);
+
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      log.debug("journal = " + journal.getDataFilesCount());
+
+      add(100);
+      update(100);
+      delete(100);
+
+      journal.forceMoveNextFile();
+
+      log.debug("After add-update-delete on same file:\n" + journal.debug());
+
+      journal.checkAndReclaimFiles();
+
+      log.debug("Final:\n" + journal.debug());
+
+   }
+
+   /**
+    * Adds in file 1, updates in file 2 and deletes in file 3. After reclaiming, file 1
+    * must be flagged as a linked dependency while files 2 and 3 must not. The journal is
+    * then cleaned with {@code journal.cleanup(1)}, reloaded, and the last record deleted.
+    */
+   public void testCleanupWithDeleteUpdatesDifferentFiles() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      // File 1
+      {
+         addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         commit(1);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 1:\n" + journal.debug());
+
+      // File 2
+      {
+         updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         commit(2);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 2:\n" + journal.debug());
+
+      // File 3
+      {
+         delete(1, 2, 3, 4, 5, 6, 7, 8);
+      }
+
+      journal.forceMoveNextFile();
+
+      // The reclaim is part of the scenario, so it is not hidden inside the log expression
+      Object heldBySecondCriterion = journal.checkAndReclaimFiles();
+      log.debug("Holding for secondLevel = " + heldBySecondCriterion);
+
+      assertTrue(journal.getJournalFile(1).isLinkedDependency());
+      assertFalse(journal.getJournalFile(2).isLinkedDependency());
+      assertFalse(journal.getJournalFile(3).isLinkedDependency());
+
+      log.debug("File 3:\n" + journal.debug());
+
+      journal.cleanup(1);
+
+      log.debug("After Cleanup 1:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      journal.checkAndReclaimFiles();
+
+      delete(9);
+
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      log.debug("Final:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   /**
+    * Adds in file 1, updates in file 2 and deletes in file 3, followed by
+    * {@code journal.cleanup(1)}, a reload and {@code journal.cleanup(2)}. One data file
+    * is expected after the second reload, and none once record 9 has been deleted and
+    * the files reclaimed.
+    */
+   public void testCleanupWithoutDeleteUpdatesDifferentFiles() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      // File 1
+      {
+         addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         commit(1);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 1:\n" + journal.debug());
+
+      // File 2
+      {
+         updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         commit(2);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 2:\n" + journal.debug());
+
+      // File 3
+      {
+         delete(1, 2, 3, 4, 5, 6, 7, 8);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 3:\n" + journal.debug());
+
+      journal.cleanup(1);
+
+      log.debug("After Cleanup 1:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      journal.cleanup(2);
+
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      log.debug("Final:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      assertEquals(1, journal.getDataFilesCount());
+
+      delete(9);
+
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+      log.debug("data:" + journal.getDataFilesCount());
+
+      log.debug("After Delete:\n" + journal.debug());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   /**
+    * Adds and updates written to the same file, deletes in the next one, followed by
+    * {@code journal.cleanup(1)}. The journal must still reload consistently
+    * ({@code loadAndCheck}) after one more record has been added.
+    */
+   public void testCleanupWithUpdatesSameFiles() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      // File 1
+      {
+         addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         commit(1);
+         update(1, 2, 3, 4, 5, 6, 7, 8);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 1:\n" + journal.debug());
+
+      // File 2
+      {
+         delete(1, 2, 3, 4, 5, 6, 7, 8);
+      }
+
+      journal.forceMoveNextFile();
+
+      log.debug("File 2:\n" + journal.debug());
+
+      journal.cleanup(1);
+
+      log.debug("After Cleanup 1:\n" + journal.debug());
+
+      journal.forceMoveNextFile();
+      add(20);
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      log.debug("Final:\n" + journal.debug());
+
+   }
+
+   /**
+    * A transaction whose adds span two files. Deleting the records of the first part
+    * must leave one data file after reclaiming, and deleting the remaining records
+    * after a reload must leave none.
+    */
+   public void testCleanupWholeFile() throws Exception
+   {
+      setup(2, 20 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      add(50);
+      delete(50);
+
+      addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      log.debug("Data = " + journal.getDataFilesCount());
+
+      journal.forceMoveNextFile();
+
+      addTx(1, 10, 11, 12, 13, 14, 15);
+
+      commit(1);
+
+      journal.forceMoveNextFile();
+
+      delete(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      assertEquals(1, journal.getDataFilesCount());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      delete(10, 11, 12, 13, 14, 15);
+      journal.checkAndReclaimFiles();
+
+      log.debug("Second one:\n" + journal.debug());
+
+      // Asserted once; the original repeated this check with no state change in between
+      assertEquals(0, journal.getDataFilesCount());
+
+   }
+
+   public void testAutomaticCleanup() throws Exception
+   {
+      setup(2, 3 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      add(50);
+      delete(50);
+
+      addTx(1, 1, 2);
+      journal.forceMoveNextFile();
+      updateTx(1, 1, 2);
+      journal.forceMoveNextFile();
+      commit(1);
+      System.out.println("Data = " + journal.getDataFilesCount());
+
+      journal.forceMoveNextFile();
+
+      delete(1);
+
+      for (int i = 10; i < 20; i++)
+      {
+         add(i);
+         journal.forceMoveNextFile();
+         update(i);
+         journal.forceMoveNextFile();
+         delete(i);
+      }
+
+      journal.forceMoveNextFile();
+
+      add(100);
+      update(100);
+
+      for (int i = 101; i < 120; i++)
+      {
+         add(i);
+         journal.forceMoveNextFile();
+         update(i);
+         journal.forceMoveNextFile();
+         delete(i);
+      }
+
+      journal.checkAndReclaimFiles();
+
+      journal.setAutoReclaim(true);
+      journal.forceMoveNextFile();
+
+      
+      // The cleanup is asynchronous, we keep trying the condition until a timeout of 5 seconds
+      for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis();)
+      {
+         // Wait the current task to finish before we test the condition again
+         journal.debugWait();
+
+         if (journal.getDataFilesCount() == 4)
+         {
+            break;
+         }
+      }
+
+      assertEquals(4, journal.getDataFilesCount());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      delete(2, 100);
+      
+      // moving to the next file, so any deletes will already make way on reclaiming
+      journal.forceMoveNextFile();
+
+      journal.checkAndReclaimFiles();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      assertEquals(0, journal.getDataFilesCount());
+
+   }
+
    protected abstract int getAlignment();
 
 }

Modified: branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-02-18 02:00:12 UTC (rev 5886)
+++ branches/BRANCH_JBMESSAGING_1427/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-02-18 02:07:44 UTC (rev 5887)
@@ -24,6 +24,7 @@
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,6 +33,7 @@
 import org.jboss.messaging.core.journal.impl.Reclaimer;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.Pair;
 
 /**
  * 
@@ -748,6 +750,8 @@
 
       private boolean canDelete;
 
+      private boolean linkedDependency;
+
       public void extendOffset(final int delta)
       {
       }
@@ -794,6 +798,18 @@
          negCounts.put(file, c);
       }
 
+      /**
+       * Decrements the negative count stored for {@code file}. A file without an entry
+       * is treated as having a count of zero.
+       *
+       * @param file the file whose negative count is decremented
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#decNegCount(org.jboss.messaging.core.journal.impl.JournalFile)
+       */
+      public void decNegCount(JournalFile file)
+      {
+         Integer count = negCounts.get(file);
+
+         // A missing entry counts as zero; the previous code stored 1 here, which incremented
+         int c = (count == null ? 0 : count.intValue()) - 1;
+
+         negCounts.put(file, c);
+      }
+
       public int getPosCount()
       {
          return posCount;
@@ -863,5 +879,54 @@
       {
          return transactionIDs;
       }
+
+      /**
+       * Stub implementation: no total is tracked here, so this always returns 0.
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#getTotalNegCount()
+       */
+      public int getTotalNegCount()
+      {
+         return 0;
+      }
+
+      /**
+       * Stub implementation: the value is ignored.
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#setTotalNegCount(int)
+       */
+      public void setTotalNegCount(int total)
+      {
+      }
+
+      /**
+       * Stub implementation: nothing is recorded.
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
+       */
+      public void addCleanupInfo(long id, JournalFile deleteFile)
+      {
+      }
+
+      /**
+       * Stub implementation: no cleanup info is kept, so this always returns {@code null}.
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#getCleanupInfo(long)
+       */
+      public JournalFile getCleanupInfo(long id)
+      {
+         return null;
+      }
+      
+      /**
+       * Returns the flag last stored through {@code setLinkedDependency}.
+       * @return the linkedDependency
+       */
+      public boolean isLinkedDependency()
+      {
+         return linkedDependency;
+      }
+
+      /**
+       * Stores the flag that {@code isLinkedDependency} returns.
+       * @param linkedDependency the linkedDependency to set
+       */
+      public void setLinkedDependency(boolean linkedDependency)
+      {
+         this.linkedDependency = linkedDependency;
+      }
+
+      
+
    }
 }




More information about the jboss-cvs-commits mailing list