[jboss-cvs] JBoss Messaging SVN: r7496 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 29 14:02:57 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-29 14:02:57 -0400 (Mon, 29 Jun 2009)
New Revision: 7496

Added:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Log:
Separating Compactor and JournalTransaction as a separate class

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,366 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.buffers.ChannelBuffer;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+
+/**
+ * A JournalCompactor
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompactor implements JournalReaderCallback
+{
+
+   private static final Logger log = Logger.getLogger(JournalCompactor.class);
+
+   final JournalImpl journal;
+
+   final SequentialFileFactory fileFactory;
+
+   JournalFile currentFile;
+
+   SequentialFile sequentialFile;
+
+   int fileID;
+
+   ChannelBuffer channelWrapper;
+
+   int nextOrderingID;
+
+   final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+
+   final Map<Long, JournalRecord> recordsSnapshot;
+
+   final Map<Long, JournalTransaction> pendingTransactions;
+
+   final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+
+   final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+
+   final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+
+   final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
+
+   public JournalCompactor(final SequentialFileFactory fileFactory,
+                           final JournalImpl journal,
+                           Map<Long, JournalRecord> recordsSnapshot,
+                           Map<Long, JournalTransaction> pendingTransactions,
+                           int firstFileID)
+   {
+      this.fileFactory = fileFactory;
+      this.journal = journal;
+      this.recordsSnapshot = recordsSnapshot;
+      this.nextOrderingID = firstFileID;
+      this.pendingTransactions = pendingTransactions;
+   }
+
+   /**
+    * @param id
+    * @param usedFile
+    */
+   public void addPendingDelete(long id, JournalFile usedFile)
+   {
+      pendingDeletes.add(new Pair<Long, JournalFile>(id, usedFile));
+   }
+
+   /**
+    * @param id
+    * @param usedFile
+    */
+   public void addPendingUpdate(long id, JournalFile usedFile)
+   {
+      pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+   }
+
+   public boolean lookupRecord(long id)
+   {
+      return recordsSnapshot.get(id) != null;
+   }
+
+   private void checkSize(int size) throws Exception
+   {
+      if (channelWrapper == null)
+      {
+         openFile();
+      }
+      else
+      {
+         if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
+         {
+            openFile();
+         }
+      }
+   }
+
+   public void flush() throws Exception
+   {
+      if (channelWrapper != null)
+      {
+         sequentialFile.position(0);
+         sequentialFile.write(channelWrapper.toByteBuffer(), true);
+         sequentialFile.close();
+         newDataFiles.add(currentFile);
+      }
+
+      channelWrapper = null;
+   }
+
+   /**
+    * @throws Exception
+    */
+   private void openFile() throws Exception
+   {
+      flush();
+
+      ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+      channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+      currentFile = journal.getFile(false, false);
+      sequentialFile = currentFile.getFile();
+      sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+      sequentialFile.open(1);
+      fileID = nextOrderingID++;
+      currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
+
+      channelWrapper.writeInt(fileID);
+      channelWrapper.writeInt(fileID);
+   }
+
+   public void addRecord(RecordInfo info) throws Exception
+   {
+      if (recordsSnapshot.get(info.id) != null)
+      {
+         int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+
+         checkSize(size);
+
+         journal.writeAddRecord(fileID,
+                                info.id,
+                                info.getUserRecordType(),
+                                new JournalImpl.ByteArrayEncoding(info.data),
+                                size,
+                                channelWrapper);
+
+         newRecords.put(info.id, new JournalRecord(currentFile));
+      }
+   }
+
+   public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         int size = JournalImpl.SIZE_ADD_RECORD_TX + info.data.length;
+
+         checkSize(size);
+
+         newTransaction.addPositive(currentFile, info.id);
+
+         journal.writeAddRecordTX(fileID,
+                                  transactionID,
+                                  info.id,
+                                  info.getUserRecordType(),
+                                  new JournalImpl.ByteArrayEncoding(info.data),
+                                  size,
+                                  channelWrapper);
+      }
+      else
+      {
+         // Will try it as a regular record, the method addRecord will validate if this is a live record or not
+         addRecord(info);
+      }
+   }
+
+   public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+   {
+      JournalTransaction pendingTx = pendingTransactions.get(transactionID);
+
+      if (pendingTx != null)
+      {
+         // Sanity check, this should never happen
+         throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+                                         " for an already committed transaction during compacting");
+      }
+   }
+
+   public void deleteRecord(long recordID) throws Exception
+   {
+      if (newRecords.get(recordID) != null)
+      {
+         // Sanity check, it should never happen
+         throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
+      }
+
+   }
+
+   public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         int size = JournalImpl.SIZE_DELETE_RECORD_TX + info.data.length;
+
+         checkSize(size);
+
+         journal.writeDeleteRecordTransactional(fileID,
+                                                transactionID,
+                                                info.id,
+                                                new JournalImpl.ByteArrayEncoding(info.data),
+                                                size,
+                                                channelWrapper);
+
+         newTransaction.addNegative(currentFile, info.id);
+      }
+   }
+
+   public void markAsDataFile(JournalFile file)
+   {
+      // nothing to be done here
+   }
+
+   public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+
+         checkSize(size);
+
+         journal.writeTransaction(fileID,
+                                  JournalImpl.PREPARE_RECORD,
+                                  transactionID,
+                                  newTransaction,
+                                  new JournalImpl.ByteArrayEncoding(extraData),
+                                  size,
+                                  newTransaction.getCounter(currentFile),
+                                  channelWrapper);
+
+         newTransaction.prepare(currentFile);
+
+      }
+   }
+
+   public void rollbackRecord(long transactionID) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         // Sanity check, this should never happen
+         throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+                                         " for an already rolled back transaction during compacting");
+      }
+   }
+
+   public void updateRecord(RecordInfo info) throws Exception
+   {
+      if (recordsSnapshot.get(info.id) != null)
+      {
+         int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
+
+         checkSize(size);
+
+         JournalRecord newRecord = newRecords.get(info.id);
+
+         if (newRecord == null)
+         {
+            log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
+         }
+
+         journal.writeUpdateRecord(fileID,
+                                   info.id,
+                                   info.userRecordType,
+                                   new JournalImpl.ByteArrayEncoding(info.data),
+                                   size,
+                                   channelWrapper);
+
+         newRecord.addUpdateFile(currentFile);
+
+      }
+   }
+
+   public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+   {
+
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         int size = JournalImpl.SIZE_UPDATE_RECORD_TX + info.data.length;
+
+         checkSize(size);
+
+         journal.writeUpdateRecordTX(fileID,
+                                     transactionID,
+                                     info.id,
+                                     info.userRecordType,
+                                     new JournalImpl.ByteArrayEncoding(info.data),
+                                     size,
+                                     channelWrapper);
+
+         newTransaction.addPositive(currentFile, info.id);
+      }
+      else
+      {
+
+         updateRecord(info);
+      }
+   }
+
+   /**
+    * @param transactionID
+    * @return
+    */
+   private JournalTransaction getNewJournalTransaction(long transactionID)
+   {
+      JournalTransaction newTransaction = newTransactions.get(transactionID);
+      if (newTransaction == null)
+      {
+         newTransaction = new JournalTransaction(journal);
+         newTransactions.put(transactionID, newTransaction);
+      }
+      return newTransaction;
+   }
+
+}

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-29 14:55:19 UTC (rev 7495)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-29 18:02:57 UTC (rev 7496)
@@ -26,11 +26,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -53,7 +51,6 @@
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.LoaderCallback;
@@ -64,8 +61,8 @@
 import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.Pair;
-import org.jboss.messaging.utils.VariableLatch;
 import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
 
 /**
@@ -115,47 +112,41 @@
 
    // The sizes of primitive types
 
-   private static final int SIZE_LONG = 8;
-
-   private static final int SIZE_INT = 4;
-
-   private static final int SIZE_BYTE = 1;
-
    public static final int MIN_FILE_SIZE = 1024;
 
-   public static final int SIZE_HEADER = SIZE_INT * 2;
+   public static final int SIZE_HEADER = DataConstants.SIZE_INT * 2;
 
-   public static final int BASIC_SIZE = SIZE_BYTE + SIZE_INT + SIZE_INT;
+   public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
 
-   public static final int SIZE_ADD_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT /* + record.length */;
+   public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
 
    // Record markers - they must be all unique
 
    public static final byte ADD_RECORD = 11;
 
-   public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT /* + record.length */;
+   public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte UPDATE_RECORD = 12;
 
-   public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT /* + record.length */;
+   public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte ADD_RECORD_TX = 13;
 
-   public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT /* + record.length */;
+   public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte UPDATE_RECORD_TX = 14;
 
-   public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_LONG + SIZE_INT /* + record.length */;
+   public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte DELETE_RECORD_TX = 15;
 
-   public static final int SIZE_DELETE_RECORD = BASIC_SIZE + SIZE_LONG;
+   public static final int SIZE_DELETE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG;
 
    public static final byte DELETE_RECORD = 16;
 
-   public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
+   public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
 
-   public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + SIZE_INT;
+   public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + DataConstants.SIZE_INT;
 
    public static final byte PREPARE_RECORD = 17;
 
@@ -163,7 +154,7 @@
 
    public static final byte COMMIT_RECORD = 18;
 
-   public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG;
+   public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG;
 
    public static final byte ROLLBACK_RECORD = 19;
 
@@ -200,8 +191,8 @@
    // Compacting may replace this structure
    private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
 
-   // This will be filled only while the Compactor is being done
-   private volatile Compactor compactor;
+   // This will be set only while the JournalCompactor is being executed
+   private volatile JournalCompactor compactor;
 
    private ExecutorService filesExecutor = null;
 
@@ -268,6 +259,13 @@
 
       this.maxAIO = maxAIO;
    }
+   
+   // Public methods (used by package members) (those are not part of the JournalImpl interface)
+   
+   public Map<Long, JournalRecord> getRecords()
+   {
+      return records;
+   }
 
    // Journal implementation
    // ----------------------------------------------------------------
@@ -363,7 +361,8 @@
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
 
-            // record== null here could only mean there is a compactor, and computing the delete should be done after compacting is done
+            // record== null here could only mean there is a compactor, and computing the delete should be done after
+            // compacting is done
             if (posFiles == null)
             {
                compactor.addPendingUpdate(id, usedFile);
@@ -429,7 +428,8 @@
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
 
-            // record== null here could only mean there is a compactor, and computing the delete should be done after compacting is done
+            // record== null here could only mean there is a compactor, and computing the delete should be done after
+            // compacting is done
             if (record == null)
             {
                compactor.addPendingDelete(id, usedFile);
@@ -623,7 +623,7 @@
 
       if (sync)
       {
-         tx.syncPreviousFiles();
+         tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
       }
 
       compactingLock.readLock().lock();
@@ -631,7 +631,7 @@
       try
       {
 
-         int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + SIZE_INT;
+         int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
          ChannelBuffer bb = newBuffer(size);
 
          writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
@@ -855,7 +855,13 @@
 
             recordsSnapshot = JournalImpl.this.records;
             pendingTransactions = JournalImpl.this.pendingTransactions;
+            pendingTransactions.putAll(this.pendingTransactions);
 
+            for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+            {
+               System.out.println("TransactionID = " + entry.getKey());
+            }
+
             JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
 
             records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -864,7 +870,7 @@
 
             dataFiles.clear();
 
-            this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
+            this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
 
          }
          finally
@@ -872,7 +878,7 @@
             compactingLock.writeLock().unlock();
          }
 
-         // Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
+         // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as well
          JournalFile previousFile = null;
          for (final JournalFile file : dataFilesToProcess)
          {
@@ -921,8 +927,8 @@
                JournalRecord updateRecord = this.records.get(pendingRecord.a);
                updateRecord.addUpdateFile(pendingRecord.b);
             }
-            
-            for (Pair<Long, JournalFile> pendingRecord: compactor.pendingDeletes)
+
+            for (Pair<Long, JournalFile> pendingRecord : compactor.pendingDeletes)
             {
                JournalRecord deleteRecord = this.records.remove(pendingRecord.a);
                deleteRecord.delete(pendingRecord.b);
@@ -946,6 +952,30 @@
       }
       finally
       {
+         // An Exception was probably thrown, and the compactor was not cleared
+         if (compactor != null)
+         {
+            try
+            {
+               compactor.flush();
+            }
+            catch (Throwable ignored)
+            {
+            }
+
+            if (compactor.sequentialFile != null)
+            {
+               try
+               {
+                  compactor.sequentialFile = null;
+               }
+               catch (Throwable ignored)
+               {
+               }
+            }
+
+            compactor = null;
+         }
          autoReclaim = previousReclaimValue;
       }
 
@@ -1025,315 +1055,6 @@
       return tmpRenameFile;
    }
 
-   class Compactor implements JournalReaderCallback
-   {
-
-      JournalFile currentFile;
-
-      SequentialFile sequentialFile;
-
-      int fileID;
-
-      ChannelBuffer channelWrapper;
-
-      int nextOrderingID;
-
-      final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
-
-      final Map<Long, JournalRecord> recordsSnapshot;
-
-      final Map<Long, JournalTransaction> pendingTransactions;
-
-      final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
-
-      final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
-
-      final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
-
-      final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
-
-      public Compactor(Map<Long, JournalRecord> recordsSnapshot,
-                       Map<Long, JournalTransaction> pendingTransactions,
-                       int firstFileID)
-      {
-         this.recordsSnapshot = recordsSnapshot;
-         this.nextOrderingID = firstFileID;
-         this.pendingTransactions = pendingTransactions;
-      }
-
-      /**
-       * @param id
-       * @param usedFile
-       */
-      public void addPendingDelete(long id, JournalFile usedFile)
-      {
-         pendingDeletes.add(new Pair<Long,JournalFile>(id, usedFile));        
-      }
-
-      /**
-       * @param id
-       * @param usedFile
-       */
-      public void addPendingUpdate(long id, JournalFile usedFile)
-      {
-         pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
-      }
-
-      public boolean lookupRecord(long id)
-      {
-         return recordsSnapshot.get(id) != null;
-      }
-
-      private void checkSize(int size) throws Exception
-      {
-         if (channelWrapper == null)
-         {
-            openFile();
-         }
-         else
-         {
-            if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
-            {
-               openFile();
-            }
-         }
-      }
-
-      public void flush() throws Exception
-      {
-         if (channelWrapper != null)
-         {
-            sequentialFile.position(0);
-            sequentialFile.write(channelWrapper.toByteBuffer(), true);
-            sequentialFile.close();
-            newDataFiles.add(currentFile);
-         }
-
-         channelWrapper = null;
-      }
-
-      /**
-       * @throws Exception
-       */
-      private void openFile() throws Exception
-      {
-         flush();
-
-         ByteBuffer bufferWrite = fileFactory.newBuffer(fileSize);
-         channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
-
-         currentFile = getFile(false, false);
-         sequentialFile = currentFile.getFile();
-         sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
-         sequentialFile.open(1);
-         fileID = nextOrderingID++;
-         currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
-
-         channelWrapper.writeInt(fileID);
-         channelWrapper.writeInt(fileID);
-      }
-
-      public void addRecord(RecordInfo info) throws Exception
-      {
-         if (recordsSnapshot.get(info.id) != null)
-         {
-            int size = SIZE_ADD_RECORD + info.data.length;
-
-            checkSize(size);
-
-            writeAddRecord(fileID,
-                           info.id,
-                           info.getUserRecordType(),
-                           new ByteArrayEncoding(info.data),
-                           size,
-                           channelWrapper);
-
-            newRecords.put(info.id, new JournalRecord(currentFile));
-         }
-      }
-
-      public void addRecordTX(long transactionID, RecordInfo info) throws Exception
-      {
-         if (pendingTransactions.get(transactionID) != null)
-         {
-            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-            int size = SIZE_ADD_RECORD_TX + info.data.length;
-
-            checkSize(size);
-
-            newTransaction.addPositive(currentFile, info.id);
-
-            writeAddRecordTX(fileID,
-                             transactionID,
-                             info.id,
-                             info.getUserRecordType(),
-                             new ByteArrayEncoding(info.data),
-                             size,
-                             channelWrapper);
-         }
-         else
-         {
-            // Will try it as a regular record, the method addRecord will validate if this is a live record or not
-            addRecord(info);
-         }
-      }
-
-      public void commitRecord(long transactionID, int numberOfRecords) throws Exception
-      {
-         JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-
-         if (pendingTx != null)
-         {
-            // Sanity check, this should never happen
-            throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
-                                            " for an already committed transaction during compacting");
-         }
-      }
-
-      public void deleteRecord(long recordID) throws Exception
-      {
-         if (records.get(recordID) != null)
-         {
-            // Sanity check, it should never happen
-            throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
-         }
-
-      }
-
-      public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
-      {
-         if (pendingTransactions.get(transactionID) != null)
-         {
-            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-            int size = SIZE_DELETE_RECORD_TX + info.data.length;
-
-            checkSize(size);
-
-            writeDeleteRecordTransactional(fileID,
-                                           transactionID,
-                                           info.id,
-                                           new ByteArrayEncoding(info.data),
-                                           size,
-                                           channelWrapper);
-
-            newTransaction.addNegative(currentFile, info.id);
-         }
-      }
-
-      public void markAsDataFile(JournalFile file)
-      {
-         // nothing to be done here
-      }
-
-      public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
-      {
-         if (pendingTransactions.get(transactionID) != null)
-         {
-
-            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-            int size = SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + SIZE_INT;
-
-            checkSize(size);
-
-            writeTransaction(fileID,
-                             PREPARE_RECORD,
-                             transactionID,
-                             newTransaction,
-                             new ByteArrayEncoding(extraData),
-                             size,
-                             newTransaction.getCounter(currentFile),
-                             channelWrapper);
-
-            newTransaction.prepare(currentFile);
-
-         }
-      }
-
-      public void rollbackRecord(long transactionID) throws Exception
-      {
-         if (pendingTransactions.get(transactionID) != null)
-         {
-            // Sanity check, this should never happen
-            throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
-                                            " for an already rolled back transaction during compacting");
-         }
-      }
-
-      public void updateRecord(RecordInfo info) throws Exception
-      {
-         if (recordsSnapshot.get(info.id) != null)
-         {
-            int size = SIZE_UPDATE_RECORD + info.data.length;
-
-            checkSize(size);
-
-            JournalRecord newRecord = newRecords.get(info.id);
-
-            if (newRecord == null)
-            {
-               log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
-            }
-
-            writeUpdateRecord(fileID,
-                              info.id,
-                              info.userRecordType,
-                              new ByteArrayEncoding(info.data),
-                              size,
-                              channelWrapper);
-
-            newRecord.addUpdateFile(currentFile);
-
-         }
-      }
-
-      public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
-      {
-
-         if (pendingTransactions.get(transactionID) != null)
-         {
-            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-            int size = SIZE_UPDATE_RECORD_TX + info.data.length;
-
-            checkSize(size);
-
-            writeUpdateRecordTX(fileID,
-                                transactionID,
-                                info.id,
-                                info.userRecordType,
-                                new ByteArrayEncoding(info.data),
-                                size,
-                                channelWrapper);
-
-            newTransaction.addPositive(currentFile, info.id);
-         }
-         else
-         {
-
-            updateRecord(info);
-         }
-      }
-
-      /**
-       * @param transactionID
-       * @return
-       */
-      private JournalTransaction getNewJournalTransaction(long transactionID)
-      {
-         JournalTransaction newTransaction = newTransactions.get(transactionID);
-         if (newTransaction == null)
-         {
-            newTransaction = new JournalTransaction();
-            newTransactions.put(transactionID, newTransaction);
-         }
-         return newTransaction;
-      }
-
-   }
-
    private boolean isInvalidSize(int bufferPos, int size)
    {
       if (size < 0)
@@ -1490,7 +1211,7 @@
 
                if (tnp == null)
                {
-                  tnp = new JournalTransaction();
+                  tnp = new JournalTransaction(JournalImpl.this);
 
                   pendingTransactions.put(transactionID, tnp);
                }
@@ -1522,7 +1243,7 @@
 
                if (tnp == null)
                {
-                  tnp = new JournalTransaction();
+                  tnp = new JournalTransaction(JournalImpl.this);
 
                   pendingTransactions.put(transactionID, tnp);
                }
@@ -1558,7 +1279,7 @@
 
                if (journalTransaction == null)
                {
-                  journalTransaction = new JournalTransaction();
+                  journalTransaction = new JournalTransaction(JournalImpl.this);
 
                   pendingTransactions.put(transactionID, journalTransaction);
                }
@@ -2096,299 +1817,314 @@
    {
 
       file.getFile().open(1);
-
-      ByteBuffer wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
-
-      int bytesRead = file.getFile().read(wholeFileBuffer);
-
-      if (bytesRead != file.getFile().size())
+      ByteBuffer wholeFileBuffer = null;
+      try
       {
-         throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
-      }
 
-      wholeFileBuffer.position(0);
+         wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
 
-      // First long is the ordering timestamp, we just jump its position
-      wholeFileBuffer.position(SIZE_HEADER);
+         int bytesRead = file.getFile().read(wholeFileBuffer);
 
-      int lastDataPos = SIZE_HEADER;
-
-      while (wholeFileBuffer.hasRemaining())
-      {
-         final int pos = wholeFileBuffer.position();
-
-         byte recordType = wholeFileBuffer.get();
-
-         if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+         if (bytesRead != file.getFile().size())
          {
-            // I - We scan for any valid record on the file. If a hole
-            // happened on the middle of the file we keep looking until all
-            // the possibilities are gone
-            continue;
+            throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
          }
 
-         if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
-         {
-            reader.markAsDataFile(file);
+         wholeFileBuffer.position(0);
 
-            wholeFileBuffer.position(pos + 1);
-            // II - Ignore this record, lets keep looking
-            continue;
-         }
+         // First long is the ordering timestamp, we just jump its position
+         wholeFileBuffer.position(SIZE_HEADER);
 
-         // III - Every record has the file-id.
-         // This is what supports us from not re-filling the whole file
-         int readFileId = wholeFileBuffer.getInt();
+         int lastDataPos = SIZE_HEADER;
 
-         long transactionID = 0;
-
-         if (isTransaction(recordType))
+         while (wholeFileBuffer.hasRemaining())
          {
-            if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+            final int pos = wholeFileBuffer.position();
+
+            byte recordType = wholeFileBuffer.get();
+
+            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
             {
-               wholeFileBuffer.position(pos + 1);
-               reader.markAsDataFile(file);
+               // I - We scan for any valid record on the file. If a hole
+               // happened on the middle of the file we keep looking until all
+               // the possibilities are gone
                continue;
             }
 
-            transactionID = wholeFileBuffer.getLong();
-         }
-
-         long recordID = 0;
-
-         // If prepare or commit
-         if (!isCompleteTransaction(recordType))
-         {
-            if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+            if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
             {
-               wholeFileBuffer.position(pos + 1);
                reader.markAsDataFile(file);
+
+               wholeFileBuffer.position(pos + 1);
+               // II - Ignore this record, lets keep looking
                continue;
             }
 
-            recordID = wholeFileBuffer.getLong();
-         }
+            // III - Every record has the file-id.
+            // This is what supports us from not re-filling the whole file
+            int readFileId = wholeFileBuffer.getInt();
 
-         // We use the size of the record to validate the health of the
-         // record.
-         // (V) We verify the size of the record
+            long transactionID = 0;
 
-         // The variable record portion used on Updates and Appends
-         int variableSize = 0;
-
-         // Used to hold extra data on transaction prepares
-         int preparedTransactionExtraDataSize = 0;
-
-         byte userRecordType = 0;
-
-         byte record[] = null;
-
-         if (isContainsBody(recordType))
-         {
-            if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+            if (isTransaction(recordType))
             {
-               wholeFileBuffer.position(pos + 1);
-               reader.markAsDataFile(file);
-               continue;
+               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               transactionID = wholeFileBuffer.getLong();
             }
 
-            variableSize = wholeFileBuffer.getInt();
+            long recordID = 0;
 
-            if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+            // If prepare or commit
+            if (!isCompleteTransaction(recordType))
             {
-               wholeFileBuffer.position(pos + 1);
-               continue;
-            }
+               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
 
-            if (recordType != DELETE_RECORD_TX)
-            {
-               userRecordType = wholeFileBuffer.get();
+               recordID = wholeFileBuffer.getLong();
             }
 
-            record = new byte[variableSize];
+            // We use the size of the record to validate the health of the
+            // record.
+            // (V) We verify the size of the record
 
-            wholeFileBuffer.get(record);
-         }
+            // The variable record portion used on Updates and Appends
+            int variableSize = 0;
 
-         // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
-         // currentFile
-         int transactionCheckNumberOfRecords = 0;
+            // Used to hold extra data on transaction prepares
+            int preparedTransactionExtraDataSize = 0;
 
-         if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
-         {
-            transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
+            byte userRecordType = 0;
 
-            if (recordType == PREPARE_RECORD)
+            byte record[] = null;
+
+            if (isContainsBody(recordType))
             {
-               // Add the variable size required for preparedTransactions
-               preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
-            }
-            variableSize = 0;
-         }
+               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
 
-         int recordSize = getRecordSize(recordType);
+               variableSize = wholeFileBuffer.getInt();
 
-         // VI - this is completing V, We will validate the size at the end
-         // of the record,
-         // But we avoid buffer overflows by damaged data
-         if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
-         {
-            // Avoid a buffer overflow caused by damaged data... continue
-            // scanning for more pendingTransactions...
-            trace("Record at position " + pos +
-                  " recordType = " +
-                  recordType +
-                  " file:" +
-                  file.getFile().getFileName() +
-                  " recordSize: " +
-                  recordSize +
-                  " variableSize: " +
-                  variableSize +
-                  " preparedTransactionExtraDataSize: " +
-                  preparedTransactionExtraDataSize +
-                  " is corrupted and it is being ignored (II)");
-            // If a file has damaged pendingTransactions, we make it a dataFile, and the
-            // next reclaiming will fix it
-            reader.markAsDataFile(file);
-            wholeFileBuffer.position(pos + 1);
+               if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
 
-            continue;
-         }
+               if (recordType != DELETE_RECORD_TX)
+               {
+                  userRecordType = wholeFileBuffer.get();
+               }
 
-         int oldPos = wholeFileBuffer.position();
+               record = new byte[variableSize];
 
-         wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+               wholeFileBuffer.get(record);
+            }
 
-         int checkSize = wholeFileBuffer.getInt();
+            // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+            // currentFile
+            int transactionCheckNumberOfRecords = 0;
 
-         // VII - The checkSize at the end has to match with the size
-         // informed at the beggining.
-         // This is like testing a hash for the record. (We could replace the
-         // checkSize by some sort of calculated hash)
-         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-         {
-            trace("Record at position " + pos +
-                  " recordType = " +
-                  recordType +
-                  " file:" +
-                  file.getFile().getFileName() +
-                  " is corrupted and it is being ignored (III)");
+            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+            {
+               transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
 
-            // If a file has damaged pendingTransactions, we make it a dataFile, and the
-            // next reclaiming will fix it
-            reader.markAsDataFile(file);
+               if (recordType == PREPARE_RECORD)
+               {
+                  // Add the variable size required for preparedTransactions
+                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+               }
+               variableSize = 0;
+            }
 
-            wholeFileBuffer.position(pos + SIZE_BYTE);
+            int recordSize = getRecordSize(recordType);
 
-            continue;
-         }
+            // VI - this is completing V, We will validate the size at the end
+            // of the record,
+            // But we avoid buffer overflows by damaged data
+            if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+            {
+               // Avoid a buffer overflow caused by damaged data... continue
+               // scanning for more pendingTransactions...
+               trace("Record at position " + pos +
+                     " recordType = " +
+                     recordType +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " recordSize: " +
+                     recordSize +
+                     " variableSize: " +
+                     variableSize +
+                     " preparedTransactionExtraDataSize: " +
+                     preparedTransactionExtraDataSize +
+                     " is corrupted and it is being ignored (II)");
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+               wholeFileBuffer.position(pos + 1);
 
-         // This record is from a previous file-usage. The file was
-         // reused and we need to ignore this record
-         if (readFileId != file.getFileID())
-         {
-            // If a file has damaged pendingTransactions, we make it a dataFile, and the
-            // next reclaiming will fix it
-            reader.markAsDataFile(file);
+               continue;
+            }
 
-            continue;
-         }
+            int oldPos = wholeFileBuffer.position();
 
-         wholeFileBuffer.position(oldPos);
+            wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
 
-         // At this point everything is checked. So we relax and just load
-         // the data now.
+            int checkSize = wholeFileBuffer.getInt();
 
-         switch (recordType)
-         {
-            case ADD_RECORD:
+            // VII - The checkSize at the end has to match with the size
+            // informed at the beggining.
+            // This is like testing a hash for the record. (We could replace the
+            // checkSize by some sort of calculated hash)
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
-               reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
-               break;
-            }
+               trace("Record at position " + pos +
+                     " recordType = " +
+                     recordType +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " is corrupted and it is being ignored (III)");
 
-            case UPDATE_RECORD:
-            {
-               reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
-               break;
-            }
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
 
-            case DELETE_RECORD:
-            {
-               reader.deleteRecord(recordID);
-               break;
-            }
+               wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
 
-            case ADD_RECORD_TX:
-            {
-               reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
-               break;
+               continue;
             }
 
-            case UPDATE_RECORD_TX:
+            // This record is from a previous file-usage. The file was
+            // reused and we need to ignore this record
+            if (readFileId != file.getFileID())
             {
-               reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
-               break;
-            }
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
 
-            case DELETE_RECORD_TX:
-            {
-               reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
-               break;
+               continue;
             }
 
-            case PREPARE_RECORD:
+            wholeFileBuffer.position(oldPos);
+
+            // At this point everything is checked. So we relax and just load
+            // the data now.
+
+            switch (recordType)
             {
+               case ADD_RECORD:
+               {
+                  reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  break;
+               }
 
-               byte extraData[] = new byte[preparedTransactionExtraDataSize];
+               case UPDATE_RECORD:
+               {
+                  reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  break;
+               }
 
-               wholeFileBuffer.get(extraData);
+               case DELETE_RECORD:
+               {
+                  reader.deleteRecord(recordID);
+                  break;
+               }
 
-               reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+               case ADD_RECORD_TX:
+               {
+                  reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+                  break;
+               }
 
-               break;
-            }
-            case COMMIT_RECORD:
-            {
+               case UPDATE_RECORD_TX:
+               {
+                  reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+                  break;
+               }
 
-               reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
-               break;
+               case DELETE_RECORD_TX:
+               {
+                  reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+                  break;
+               }
+
+               case PREPARE_RECORD:
+               {
+
+                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+                  wholeFileBuffer.get(extraData);
+
+                  reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+
+                  break;
+               }
+               case COMMIT_RECORD:
+               {
+
+                  reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
+                  break;
+               }
+               case ROLLBACK_RECORD:
+               {
+                  reader.rollbackRecord(transactionID);
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                                                  " is corrupt, invalid record type " +
+                                                  recordType);
+               }
             }
-            case ROLLBACK_RECORD:
+
+            checkSize = wholeFileBuffer.getInt();
+
+            // This is a sanity check about the loading code itself.
+            // If this checkSize doesn't match, it means the reading method is
+            // not doing what it was supposed to do
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
-               reader.rollbackRecord(transactionID);
-               break;
+               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+                                               ", pos = " +
+                                               pos);
             }
-            default:
-            {
-               throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                                               " is corrupt, invalid record type " +
-                                               recordType);
-            }
-         }
 
-         checkSize = wholeFileBuffer.getInt();
+            lastDataPos = wholeFileBuffer.position();
 
-         // This is a sanity check about the loading code itself.
-         // If this checkSize doesn't match, it means the reading method is
-         // not doing what it was supposed to do
-         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-         {
-            throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
-                                            ", pos = " +
-                                            pos);
          }
 
-         lastDataPos = wholeFileBuffer.position();
-
+         return lastDataPos;
       }
 
-      fileFactory.releaseBuffer(wholeFileBuffer);
+      finally
+      {
+         if (wholeFileBuffer != null)
+         {
+            fileFactory.releaseBuffer(wholeFileBuffer);
+         }
 
-      file.getFile().close();
-
-      return lastDataPos;
-
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
    }
 
    /**
@@ -2432,7 +2168,7 @@
     * @return
     * @throws Exception
     */
-   private void writeTransaction(final int fileID,
+   void writeTransaction(final int fileID,
                                  final byte recordType,
                                  final long txID,
                                  final JournalTransaction tx,
@@ -2467,7 +2203,7 @@
     * @param size
     * @param bb
     */
-   private void writeUpdateRecordTX(final int fileID,
+   void writeUpdateRecordTX(final int fileID,
                                     final long txID,
                                     final long id,
                                     final byte recordType,
@@ -2492,7 +2228,7 @@
     * @param size
     * @param bb
     */
-   private void writeUpdateRecord(final int fileId,
+   void writeUpdateRecord(final int fileId,
                                   final long id,
                                   final byte recordType,
                                   final EncodingSupport record,
@@ -2515,7 +2251,7 @@
     * @param size
     * @param bb
     */
-   private void writeAddRecord(final int fileId,
+   void writeAddRecord(final int fileId,
                                final long id,
                                final byte recordType,
                                final EncodingSupport record,
@@ -2538,7 +2274,7 @@
     * @param size
     * @param bb
     */
-   private void writeDeleteRecordTransactional(final int fileID,
+   void writeDeleteRecordTransactional(final int fileID,
                                                final long txID,
                                                final long id,
                                                final EncodingSupport record,
@@ -2566,7 +2302,7 @@
     * @param size
     * @param bb
     */
-   private void writeAddRecordTX(final int fileID,
+   void writeAddRecordTX(final int fileID,
                                  final long txID,
                                  final long id,
                                  final byte recordType,
@@ -2748,14 +2484,14 @@
             // The callback of a transaction has to be taken inside the lock,
             // when we guarantee the currentFile will not be changed,
             // since we individualize the callback per file
-            callback = tx.getCallback(currentFile);
+            callback = tx.getCallback(fileFactory.isSupportsCallbacks(), currentFile);
 
             if (sync)
             {
                // 99 % of the times this will be already synced, as previous files should be closed already.
                // This is to have 100% guarantee the transaction will be persisted and no loss of information would
                // happen
-               tx.syncPreviousFiles();
+               tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
             }
 
             // We need to add the number of records on currentFile if prepare or commit
@@ -2767,7 +2503,7 @@
          }
 
          // Adding fileID
-         bb.writerIndex(SIZE_BYTE);
+         bb.writerIndex(DataConstants.SIZE_BYTE);
          bb.writeInt(currentFile.getFileID());
 
          if (callback != null)
@@ -2946,7 +2682,7 @@
     * @return
     * @throws Exception
     */
-   private JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
+   public JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
    {
       JournalFile nextOpenedFile = null;
       try
@@ -2998,7 +2734,7 @@
 
       if (tx == null)
       {
-         tx = new JournalTransaction();
+         tx = new JournalTransaction(this);
 
          JournalTransaction trans = pendingTransactions.putIfAbsent(txID, tx);
 
@@ -3038,53 +2774,12 @@
    // Inner classes
    // ---------------------------------------------------------------------------
 
-   // Just encapsulates the VariableLatch waiting for transaction completions
-   // Used if the SequentialFile supports Callbacks
-   private static class TransactionCallback implements IOCallback
-   {
-      private final VariableLatch countLatch = new VariableLatch();
-
-      private volatile String errorMessage = null;
-
-      private volatile int errorCode = 0;
-
-      public void countUp()
-      {
-         countLatch.up();
-      }
-
-      public void done()
-      {
-         countLatch.down();
-      }
-
-      public void waitCompletion() throws InterruptedException
-      {
-         countLatch.waitCompletion();
-
-         if (errorMessage != null)
-         {
-            throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
-         }
-      }
-
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         this.errorMessage = errorMessage;
-
-         this.errorCode = errorCode;
-
-         countLatch.down();
-      }
-
-   }
-
    /** 
     * This holds the relationship a record has with other files in regard to reference counting.
     * Note: This class used to be called PosFiles
     * 
     * Used on the ref-count for reclaiming */
-   private static class JournalRecord
+   public static class JournalRecord
    {
       private final JournalFile addFile;
 
@@ -3143,278 +2838,6 @@
       }
    }
 
-   private class JournalTransaction
-   {
-      private List<Pair<JournalFile, Long>> pos;
-
-      private List<Pair<JournalFile, Long>> neg;
-
-      // All the files this transaction is touching on.
-      // We can't have those files being reclaimed or compacted if there is a pending transaction
-      private Set<JournalFile> pendingFiles;
-
-      private TransactionCallback currentCallback;
-
-      private Map<JournalFile, TransactionCallback> callbackList;
-
-      private JournalFile lastFile = null;
-
-      private final AtomicInteger counter = new AtomicInteger();
-
-      private AtomicInteger internalgetCounter(final JournalFile file)
-      {
-         if (lastFile != file)
-         {
-            lastFile = file;
-            counter.set(0);
-         }
-         return counter;
-      }
-
-      public int getCounter(final JournalFile file)
-      {
-         return internalgetCounter(file).intValue();
-      }
-
-      public void incCounter(final JournalFile file)
-      {
-         internalgetCounter(file).incrementAndGet();
-      }
-
-      /**
-       * @param currentFile
-       * @param bb
-       */
-      public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
-      {
-         bb.writerIndex(SIZE_BYTE + SIZE_INT + SIZE_LONG);
-
-         bb.writeInt(getCounter(currentFile));
-
-      }
-
-      /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
-       *  Because of that, this operation should be almost very fast.*/
-      public void syncPreviousFiles() throws Exception
-      {
-         if (fileFactory.isSupportsCallbacks())
-         {
-            if (callbackList != null)
-            {
-               for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
-               {
-                  if (entry.getKey() != currentFile)
-                  {
-                     entry.getValue().waitCompletion();
-                  }
-               }
-            }
-         }
-         else
-         {
-            for (JournalFile file : pendingFiles)
-            {
-               if (file != currentFile)
-               {
-                  file.getFile().waitForClose();
-               }
-            }
-         }
-      }
-
-      /**
-       * @return
-       */
-      public TransactionCallback getCallback(JournalFile file) throws Exception
-      {
-         if (fileFactory.isSupportsCallbacks())
-         {
-            if (callbackList == null)
-            {
-               callbackList = new HashMap<JournalFile, TransactionCallback>();
-            }
-
-            currentCallback = callbackList.get(file);
-
-            if (currentCallback == null)
-            {
-               currentCallback = new TransactionCallback();
-               callbackList.put(file, currentCallback);
-            }
-
-            if (currentCallback.errorMessage != null)
-            {
-               throw new MessagingException(currentCallback.errorCode, currentCallback.errorMessage);
-            }
-
-            currentCallback.countUp();
-
-            return currentCallback;
-         }
-         else
-         {
-            return null;
-         }
-      }
-
-      public void addPositive(final JournalFile file, final long id)
-      {
-         incCounter(file);
-
-         addFile(file);
-
-         if (pos == null)
-         {
-            pos = new ArrayList<Pair<JournalFile, Long>>();
-         }
-
-         pos.add(new Pair<JournalFile, Long>(file, id));
-      }
-
-      public void addNegative(final JournalFile file, final long id)
-      {
-         incCounter(file);
-
-         addFile(file);
-
-         if (neg == null)
-         {
-            neg = new ArrayList<Pair<JournalFile, Long>>();
-         }
-
-         neg.add(new Pair<JournalFile, Long>(file, id));
-      }
-
-      /** 
-       * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
-       * */
-      public void commit(final JournalFile file)
-      {
-         if (pos != null)
-         {
-            for (Pair<JournalFile, Long> p : pos)
-            {
-               JournalRecord posFiles = records.get(p.b);
-
-               if (posFiles == null)
-               {
-                  posFiles = new JournalRecord(p.a);
-
-                  records.put(p.b, posFiles);
-               }
-               else
-               {
-                  posFiles.addUpdateFile(p.a);
-               }
-            }
-         }
-
-         if (neg != null)
-         {
-            for (Pair<JournalFile, Long> n : neg)
-            {
-               JournalRecord posFiles = records.remove(n.b);
-
-               if (posFiles != null)
-               {
-                  posFiles.delete(n.a);
-               }
-            }
-         }
-
-         // Now add negs for the pos we added in each file in which there were
-         // transactional operations
-
-         for (JournalFile jf : pendingFiles)
-         {
-            file.incNegCount(jf);
-         }
-      }
-
-      public void waitCallbacks() throws Exception
-      {
-         if (callbackList != null)
-         {
-            for (TransactionCallback callback : callbackList.values())
-            {
-               callback.waitCompletion();
-            }
-         }
-      }
-
-      /** Wait completion at the latest file only */
-      public void waitCompletion() throws Exception
-      {
-         if (currentCallback != null)
-         {
-            currentCallback.waitCompletion();
-         }
-      }
-
-      /** 
-       * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
-       * or else potFilesMap could be affected
-       * */
-      public void rollback(final JournalFile file)
-      {
-         // Now add negs for the pos we added in each file in which there were
-         // transactional operations
-         // Note that we do this on rollback as we do on commit, since we need
-         // to ensure the file containing
-         // the rollback record doesn't get deleted before the files with the
-         // transactional operations are deleted
-         // Otherwise we may run into problems especially with XA where we are
-         // just left with a prepare when the tx
-         // has actually been rolled back
-
-         for (JournalFile jf : pendingFiles)
-         {
-            file.incNegCount(jf);
-         }
-      }
-
-      /** 
-       * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
-       * or else potFilesMap could be affected
-       * */
-      public void prepare(final JournalFile file)
-      {
-         // We don't want the prepare record getting deleted before time
-
-         addFile(file);
-      }
-
-      /** Used by load, when the transaction was not loaded correctly */
-      public void forget()
-      {
-         // The transaction was not committed or rolled back in the file, so we
-         // reverse any pos counts we added
-         for (JournalFile jf : pendingFiles)
-         {
-            jf.decPosCount();
-         }
-
-      }
-
-      private void addFile(final JournalFile file)
-      {
-         if (pendingFiles == null)
-         {
-            pendingFiles = new HashSet<JournalFile>();
-         }
-
-         if (!pendingFiles.contains(file))
-         {
-            pendingFiles.add(file);
-
-            // We add a pos for the transaction itself in the file - this
-            // prevents any transactional operations
-            // being deleted before a commit or rollback is written
-            file.incPosCount();
-         }
-      }
-   }
-
    private static class NullEncoding implements EncodingSupport
    {
 
@@ -3435,7 +2858,7 @@
 
    }
 
-   private static class ByteArrayEncoding implements EncodingSupport
+   public static class ByteArrayEncoding implements EncodingSupport
    {
 
       final byte[] data;

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+
+/**
+ * A JournalTransaction
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalTransaction
+{
+   
+   private JournalImpl journal;
+   
+   private List<Pair<JournalFile, Long>> pos;
+
+   private List<Pair<JournalFile, Long>> neg;
+
+   // All the files this transaction is touching on.
+   // We can't have those files being reclaimed or compacted if there is a pending transaction
+   private Set<JournalFile> pendingFiles;
+
+   private TransactionCallback currentCallback;
+
+   private Map<JournalFile, TransactionCallback> callbackList;
+
+   private JournalFile lastFile = null;
+
+   private final AtomicInteger counter = new AtomicInteger();
+
+   
+   public JournalTransaction(JournalImpl journal)
+   {
+      this.journal = journal;
+   }
+   
+   public int getCounter(final JournalFile file)
+   {
+      return internalgetCounter(file).intValue();
+   }
+
+   public void incCounter(final JournalFile file)
+   {
+      internalgetCounter(file).incrementAndGet();
+   }
+
+   /**
+    * @param currentFile
+    * @param bb
+    */
+   public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
+   {
+      bb.writerIndex(DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG);
+
+      bb.writeInt(getCounter(currentFile));
+
+   }
+
+   /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
+    *  Because of that, this operation should be almost very fast.*/
+   public void syncPreviousFiles(boolean callbacks, JournalFile currentFile) throws Exception
+   {
+      if (callbacks)
+      {
+         if (callbackList != null)
+         {
+            for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
+            {
+               if (entry.getKey() != currentFile)
+               {
+                  entry.getValue().waitCompletion();
+               }
+            }
+         }
+      }
+      else
+      {
+         for (JournalFile file : pendingFiles)
+         {
+            if (file != currentFile)
+            {
+               file.getFile().waitForClose();
+            }
+         }
+      }
+   }
+
+   /**
+    * @return
+    */
+   public TransactionCallback getCallback(boolean callbacks, JournalFile file) throws Exception
+   {
+      if (callbacks)
+      {
+         if (callbackList == null)
+         {
+            callbackList = new HashMap<JournalFile, TransactionCallback>();
+         }
+
+         currentCallback = callbackList.get(file);
+
+         if (currentCallback == null)
+         {
+            currentCallback = new TransactionCallback();
+            callbackList.put(file, currentCallback);
+         }
+
+         if (currentCallback.getErrorMessage() != null)
+         {
+            throw new MessagingException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
+         }
+
+         currentCallback.countUp();
+
+         return currentCallback;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   public void addPositive(final JournalFile file, final long id)
+   {
+      incCounter(file);
+
+      addFile(file);
+
+      if (pos == null)
+      {
+         pos = new ArrayList<Pair<JournalFile, Long>>();
+      }
+
+      pos.add(new Pair<JournalFile, Long>(file, id));
+   }
+
+   public void addNegative(final JournalFile file, final long id)
+   {
+      incCounter(file);
+
+      addFile(file);
+
+      if (neg == null)
+      {
+         neg = new ArrayList<Pair<JournalFile, Long>>();
+      }
+
+      neg.add(new Pair<JournalFile, Long>(file, id));
+   }
+
+   /** 
+    * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+    * */
+   public void commit(final JournalFile file)
+   {
+      if (pos != null)
+      {
+         for (Pair<JournalFile, Long> p : pos)
+         {
+            JournalImpl.JournalRecord posFiles = journal.getRecords().get(p.b);
+
+            if (posFiles == null)
+            {
+               posFiles = new JournalImpl.JournalRecord(p.a);
+
+               journal.getRecords().put(p.b, posFiles);
+            }
+            else
+            {
+               posFiles.addUpdateFile(p.a);
+            }
+         }
+      }
+
+      if (neg != null)
+      {
+         for (Pair<JournalFile, Long> n : neg)
+         {
+            JournalImpl.JournalRecord posFiles = journal.getRecords().remove(n.b);
+
+            if (posFiles != null)
+            {
+               posFiles.delete(n.a);
+            }
+         }
+      }
+
+      // Now add negs for the pos we added in each file in which there were
+      // transactional operations
+
+      for (JournalFile jf : pendingFiles)
+      {
+         file.incNegCount(jf);
+      }
+   }
+
+   public void waitCallbacks() throws Exception
+   {
+      if (callbackList != null)
+      {
+         for (TransactionCallback callback : callbackList.values())
+         {
+            callback.waitCompletion();
+         }
+      }
+   }
+
+   /** Wait completion at the latest file only */
+   public void waitCompletion() throws Exception
+   {
+      if (currentCallback != null)
+      {
+         currentCallback.waitCompletion();
+      }
+   }
+
+   /** 
+    * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+    * or else potFilesMap could be affected
+    * */
+   public void rollback(final JournalFile file)
+   {
+      // Now add negs for the pos we added in each file in which there were
+      // transactional operations
+      // Note that we do this on rollback as we do on commit, since we need
+      // to ensure the file containing
+      // the rollback record doesn't get deleted before the files with the
+      // transactional operations are deleted
+      // Otherwise we may run into problems especially with XA where we are
+      // just left with a prepare when the tx
+      // has actually been rolled back
+
+      for (JournalFile jf : pendingFiles)
+      {
+         file.incNegCount(jf);
+      }
+   }
+
+   /** 
+    * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+    * or else potFilesMap could be affected
+    * */
+   public void prepare(final JournalFile file)
+   {
+      // We don't want the prepare record getting deleted before time
+
+      addFile(file);
+   }
+
+   /** Used by load, when the transaction was not loaded correctly */
+   public void forget()
+   {
+      // The transaction was not committed or rolled back in the file, so we
+      // reverse any pos counts we added
+      for (JournalFile jf : pendingFiles)
+      {
+         jf.decPosCount();
+      }
+
+   }
+
+   private AtomicInteger internalgetCounter(final JournalFile file)
+   {
+      if (lastFile != file)
+
+      {
+         lastFile = file;
+         counter.set(0);
+      }
+      return counter;
+   }
+
+   private void addFile(final JournalFile file)
+   {
+      if (pendingFiles == null)
+      {
+         pendingFiles = new HashSet<JournalFile>();
+      }
+
+      if (!pendingFiles.contains(file))
+      {
+         pendingFiles.add(file);
+
+         // We add a pos for the transaction itself in the file - this
+         // prevents any transactional operations
+         // being deleted before a commit or rollback is written
+         file.incPosCount();
+      }
+   }
+}

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java	2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.utils.VariableLatch;
+
+/**
+ * A TransactionCallback
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TransactionCallback implements IOCallback
+{
+   private final VariableLatch countLatch = new VariableLatch();
+
+   private volatile String errorMessage = null;
+
+   private volatile int errorCode = 0;
+
+   public void countUp()
+   {
+      countLatch.up();
+   }
+
+   public void done()
+   {
+      countLatch.down();
+   }
+
+   public void waitCompletion() throws InterruptedException
+   {
+      countLatch.waitCompletion();
+
+      if (errorMessage != null)
+      {
+         throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
+      }
+   }
+
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      this.errorMessage = errorMessage;
+
+      this.errorCode = errorCode;
+
+      countLatch.down();
+   }
+
+   /**
+    * @return the errorMessage
+    */
+   public String getErrorMessage()
+   {
+      return errorMessage;
+   }
+
+   /**
+    * @return the errorCode
+    */
+   public int getErrorCode()
+   {
+      return errorCode;
+   }
+   
+   
+   
+
+}

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-29 14:55:19 UTC (rev 7495)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-29 18:02:57 UTC (rev 7496)
@@ -64,36 +64,39 @@
 
    public void testCompactwithPendingCommit() throws Exception
    {
-      InternalCompactTest(false, false, false, true);
+      InternalCompactTest(false, false, false, false, true);
    }
 
    public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
    {
-      InternalCompactTest(false, true, true, true);
+      InternalCompactTest(true, false, true, true, false);
    }
 
-
    public void testCompactwithConcurrentDeletes() throws Exception
    {
-      InternalCompactTest(false, false, true, true);
+      InternalCompactTest(true, false, false, true, false);
    }
 
    public void testCompactwithConcurrentUpdates() throws Exception
    {
-      InternalCompactTest(false, true, false, true);
+      InternalCompactTest(true, false, true, false, false);
    }
 
    public void testCompactWithConcurrentAppend() throws Exception
    {
-      InternalCompactTest(true, false, false, true);
+      InternalCompactTest(true, true, false, false, false);
    }
 
-   private void InternalCompactTest(final boolean performAppend, final boolean performUpdate, final boolean performDelete, final boolean pendingTransactions) throws Exception
+   private void InternalCompactTest(final boolean regularAdd,
+                                    final boolean performAppend,
+                                    final boolean performUpdate,
+                                    final boolean performDelete,
+                                    final boolean pendingTransactions) throws Exception
    {
       setup(50, 60 * 1024, true);
 
       ArrayList<Long> liveIDs = new ArrayList<Long>();
-      
+
       ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
 
       final CountDownLatch latchDone = new CountDownLatch(1);
@@ -121,32 +124,37 @@
 
       long transactionID = 0;
 
-      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      if (regularAdd)
       {
-         add(i);
-         if (i % 10 == 0 && i > 0)
+
+
+         for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
          {
-            journal.forceMoveNextFile();
+            add(i);
+            if (i % 10 == 0 && i > 0)
+            {
+               journal.forceMoveNextFile();
+            }
+            update(i);
          }
-         update(i);
-      }
 
-      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-      {
+         for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+         {
 
-         addTx(transactionID, i);
-         updateTx(transactionID, i);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
+            addTx(transactionID, i);
+            updateTx(transactionID, i);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+            commit(transactionID++);
+            update(i);
          }
-         commit(transactionID++);
-         update(i);
       }
-      
+
       if (pendingTransactions)
       {
-         for (long i = 0; i < 100 ; i++)
+         for (long i = 0; i < 100; i++)
          {
             addTx(transactionID, idGenerator.generateID());
             updateTx(transactionID, idGenerator.generateID());
@@ -156,16 +164,19 @@
 
       System.out.println("Number of Files: " + journal.getDataFilesCount());
 
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      if (regularAdd)
       {
-         if (!(i % 10 == 0))
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
          {
-            delete(i);
+            if (!(i % 10 == 0))
+            {
+               delete(i);
+            }
+            else
+            {
+               liveIDs.add((long)i);
+            }
          }
-         else
-         {
-            liveIDs.add((long)i);
-         }
       }
 
       journal.forceMoveNextFile();
@@ -211,7 +222,7 @@
             update(liveID);
          }
       }
-      
+
       if (performDelete)
       {
          for (long liveID : liveIDs)
@@ -219,7 +230,7 @@
             delete(liveID);
          }
       }
-      
+
       if (pendingTransactions)
       {
          for (long tx : listPendingTransactions)
@@ -256,7 +267,7 @@
 
       add(idGenerator.generateID());
 
-      journal.compact();
+      //journal.compact();
 
       stopJournal();
       createJournal();




More information about the jboss-cvs-commits mailing list