[hornetq-commits] JBoss hornetq SVN: r9591 - in branches/Branch_2_1: src/main/org/hornetq/core/journal and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 24 10:21:25 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-24 10:21:23 -0400 (Tue, 24 Aug 2010)
New Revision: 9591

Added:
   branches/Branch_2_1/merge-activity.txt
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
merge journal fixes from trunk

Added: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt	                        (rev 0)
+++ branches/Branch_2_1/merge-activity.txt	2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,9 @@
+Detailed list of merges that happened on this branch.
+
+
+- Date        - author         - Description
+
+- 24-aug-2010 - clebert        - Branch created from https://svn.jboss.org/repos/hornetq/tags/HornetQ_2_1_2_Final/
+
+- 24-aug-2010 - clebert        - merge from trunk -r9588:9590
+  This merge also included a manual copy of JournalImpl.java, since a minor earlier change needed to be applied.

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -23,7 +23,7 @@
  */
 public class RecordInfo
 {
-   public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate)
+   public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount)
    {
       this.id = id;
 
@@ -32,8 +32,15 @@
       this.data = data;
 
       this.isUpdate = isUpdate;
+
+      this.compactCount = compactCount;
    }
 
+   /** How many times this record has been compacted (up to 7 times).
+       After the record has been compacted 7 times, the counter stays at 7,
+       as we only store up to 0x7 (binary 111) as part of the recordID. */
+   public final short compactCount;
+   
    public final long id;
 
    public final byte userRecordType;
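
For illustration only (not part of this commit): a minimal Java sketch of the increment-and-clamp behaviour described in the compactCount javadoc above. The class and method names are hypothetical, and the ceiling of 0x7 is taken from that comment; JournalCompactor bumps the counter with setCompactCount((short)(info.compactCount + 1)).

   // Hypothetical helper, not part of the patch: the counter is bumped once per
   // compact run and clamped at the documented ceiling of 7 (binary 111).
   final class CompactCountSketch
   {
      static short nextCompactCount(final short currentCompactCount)
      {
         return (short)Math.min(currentCompactCount + 1, 0x7);
      }
   }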

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -57,8 +57,6 @@
 
    void compact() throws Exception;
    
-   void cleanUp(final JournalFile file) throws Exception;
-   
    JournalFile getCurrentFile();
    
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -218,7 +218,7 @@
 
       sequentialFile.open(1, false);
 
-      currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+      currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
       
       JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
    }

Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java	                        (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.Base64;
+
+/**
+ * This is an undocumented class that will open a journal and force compacting on it.
+ * It may be used in special cases, but it shouldn't be needed under regular circumstances,
+ * as the system should detect the need for compacting on its own.
+ * 
+ * The regular approach is to configure the min-compact parameters instead.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactJournal
+{
+
+   public static void main(String arg[])
+   {
+      if (arg.length != 4)
+      {
+         System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+         return;
+      }
+
+      try
+      {
+         compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+   }
+
+   public static void compactJournal(String directory,
+                                    String journalPrefix,
+                                    String journalSuffix,
+                                    int minFiles,
+                                    int fileSize) throws Exception
+   {
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      journal.start();
+      
+      journal.loadInternalOnly();
+      
+      journal.compact();
+      
+      journal.stop();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
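
For reference, a hedged usage sketch (not part of this commit). The directory, prefix, file extension and file size below are illustrative assumptions roughly matching common HornetQ defaults, not values taken from this patch; the call matches the compactJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize) signature above.

   // Illustrative values only -- adjust them to the journal configuration in use.
   CompactJournal.compactJournal("/var/data/hornetq/journal", "hornetq-data", "hq", 2, 10 * 1024 * 1024);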

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -183,6 +183,8 @@
              recordInfo.data.length +
              ",isUpdate@" +
              recordInfo.isUpdate +
+             ",compactCount@" +
+             recordInfo.compactCount +
              ",data@" +
              encode(recordInfo.data);
    }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -279,11 +279,11 @@
 
    protected static RecordInfo parseRecord(Properties properties) throws Exception
    {
-      int id = parseInt("id", properties);
+      long id = parseLong("id", properties);
       byte userRecordType = parseByte("userRecordType", properties);
       boolean isUpdate = parseBoolean("isUpdate", properties);
       byte[] data = parseEncoding("data", properties);
-      return new RecordInfo(id, userRecordType, data, isUpdate);
+      return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
    }
 
    private static byte[] parseEncoding(String name, Properties properties) throws Exception

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
-
-/**
- * A JournalCleaner
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class JournalCleaner extends AbstractJournalUpdateTask
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-   /**
-    * @param fileFactory
-    * @param journal
-    * @param nextOrderingID
-    */
-   protected JournalCleaner(final SequentialFileFactory fileFactory,
-                            final JournalImpl journal,
-                            final Set<Long> recordsSnapshot,
-                            final long nextOrderingID) throws Exception
-   {
-      super(fileFactory, journal, recordsSnapshot, nextOrderingID);
-      openFile();
-   }
-
-   // Public --------------------------------------------------------
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
-    */
-   public void markAsDataFile(final JournalFile file)
-   {
-      // nothing to be done here
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
-    */
-   public void onReadAddRecord(final RecordInfo info) throws Exception
-   {
-      if (lookupRecord(info.id))
-      {
-         writeEncoder(new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)));
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
-    */
-   public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-   {
-      if (lookupRecord(recordInfo.id))
-      {
-         incrementTransactionCounter(transactionID);
-
-         writeEncoder(new JournalAddRecordTX(true,
-                                             transactionID,
-                                             recordInfo.id,
-                                             recordInfo.getUserRecordType(),
-                                             new ByteArrayEncoding(recordInfo.data)));
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
-    */
-   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
-   {
-      int txcounter = getTransactionCounter(transactionID);
-
-      writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
-    */
-   public void onReadDeleteRecord(final long recordID) throws Exception
-   {
-      writeEncoder(new JournalDeleteRecord(recordID));
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
-    */
-   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-   {
-      incrementTransactionCounter(transactionID);
-
-      writeEncoder(new JournalDeleteRecordTX(transactionID, recordInfo.id, new ByteArrayEncoding(recordInfo.data)));
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
-    */
-   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
-   {
-      int txcounter = getTransactionCounter(transactionID);
-
-      writeEncoder(new JournalCompleteRecordTX(false, transactionID, new ByteArrayEncoding(extraData)), txcounter);
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
-    */
-   public void onReadRollbackRecord(final long transactionID) throws Exception
-   {
-      writeEncoder(new JournalRollbackRecordTX(transactionID));
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
-    */
-   public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
-   {
-      if (lookupRecord(recordInfo.id))
-      {
-         writeEncoder(new JournalAddRecord(false,
-                                           recordInfo.id,
-                                           recordInfo.userRecordType,
-                                           new ByteArrayEncoding(recordInfo.data)));
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
-    */
-   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-   {
-      if (lookupRecord(recordInfo.id))
-      {
-         incrementTransactionCounter(transactionID);
-
-         writeEncoder(new JournalAddRecordTX(false,
-                                             transactionID,
-                                             recordInfo.id,
-                                             recordInfo.userRecordType,
-                                             new ByteArrayEncoding(recordInfo.data)));
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected int incrementTransactionCounter(final long transactionID)
-   {
-      AtomicInteger counter = transactionCounter.get(transactionID);
-      if (counter == null)
-      {
-         counter = new AtomicInteger(0);
-         transactionCounter.put(transactionID, counter);
-      }
-
-      return counter.incrementAndGet();
-   }
-
-   protected int getTransactionCounter(final long transactionID)
-   {
-      AtomicInteger counter = transactionCounter.get(transactionID);
-      if (counter == null)
-      {
-         return 0;
-      }
-      else
-      {
-         return counter.intValue();
-      }
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -47,6 +47,11 @@
 {
 
    private static final Logger log = Logger.getLogger(JournalCompactor.class);
+   
+   // We try to separate old records from new ones when doing the compacting;
+   // this is a split line.
+   // We will force a moveNextFiles when the compactCount is below COMPACT_SPLIT_LINE
+   private final short COMPACT_SPLIT_LINE = 2;
 
    // Snapshot of transactions that were pending when the compactor started
    private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
@@ -69,7 +74,7 @@
 
       if (controlFile.exists())
       {
-         JournalFile file = new JournalFileImpl(controlFile, 0);
+         JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
 
          final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -209,19 +214,65 @@
 
    private void checkSize(final int size) throws Exception
    {
+      checkSize(size, -1);
+   }
+   
+   private void checkSize(final int size, final int compactCount) throws Exception
+   {
       if (getWritingChannel() == null)
       {
-         openFile();
+         if (!checkCompact(compactCount))
+         {
+            // will need to open a file either way
+            openFile();
+         }
       }
       else
       {
+         if (compactCount >= 0)
+         {
+            if (checkCompact(compactCount))
+            {
+               // The file was already moved in this case, so there is no need to check the size.
+               // Otherwise we would still need to check the size.
+               return;
+            }
+         }
+         
          if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
          {
             openFile();
          }
       }
    }
+   
+   int currentCount;
+   // This means we will need to split when the compactCount is below the watermark
+   boolean willNeedToSplit = false;
+   boolean splitted = false;
 
+   private boolean checkCompact(final int compactCount) throws Exception
+   {
+      if (compactCount >= COMPACT_SPLIT_LINE && !splitted)
+      {
+         willNeedToSplit = true;
+      }
+      
+      if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
+      {
+         willNeedToSplit = false;
+         splitted = false;
+         openFile();
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+   
+   
+
    /**
     * Replay pending counts that happened during compacting
     */
@@ -252,9 +303,10 @@
                                                                 info.id,
                                                                 info.getUserRecordType(),
                                                                 new ByteArrayEncoding(info.data));
+         addRecord.setCompactCount((short)(info.compactCount + 1));
+         
+         checkSize(addRecord.getEncodeSize(), info.compactCount);
 
-         checkSize(addRecord.getEncodeSize());
-
          writeEncoder(addRecord);
 
          newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
@@ -273,7 +325,9 @@
                                                                info.getUserRecordType(),
                                                                new ByteArrayEncoding(info.data));
 
-         checkSize(record.getEncodeSize());
+         record.setCompactCount((short)(info.compactCount + 1));
+         
+         checkSize(record.getEncodeSize(), info.compactCount);
 
          newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
 
@@ -395,7 +449,9 @@
                                                                    info.userRecordType,
                                                                    new ByteArrayEncoding(info.data));
 
-         checkSize(updateRecord.getEncodeSize());
+         updateRecord.setCompactCount((short)(info.compactCount + 1));
+         
+         checkSize(updateRecord.getEncodeSize(), info.compactCount);
 
          JournalRecord newRecord = newRecords.get(info.id);
 
@@ -425,7 +481,9 @@
                                                                        info.userRecordType,
                                                                        new ByteArrayEncoding(info.data));
 
-         checkSize(updateRecordTX.getEncodeSize());
+         updateRecordTX.setCompactCount((short)(info.compactCount + 1));
+         
+         checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
 
          writeEncoder(updateRecordTX);
 

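A standalone sketch (not part of this commit) of the split decision implemented by checkCompact() above: once a record at or above COMPACT_SPLIT_LINE has been seen, the first record falling below the line forces a move to the next file, so heavily compacted records and fresher ones land in separate files. The class and method names are hypothetical, and the splitted flag from the patch is omitted for brevity.

   final class SplitLineSketch
   {
      private static final short COMPACT_SPLIT_LINE = 2;

      private boolean willNeedToSplit = false;

      // Mirrors checkCompact(): returns true when the compactor should open a new file.
      boolean shouldMoveToNextFile(final int compactCount)
      {
         if (compactCount >= COMPACT_SPLIT_LINE)
         {
            willNeedToSplit = true;
         }
         if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
         {
            willNeedToSplit = false;
            return true; // JournalCompactor calls openFile() at this point
         }
         return false;
      }
   }
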
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,10 +53,6 @@
 
    boolean isCanReclaim();
 
-   void setNeedCleanup(boolean needCleanup);
-
-   boolean isNeedCleanup();
-
    long getOffset();
 
    /** This is a field to identify that records on this file actually belong to the current file.
@@ -64,6 +60,8 @@
    int getRecordID();
    
    long getFileID();
+   
+   int getJournalVersion();
 
    SequentialFile getFile();
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -36,7 +36,7 @@
    private final SequentialFile file;
 
    private final long fileID;
-   
+
    private final int recordID;
 
    private long offset;
@@ -47,19 +47,20 @@
 
    private boolean canReclaim;
 
-   private boolean needCleanup;
-
    private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
-   
 
+   private final int version;
+
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
 
-   public JournalFileImpl(final SequentialFile file, final long fileID)
+   public JournalFileImpl(final SequentialFile file, final long fileID, final int version)
    {
       this.file = file;
 
       this.fileID = fileID;
-      
+
+      this.version = version;
+
       this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
    }
 
@@ -81,16 +82,6 @@
       return canReclaim;
    }
 
-   public boolean isNeedCleanup()
-   {
-      return needCleanup;
-   }
-
-   public void setNeedCleanup(final boolean needCleanup)
-   {
-      this.needCleanup = needCleanup;
-   }
-
    public void setCanReclaim(final boolean canReclaim)
    {
       this.canReclaim = canReclaim;
@@ -119,6 +110,11 @@
       }
    }
 
+   public int getJournalVersion()
+   {
+      return version;
+   }
+
    public boolean resetNegCount(final JournalFile file)
    {
       return negCounts.remove(file) != null;
@@ -148,7 +144,7 @@
    {
       return fileID;
    }
-   
+
    public int getRecordID()
    {
       return recordID;
@@ -227,12 +223,10 @@
    {
       return liveBytes.get();
    }
-   
+
    public int getTotalNegativeToOthers()
    {
       return totalNegativeToOthers.get();
    }
-   
 
-   
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -67,6 +68,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.DataConstants;
 
+
 /**
  * 
  * <p>A circular log implementation.</p
@@ -88,7 +90,9 @@
 
    private static final int STATE_LOADED = 2;
 
-   private static final int FORMAT_VERSION = 1;
+   public static final int FORMAT_VERSION = 2;
+   
+   private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
 
    // Static --------------------------------------------------------
 
@@ -183,9 +187,9 @@
 
    public final String fileExtension;
 
-   private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
 
-   private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
 
    private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
 
@@ -398,9 +402,10 @@
 
          try
          {
-            long fileID = readFileHeader(file);
+            
+            JournalFileImpl jrnFile = readFileHeader(file);
 
-            orderedFiles.add(new JournalFileImpl(file, fileID));
+            orderedFiles.add(jrnFile);
          }
          finally
          {
@@ -499,6 +504,21 @@
                continue;
             }
 
+            short compactCount = 0;
+            
+            if (file.getJournalVersion() >= 2)
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
+               {
+                  reader.markAsDataFile(file);
+   
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+               
+               compactCount = wholeFileBuffer.get();
+            }
+            
             long transactionID = 0;
 
             if (JournalImpl.isTransaction(recordType))
@@ -602,7 +622,7 @@
                variableSize = 0;
             }
 
-            int recordSize = JournalImpl.getRecordSize(recordType);
+            int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
 
             // VI - this is completing V, We will validate the size at the end
             // of the record,
@@ -676,13 +696,13 @@
             {
                case ADD_RECORD:
                {
-                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
                   break;
                }
 
                case UPDATE_RECORD:
                {
-                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
                   break;
                }
 
@@ -694,19 +714,19 @@
 
                case ADD_RECORD_TX:
                {
-                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
                   break;
                }
 
                case UPDATE_RECORD_TX:
                {
-                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
                   break;
                }
 
                case DELETE_RECORD_TX:
                {
-                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
                   break;
                }
 
@@ -1647,6 +1667,7 @@
                else
                {
                   log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+                  System.exit(-1);
                }
             }
          }
@@ -1693,6 +1714,7 @@
     *   <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
     *   <tr><td>RecordType</td><td>Byte (1)</td></tr>
     *   <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+    *   <tr><td>Compactor Counter</td><td>1 byte</td></tr>
     *   <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
     *   <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
     *   <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
@@ -1708,6 +1730,7 @@
     *   <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
     *   <tr><td>RecordType</td><td>Byte (1)</td></tr>
     *   <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+    *   <tr><td>Compactor Counter</td><td>1 byte</td></tr>
     *   <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
     *   <tr><td>ExtraDataLength (Prepares only)</td><td>Integer (4 bytes)</td></tr>
     *   <tr><td>Number Of Files (N)</td><td>Integer (4 bytes)</td></tr>
@@ -1776,7 +1799,7 @@
 
                loadManager.addRecord(info);
 
-               records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD));
+               records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
             }
 
             public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -1795,7 +1818,7 @@
                   // have been deleted
                   // just leaving some updates in this file
 
-                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD);
+                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
                }
             }
 
@@ -1845,7 +1868,7 @@
                   transactions.put(transactionID, tnp);
                }
 
-               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX);
+               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
             }
 
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2135,7 +2158,6 @@
             if (file.isCanReclaim())
             {
                // File can be reclaimed or deleted
-
                if (JournalImpl.trace)
                {
                   JournalImpl.trace("Reclaiming file " + file);
@@ -2149,67 +2171,6 @@
                addFreeFile(file, false);
             }
          }
-
-         int nCleanup = 0;
-         for (JournalFile file : dataFiles)
-         {
-            if (file.isNeedCleanup())
-            {
-               nCleanup++;
-            }
-         }
-
-         if (compactMinFiles > 0)
-         {
-            if (nCleanup > 0 && needsCompact())
-            {
-               for (JournalFile file : dataFiles)
-               {
-                  if (file.isNeedCleanup())
-                  {
-                     final JournalFile cleanupFile = file;
-
-                     if (compactorRunning.compareAndSet(false, true))
-                     {
-                        // The cleanup should happen rarely.
-                        // but when it happens it needs to use a different thread,
-                        // or opening new files or any other executor's usage will be blocked while the cleanUp is being
-                        // processed.
-
-                        compactorExecutor.execute(new Runnable()
-                        {
-                           public void run()
-                           {
-                              try
-                              {
-                                 cleanUp(cleanupFile);
-                              }
-                              catch (Throwable e)
-                              {
-                                 JournalImpl.log.warn(e.getMessage(), e);
-                              }
-                              finally
-                              {
-                                 compactorRunning.set(false);
-                                 if (autoReclaim)
-                                 {
-                                    scheduleReclaim();
-                                 }
-                              }
-                           }
-                        });
-                     }
-                     return true;
-                  }
-                  else
-                  {
-                     // We only cleanup the first files
-                     // if a middle file needs cleanup it will be done through compacting
-                     break;
-                  }
-               }
-            }
-         }
       }
       finally
       {
@@ -2219,132 +2180,9 @@
       return false;
    }
 
-   // This method is public for tests
-   public synchronized void cleanUp(final JournalFile file) throws Exception
-   {
-      if (state != JournalImpl.STATE_LOADED)
-      {
-         return;
-      }
+   
+   int deleteme = 0;
 
-      try
-      {
-         JournalCleaner cleaner = null;
-         ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
-         compactingLock.writeLock().lock();
-
-         try
-         {
-
-            if (JournalImpl.trace)
-            {
-               JournalImpl.trace("Cleaning up file " + file);
-            }
-            JournalImpl.log.debug("Cleaning up file " + file);
-
-            if (file.getPosCount() == 0)
-            {
-               // nothing to be done
-               return;
-            }
-
-            // We don't want this file to be reclaimed during the cleanup
-            file.incPosCount();
-
-            // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
-            // could be reset
-            for (JournalFile jrnFile : dataFiles)
-            {
-               if (jrnFile.resetNegCount(file))
-               {
-                  dependencies.add(jrnFile);
-                  jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
-               }
-            }
-
-            currentFile.resetNegCount(file);
-            currentFile.incPosCount();
-            dependencies.add(currentFile);
-
-            cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
-         }
-         finally
-         {
-            compactingLock.writeLock().unlock();
-         }
-
-         compactingLock.readLock().lock();
-
-         try
-         {
-            JournalImpl.readJournalFile(fileFactory, file, cleaner);
-         }
-         catch (Throwable e)
-         {
-            log.warn("Error reading cleanup on " + file, e);
-            throw new Exception("Error reading cleanup on " + file, e);
-         }
-
-         cleaner.flush();
-
-         // pointcut for tests
-         // We need to test concurrent updates on the journal, as the compacting is being performed.
-         // Usually tests will use this to hold the compacting while other structures are being updated.
-         onCompactDone();
-
-         for (JournalFile jrnfile : dependencies)
-         {
-            jrnfile.decPosCount();
-         }
-         file.decPosCount();
-
-         SequentialFile tmpFile = cleaner.currentFile.getFile();
-         String tmpFileName = tmpFile.getFileName();
-         String cleanedFileName = file.getFile().getFileName();
-
-         SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
-                                                                                             cleanedFileName));
-
-         SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
-
-         returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
-
-         tmpFile.renameTo(cleanedFileName);
-
-         controlFile.delete();
-
-         final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
-         if (trace)
-         {
-            trace("Adding free file back from cleanup" + retJournalfile);
-         }
-
-         filesExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  addFreeFile(retJournalfile, true);
-               }
-               catch (Throwable e)
-               {
-                  log.warn("Error reinitializing file " + file, e);
-               }
-
-            }
-         });
-
-      }
-      finally
-      {
-         compactingLock.readLock().unlock();
-         JournalImpl.log.debug("Clean up on file " + file + " done");
-      }
-
-   }
-
    private boolean needsCompact() throws Exception
    {
       JournalFile[] dataFiles = getDataFiles();
@@ -2359,8 +2197,10 @@
       long totalBytes = (long)dataFiles.length * (long)fileSize;
 
       long compactMargin = (long)(totalBytes * compactPercentage);
+      
+      boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
 
-      return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+      return needCompact;
 
    }
 
@@ -2840,7 +2680,7 @@
 
       int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
 
-      JournalFile jf = new JournalFileImpl(sf, newFileID);
+      JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
 
       sf.position(position);
 
@@ -2886,7 +2726,7 @@
       return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
    }
 
-   private static int getRecordSize(final byte recordType)
+   private static int getRecordSize(final byte recordType, final int journalVersion)
    {
       // The record size (without the variable portion)
       int recordSize = 0;
@@ -2925,7 +2765,14 @@
             throw new IllegalStateException("Record other than expected");
 
       }
-      return recordSize;
+      if (journalVersion >= 2)
+      {
+         return recordSize + 1;
+      }
+      else
+      {
+         return recordSize;
+      }
    }
 
    /**
@@ -2933,17 +2780,30 @@
     * @return
     * @throws Exception
     */
-   private long readFileHeader(SequentialFile file) throws Exception
+   private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
    {
       ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
 
       file.read(bb);
 
       int journalVersion = bb.getInt();
-
+      
       if (journalVersion != FORMAT_VERSION)
       {
-         throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
+         boolean isCompatible = false;
+         
+         for (int v : COMPATIBLE_VERSIONS)
+         {
+            if (v == journalVersion)
+            {
+               isCompatible = true;
+            }
+         }
+         
+         if (!isCompatible)
+         {
+            throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained in the user's manual");
+         }
       }
 
       int readUserVersion = bb.getInt();
@@ -2958,7 +2818,8 @@
       fileFactory.releaseBuffer(bb);
 
       bb = null;
-      return fileID;
+      
+      return new JournalFileImpl(file, fileID, journalVersion);
    }
 
    /**
@@ -3173,7 +3034,7 @@
          sequentialFile.position(position);
       }
 
-      return new JournalFileImpl(sequentialFile, fileID);
+      return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
    }
 
    /**

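For illustration only (not taken from this commit): how a loader decides whether the extra compact-count byte is present, based on the per-file journal version read from the header; this mirrors the "journalVersion >= 2" branch added to readJournalFile above. The class and helper names are hypothetical.

   final class CompactCountReader
   {
      // Version 1 files carry no compact-count byte; version 2 records store one
      // byte right after the file ID (see the encode methods further down).
      static short readCompactCount(final java.nio.ByteBuffer buffer, final int journalVersion)
      {
         return journalVersion >= 2 ? (short)buffer.get() : (short)0;
      }
   }
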
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,8 +53,6 @@
 
          JournalFile currentFile = files[i];
 
-         currentFile.setNeedCleanup(false);
-
          int posCount = currentFile.getPosCount();
 
          int totNeg = 0;
@@ -101,18 +99,7 @@
                      {
                         Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
                      }
-                     file.setNeedCleanup(true);
-
-                     if (file.getTotalNegativeToOthers() == 0)
-                     {
-                        file.setNeedCleanup(true);
-                     }
-                     else
-                     {
-                        // This file can't be cleared as the file has negatives to other files as well
-                        file.setNeedCleanup(false);
-                     }
-
+ 
                      currentFile.setCanReclaim(false);
 
                      break;

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -66,6 +66,8 @@
       }
 
       buffer.writeInt(fileID);
+      
+      buffer.writeByte(compactCount);
 
       buffer.writeLong(id);
 
@@ -81,6 +83,6 @@
    @Override
    public int getEncodeSize()
    {
-      return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+      return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
    }
 }
\ No newline at end of file

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -75,6 +75,8 @@
       }
 
       buffer.writeInt(fileID);
+      
+      buffer.writeByte(compactCount);
 
       buffer.writeLong(txID);
 
@@ -92,6 +94,6 @@
    @Override
    public int getEncodeSize()
    {
-      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
    }
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -70,6 +70,8 @@
       }
 
       buffer.writeInt(fileID);
+      
+      buffer.writeByte(compactCount);
 
       buffer.writeLong(txID);
 
@@ -105,11 +107,11 @@
    {
       if (isCommit)
       {
-         return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+         return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1;
       }
       else
       {
-         return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+         return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0) + 1;
       }
    }
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -44,6 +44,8 @@
       buffer.writeByte(JournalImpl.DELETE_RECORD);
 
       buffer.writeInt(fileID);
+      
+      buffer.writeByte(compactCount);
 
       buffer.writeLong(id);
 
@@ -53,6 +55,6 @@
    @Override
    public int getEncodeSize()
    {
-      return JournalImpl.SIZE_DELETE_RECORD;
+      return JournalImpl.SIZE_DELETE_RECORD + 1;
    }
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -55,6 +55,8 @@
       buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
 
       buffer.writeInt(fileID);
+      
+      buffer.writeByte(compactCount);
 
       buffer.writeLong(txID);
 
@@ -73,6 +75,6 @@
    @Override
    public int getEncodeSize()
    {
-      return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+      return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0) + 1;
    }
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -27,6 +27,8 @@
 {
 
    protected int fileID;
+   
+   protected byte compactCount;
 
    public int getFileID()
    {
@@ -50,6 +52,23 @@
    {
       return 0;
    }
+   
+   public short getCompactCount()
+   {
+      return compactCount;
+   }
+   
+   public void setCompactCount(final short compactCount)
+   {
+      if (compactCount > Byte.MAX_VALUE)
+      {
+         this.compactCount = Byte.MAX_VALUE;
+      }
+      else
+      {
+         this.compactCount = (byte)compactCount;
+      }
+   }
 
    public abstract int getEncodeSize();
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,14 +39,15 @@
    {
       buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
       buffer.writeInt(fileID);
+      buffer.writeByte(compactCount);
       buffer.writeLong(txID);
-      buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+      buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1);
 
    }
 
    @Override
    public int getEncodeSize()
    {
-      return JournalImpl.SIZE_ROLLBACK_RECORD;
+      return JournalImpl.SIZE_ROLLBACK_RECORD + 1;
    }
 }

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,7 +39,6 @@
 import org.hornetq.utils.IDGenerator;
 import org.hornetq.utils.SimpleIDGenerator;
 import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.ReusableLatch;
 
 /**
  * 
@@ -66,7 +65,7 @@
       for (int i = 0; i < 5; i++)
       {
          SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
-         dataFiles.add(new JournalFileImpl(file, 0));
+         dataFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
       }
 
       ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
@@ -74,7 +73,7 @@
       for (int i = 0; i < 3; i++)
       {
          SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
-         newFiles.add(new JournalFileImpl(file, 0));
+         newFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
       }
 
       ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
@@ -806,33 +805,8 @@
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
-
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
-      journal.setAutoReclaim(false);
-
+      createJournal();
+      
       startJournal();
       load();
 
@@ -844,26 +818,8 @@
 
       long addedRecord = idGen.generateID();
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
+      startCompact();
 
-      tCompact.start();
-
-      reusableLatchDone.await();
-
       addTx(consumerTX, firstID);
 
       addTx(appendTX, addedRecord);
@@ -876,10 +832,8 @@
 
       delete(addedRecord);
 
-      reusableLatchWait.countDown();
+      finishCompact();
 
-      tCompact.join();
-
       journal.forceMoveNextFile();
 
       long newRecord = idGen.generateID();
@@ -959,52 +913,11 @@
 
       setup(2, 60 * 1024, false);
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
+      createJournal();
 
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
-      journal.setAutoReclaim(false);
-
       startJournal();
       load();
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.cleanUp(journal.getDataFiles()[0]);
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
       for (int i = 0; i < 100; i++)
       {
          add(i);
@@ -1017,20 +930,16 @@
          delete(i);
       }
 
-      tCompact.start();
-
-      reusableLatchDone.await();
-
+      startCompact();
+ 
       // Delete part of the live records while cleanup still working
       for (int i = 1; i < 5; i++)
       {
          delete(i);
       }
 
-      reusableLatchWait.countDown();
-
-      tCompact.join();
-
+      finishCompact();
+ 
       // Delete part of the live records after cleanup is done
       for (int i = 5; i < 10; i++)
       {
@@ -1054,53 +963,12 @@
       setup(2, 60 * 1024, false);
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+      
+      createJournal();
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
-
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
-      journal.setAutoReclaim(false);
-
       startJournal();
       load();
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
       long appendTX = idGen.generateID();
       long appendOne = idGen.generateID();
       long appendTwo = idGen.generateID();
@@ -1109,9 +977,8 @@
 
       addTx(appendTX, appendOne);
 
-      tCompact.start();
-      reusableLatchDone.await();
-
+      startCompact();
+      
       addTx(appendTX, appendTwo);
 
       commit(appendTX);
@@ -1122,8 +989,7 @@
       commit(updateTX);
       // delete(appendTwo);
 
-      reusableLatchWait.countDown();
-      tCompact.join();
+      finishCompact();
 
       journal.compact();
 
@@ -1239,13 +1105,13 @@
          long id = idGenerator.generateID();
          listToDelete.add(id);
 
-         expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+         expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
 
          add(id);
          journal.forceMoveNextFile();
          update(id);
 
-         expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+         expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
          journal.forceMoveNextFile();
       }
 
@@ -1295,7 +1161,55 @@
       }
 
    }
+   
+   public void testCompactFirstFileWithPendingCommits() throws Exception
+   {
+      setup(2, 60 * 1024, true);
 
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long tx = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         addTx(tx, idGenerator.generateID());
+      }
+      
+      journal.forceMoveNextFile();
+      commit(tx);
+      
+
+      ArrayList<Long> listToDelete = new ArrayList<Long>();
+      for (int i = 0; i < 10; i++)
+      {
+         long id = idGenerator.generateID();
+         listToDelete.add(id);
+         add(id);
+      }
+      
+      journal.forceMoveNextFile();
+
+      for (Long id : listToDelete)
+      {
+         delete(id);
+      }
+      
+      journal.forceMoveNextFile();
+      
+      // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+      journal.compact();
+
+      journal.checkReclaimStatus();
+      
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
    public void testLiveSizeTransactional() throws Exception
    {
       setup(2, 60 * 1024, true);

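The hunks above drop the per-test ReusableLatch plumbing and the anonymous JournalImpl subclass in favour of startCompact()/finishCompact() helpers. The helpers themselves are not part of this excerpt; a minimal sketch of how they could be built, assuming createJournal() installs a journal whose onCompactDone() performs the same latch handshake as the removed code (field and method names here are illustrative, taken from the removed lines rather than from JournalImplTestBase):

   private final ReusableLatch compactDoneLatch = new ReusableLatch();
   private final ReusableLatch compactWaitLatch = new ReusableLatch();
   private Thread compactThread;

   protected void startCompact() throws Exception
   {
      // The journal built by createJournal() is assumed to count
      // compactDoneLatch down and then await compactWaitLatch inside
      // onCompactDone(), mirroring the anonymous subclass removed above.
      compactDoneLatch.countUp();
      compactWaitLatch.countUp();

      compactThread = new Thread()
      {
         @Override
         public void run()
         {
            try
            {
               journal.compact();
            }
            catch (Exception e)
            {
               e.printStackTrace();
            }
         }
      };

      compactThread.start();

      // Block the test until compaction has reached onCompactDone()
      compactDoneLatch.await();
   }

   protected void finishCompact() throws Exception
   {
      // Release onCompactDone() and wait for the compact thread to exit
      compactWaitLatch.countDown();
      compactThread.join();
   }
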
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java	                        (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A OldFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OldFormatTest extends JournalImplTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // This will generate records using the Version 1 format, and reading at the current version
+   public void testFormatOne() throws Exception
+   {
+      setup(2, 100 * 1024, true);
+
+      SequentialFile file = fileFactory.createSequentialFile("hq-1.hq", 1);
+
+      ByteBuffer buffer = ByteBuffer.allocateDirect(100 * 1024);
+
+      initHeader(buffer, 1);
+
+      byte[] record = new byte[1];
+
+      for (long i = 0 ; i < 10; i++)
+      {
+         add(buffer, 1, i, record);
+   
+         update(buffer, 1, i, record);
+      }
+
+      file.open(1, false);
+
+      buffer.rewind();
+
+      file.writeDirect(buffer, true);
+
+      file.close();
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+      
+      startCompact();
+      finishCompact();
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   private void add(ByteBuffer buffer, int fileID, long id, byte[] record)
+   {
+      int pos = buffer.position();
+
+      buffer.put(JournalImpl.ADD_RECORD);
+
+      buffer.putInt(fileID);
+
+      buffer.putLong(id);
+
+      buffer.putInt(record.length);
+
+      buffer.put((byte)0);
+
+      buffer.put(record);
+
+      buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+      records.add(new RecordInfo(id, (byte)0, record, false, (short)0));
+   }
+
+   private void update(ByteBuffer buffer, int fileID, long id, byte[] record)
+   {
+      int pos = buffer.position();
+
+      buffer.put(JournalImpl.UPDATE_RECORD);
+
+      buffer.putInt(fileID);
+
+      buffer.putLong(id);
+
+      buffer.putInt(record.length);
+
+      buffer.put((byte)0);
+
+      buffer.put(record);
+
+      buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+      records.add(new RecordInfo(id, (byte)0, record, true, (short)0));
+
+   }
+
+   /**
+    * @param buffer
+    */
+   private void initHeader(ByteBuffer buffer, int fileID)
+   {
+      buffer.putInt(1);
+
+      buffer.putInt(0);
+
+      buffer.putLong(fileID);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return new NIOSequentialFileFactory(getTestDir());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      File file = new File(getTestDir());
+
+      deleteDirectory(file);
+
+      file.mkdir();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

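The add() and update() helpers above write raw version-1 records byte by byte. For orientation, the size of one such record follows directly from the fields they write; a sketch using the DataConstants sizes already imported by the test (the 1-byte body matches what testFormatOne() writes):

   byte[] body = new byte[1];

   int v1RecordSize = DataConstants.SIZE_BYTE   // record type (ADD_RECORD / UPDATE_RECORD)
                    + DataConstants.SIZE_INT    // file ID
                    + DataConstants.SIZE_LONG   // record ID
                    + DataConstants.SIZE_INT    // body length
                    + DataConstants.SIZE_BYTE   // user record type
                    + body.length               // body
                    + DataConstants.SIZE_INT;   // trailing check size

The current format carries one extra byte per record, which is what the "+ 1" size adjustments and the extra (short)0 argument passed to RecordInfo throughout the rest of this diff account for; loading these hand-written version-1 files is what exercises the backward-compatibility path.
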
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -15,15 +15,19 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -41,7 +45,6 @@
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
 import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
 
 /**
  * A SoakJournal
@@ -56,6 +59,8 @@
    public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
 
    private static final int MAX_WRITES = 20000;
+   
+   private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
    // We want to maximize the difference between appends and deles, or we could get out of memory
    public Semaphore maxRecords;
@@ -82,6 +87,12 @@
 
    Executor testExecutor;
 
+   protected long getTotalTimeMilliseconds()
+   {
+      return TimeUnit.MINUTES.toMillis(2);
+   }
+
+
    @Override
    public void setUp() throws Exception
    {
@@ -114,7 +125,7 @@
 
       journal = new JournalImpl(50 * 1024,
                                 20,
-                                15,
+                                50, 
                                 ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
                                 factory,
                                 "hornetq-data",
@@ -181,11 +192,6 @@
       threadPool.shutdown();
    }
 
-   protected long getTotalTimeMilliseconds()
-   {
-      return TimeUnit.MINUTES.toMillis(10);
-   }
-
    public void testAppend() throws Exception
    {
 
@@ -225,6 +231,12 @@
                             ", liveRecords = " +
                             (numberOfRecords.get() - numberOfDeletes.get()));
          Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+         rwLock.writeLock().lock();
+         System.out.println("Restarting server");
+         journal.stop();
+         journal.start();
+         reloadJournal();
+         rwLock.writeLock().unlock();
       }
 
       running = false;
@@ -255,12 +267,36 @@
 
       latchExecutorDone.await();
 
-      assertEquals(0, errors.get());
-
       journal.stop();
 
       journal.start();
 
+      reloadJournal();
+      
+      Collection<Long> records = journal.getRecords().keySet();
+      
+      System.out.println("Deleting everything!");
+      for (Long delInfo : records)
+      {
+         journal.appendDeleteRecord(delInfo, false);
+      }
+      
+      journal.forceMoveNextFile();
+      
+      Thread.sleep(5000);
+      
+      assertEquals(0, journal.getDataFilesCount());
+
+      journal.stop();
+   }
+
+   /**
+    * @throws Exception
+    */
+   private void reloadJournal() throws Exception
+   {
+      assertEquals(0, errors.get());
+      
       ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
       ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
       journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
@@ -285,8 +321,6 @@
       }
 
       assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
-
-      journal.stop();
    }
 
    private byte[] generateRecord()
@@ -313,9 +347,10 @@
       @Override
       public void run()
       {
+         rwLock.readLock().lock();
+         
          try
          {
-
             while (running)
             {
                final int txSize = RandomUtil.randomMax(100);
@@ -358,6 +393,14 @@
                      }
                   }
                });
+               
+               rwLock.readLock().unlock();
+               
+               Thread.yield();
+             
+               rwLock.readLock().lock();
+               
+
             }
          }
          catch (Exception e)
@@ -366,6 +409,10 @@
             running = false;
             errors.incrementAndGet();
          }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
       }
    }
 
@@ -384,6 +431,9 @@
       @Override
       public void run()
       {
+
+         rwLock.readLock().lock();
+
          try
          {
             int txSize = RandomUtil.randomMax(100);
@@ -391,17 +441,30 @@
             long ids[] = new long[txSize];
 
             long txID = JournalCleanupCompactStressTest.idGen.generateID();
-
+            
             while (running)
             {
 
-               long id = queue.poll(60, TimeUnit.MINUTES);
-               ids[txCount] = id;
-               journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
-               if (++txCount == txSize)
+               Long id = queue.poll(10, TimeUnit.SECONDS);
+               if (id != null)
                {
-                  journal.appendCommitRecord(txID, true, ctx);
-                  ctx.executeOnCompletion(new DeleteTask(ids));
+                  ids[txCount++] = id;
+                  journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+               }
+               if (txCount == txSize || id == null)
+               {
+                  if (txCount > 0)
+                  {
+                     journal.appendCommitRecord(txID, true, ctx);
+                     ctx.executeOnCompletion(new DeleteTask(ids));
+                  }
+                  
+                  rwLock.readLock().unlock();
+                  
+                  Thread.yield();
+                  
+                  rwLock.readLock().lock();
+                  
                   txCount = 0;
                   txSize = RandomUtil.randomMax(100);
                   txID = JournalCleanupCompactStressTest.idGen.generateID();
@@ -420,6 +483,10 @@
             running = false;
             errors.incrementAndGet();
          }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
       }
    }
 
@@ -434,14 +501,18 @@
 
       public void done()
       {
+         rwLock.readLock().lock();
          numberOfUpdates.addAndGet(ids.length);
          try
          {
             for (long id : ids)
             {
-               journal.appendDeleteRecord(id, false);
-               maxRecords.release();
-               numberOfDeletes.incrementAndGet();
+               if (id != 0)
+               {
+                  journal.appendDeleteRecord(id, false);
+                  maxRecords.release();
+                  numberOfDeletes.incrementAndGet();
+               }
             }
          }
          catch (Exception e)
@@ -451,6 +522,10 @@
             running = false;
             errors.incrementAndGet();
          }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
       }
 
       public void onError(final int errorCode, final String errorMessage)
@@ -473,6 +548,7 @@
       @Override
       public void run()
       {
+         rwLock.readLock().lock();
          try
          {
             while (running)
@@ -481,18 +557,22 @@
                // Append
                for (int i = 0; running & i < ids.length; i++)
                {
-                  // System.out.println("append slow");
+                  System.out.println("append slow");
                   ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
                   maxRecords.acquire();
                   journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
                   numberOfRecords.incrementAndGet();
 
+                  rwLock.readLock().unlock();
+                  
                   Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+                  
+                  rwLock.readLock().lock();
                }
                // Delete
                for (int i = 0; running & i < ids.length; i++)
                {
-                  // System.out.println("Deleting");
+                  System.out.println("Deleting");
                   maxRecords.release();
                   journal.appendDeleteRecord(ids[i], false);
                   numberOfDeletes.incrementAndGet();
@@ -504,6 +584,10 @@
             e.printStackTrace();
             System.exit(-1);
          }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
       }
    }
 

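The stress test now serialises journal restarts against the worker threads with a ReentrantReadWriteLock: each worker holds the read lock while it touches the journal and briefly releases it between batches, and the main loop takes the write lock before bouncing and reloading the journal mid-run. The pattern in isolation (a sketch using the names from the test; the test itself releases the write lock inline rather than in a finally block):

   final ReadWriteLock rwLock = new ReentrantReadWriteLock();

   // Worker side: operate under the read lock, yielding it periodically so
   // the restarter has a window in which to acquire the write lock.
   rwLock.readLock().lock();
   try
   {
      while (running)
      {
         // ... append / update / delete against the journal ...

         rwLock.readLock().unlock();
         Thread.yield();
         rwLock.readLock().lock();
      }
   }
   finally
   {
      rwLock.readLock().unlock();
   }

   // Restarter side: the write lock is only granted once no worker holds the
   // read lock, so the journal can be stopped, started and reloaded safely.
   rwLock.writeLock().lock();
   try
   {
      journal.stop();
      journal.start();
      reloadJournal();
   }
   finally
   {
      rwLock.writeLock().unlock();
   }
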
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -542,7 +542,7 @@
       journalImpl.appendAddRecordTransactional(1l,
                                                2l,
                                                (byte)3,
-                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX - 1, (byte)4));
 
       journalImpl.appendCommitRecord(1l, false);
 
@@ -587,11 +587,11 @@
 
       // jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
       // RecordType, RecordBody (that we know it is 1 )
-      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
 
       int posCheckSize = buffer.position();
 
-      Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+      Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
 
       buffer.position(posCheckSize);
 
@@ -652,11 +652,11 @@
 
       // jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
       // RecordType, RecordBody (that we know it is 1 )
-      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
 
       int posCheckSize = buffer.position();
 
-      Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+      Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
 
       buffer.position(posCheckSize);
 

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -333,7 +333,7 @@
 
          journal.appendAddRecord(element, (byte)0, record, sync);
 
-         records.add(new RecordInfo(element, (byte)0, record, false));
+         records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
       }
 
       journal.debugWait();
@@ -349,7 +349,7 @@
 
          journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
 
-         records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+         records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
       }
 
       journal.debugWait();
@@ -377,13 +377,13 @@
       {
          // SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
          // SIZE_BYTE
-         byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+         byte[] record = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
 
          beforeJournalOperation();
 
          journal.appendAddRecordTransactional(txID, element, (byte)0, record);
 
-         tx.records.add(new RecordInfo(element, (byte)0, record, false));
+         tx.records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
 
       }
 
@@ -396,13 +396,13 @@
 
       for (long element : arguments)
       {
-         byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+         byte[] updateRecord = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
 
          beforeJournalOperation();
 
          journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
 
-         tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+         tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
       }
       journal.debugWait();
    }
@@ -417,7 +417,7 @@
 
          journal.appendDeleteRecordTransactional(txID, element);
 
-         tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
+         tx.deletes.add(new RecordInfo(element, (byte)0, null, true, (short)0));
       }
 
       journal.debugWait();

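Two mechanical changes run through this file and the remaining tests: RecordInfo now takes a fifth constructor argument (the tests simply pass (short)0), and every record-size calculation grows by one byte. A usage sketch consistent with the hunks above (the meaning of the new argument is not shown in this excerpt, so 0 just mirrors what the tests pass):

   // Body sized so a transactional add record fills exactly recordLength
   // bytes under the current format, which carries one extra byte per record
   byte[] record = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));

   journal.appendAddRecordTransactional(txID, element, (byte)0, record);

   tx.records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
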
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -492,7 +492,8 @@
    private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize)
    {
       recordSize = calculateRecordSize(recordSize, alignment);
-      return fileSize / recordSize;
+      int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
+      return (fileSize - headerSize) / recordSize;
    }
 
    /** 
@@ -666,7 +667,9 @@
 
       int addRecordsPerFile = calculateRecordsPerFile(10 * 1024,
                                                       journal.getAlignment(),
-                                                      JournalImpl.SIZE_ADD_RECORD + recordLength);
+                                                      JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
+      
+      System.out.println(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
 
       // Fills exactly 10 files
       int initialNumberOfAddRecords = addRecordsPerFile * 10;
@@ -693,29 +696,11 @@
 
       // Now delete half of them
 
-      int deleteRecordsPerFile = calculateRecordsPerFile(10 * 1024,
-                                                         journal.getAlignment(),
-                                                         JournalImpl.SIZE_DELETE_RECORD);
-
       for (int i = 0; i < initialNumberOfAddRecords / 2; i++)
       {
          delete(i);
       }
 
-      int numberOfFiles = calculateNumberOfFiles(10 * 1024,
-                                                 journal.getAlignment(),
-                                                 initialNumberOfAddRecords,
-                                                 JournalImpl.SIZE_ADD_RECORD + recordLength,
-                                                 initialNumberOfAddRecords / 2,
-                                                 JournalImpl.SIZE_DELETE_RECORD);
-
-      if (initialNumberOfAddRecords / 2 % deleteRecordsPerFile == 0)
-      {
-         // The file is already full, next add would fix it
-         numberOfFiles--;
-      }
-
-      Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(initialNumberOfAddRecords / 2, journal.getIDMapSize());
 
@@ -726,16 +711,6 @@
          add(initialNumberOfAddRecords + i);
       }
 
-      numberOfFiles = calculateNumberOfFiles(10 * 1024,
-                                             journal.getAlignment(),
-                                             initialNumberOfAddRecords,
-                                             JournalImpl.SIZE_ADD_RECORD + recordLength,
-                                             initialNumberOfAddRecords / 2,
-                                             JournalImpl.SIZE_DELETE_RECORD,
-                                             10,
-                                             JournalImpl.SIZE_ADD_RECORD + recordLength);
-
-      Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(initialNumberOfAddRecords / 2 + 10, journal.getIDMapSize());
 
@@ -786,7 +761,7 @@
    public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
    {
       // Make sure there is one record per file
-      setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+      setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
                                                                             getAlignment()), true);
       createJournal();
       startJournal();
@@ -825,7 +800,7 @@
    public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
    {
       // Make sure there is one record per file
-      setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+      setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
                                                                             getAlignment()), true);
 
       createJournal();
@@ -1130,7 +1105,7 @@
 
       // Make sure we move on to the next file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 2
 
       journal.debugWait();
 
@@ -1161,7 +1136,7 @@
 
       // Make sure we move on to the next file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4
 
       List<String> files5 = fileFactory.listFiles(fileExtension);
 
@@ -1491,7 +1466,7 @@
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2);
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2);
 
       // Move on to another file
 
@@ -1544,7 +1519,7 @@
 
       // Make sure we move on to the next file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
 
       List<String> files2 = fileFactory.listFiles(fileExtension);
 
@@ -1565,7 +1540,7 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_COMMIT_RECORD) + 2, files3.size());
+                                                 JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1586,9 +1561,9 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_COMMIT_RECORD,
+                                                 JournalImpl.SIZE_COMMIT_RECORD + 1,
                                                  1,
-                                                 JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+                                                 JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1596,9 +1571,9 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_COMMIT_RECORD,
+                                                 JournalImpl.SIZE_COMMIT_RECORD + 1,
                                                  1,
-                                                 JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+                                                 JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1608,9 +1583,6 @@
 
       List<String> files5 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(4, files5.size());
-
-      Assert.assertEquals(2, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(2, journal.getIDMapSize());
@@ -1619,9 +1591,6 @@
 
       List<String> files6 = fileFactory.listFiles(fileExtension);
 
-      Assert.assertEquals(4, files6.size());
-
-      Assert.assertEquals(2, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(2, journal.getIDMapSize());
@@ -1633,11 +1602,6 @@
       startJournal();
       loadAndCheck();
 
-      List<String> files7 = fileFactory.listFiles(fileExtension);
-
-      Assert.assertEquals(4, files7.size());
-
-      Assert.assertEquals(2, journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(2, journal.getIDMapSize());
@@ -1665,7 +1629,7 @@
 
       // Make sure we move on to the next file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
 
       List<String> files2 = fileFactory.listFiles(fileExtension);
 
@@ -1678,14 +1642,12 @@
 
       rollback(1); // in file 1
 
-      List<String> files3 = fileFactory.listFiles(fileExtension);
-
       Assert.assertEquals(calculateNumberOfFiles(fileSize,
                                                  journal.getAlignment(),
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD) + 2, files3.size());
+                                                 JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1693,7 +1655,7 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD), journal.getDataFilesCount());
+                                                 JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1706,9 +1668,9 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD,
+                                                 JournalImpl.SIZE_ROLLBACK_RECORD + 1,
                                                  1,
-                                                 JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+                                                 JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1716,43 +1678,19 @@
                                                  2,
                                                  recordLength,
                                                  1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD,
+                                                 JournalImpl.SIZE_ROLLBACK_RECORD + 1,
                                                  1,
-                                                 JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+                                                 JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       // Move on to another file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 2
       // (current
       // file)
 
-      List<String> files5 = fileFactory.listFiles(fileExtension);
-
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD,
-                                                 1,
-                                                 recordLength) + 2, files5.size());
-
       Assert.assertEquals(1, journal.getOpenedFilesCount());
-
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_ROLLBACK_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD,
-                                                 1,
-                                                 recordLength), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1859,76 +1797,23 @@
       EncodingSupport xid = new SimpleEncoding(10, (byte)0);
       prepare(1, xid); // in file 1
 
-      List<String> files3 = fileFactory.listFiles(fileExtension);
-
-      Assert.assertEquals(3, files3.size());
-
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_PREPARE_RECORD), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
       delete(2); // in file 1
 
-      List<String> files4 = fileFactory.listFiles(fileExtension);
-
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_PREPARE_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
-
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_PREPARE_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(0, journal.getIDMapSize());
 
       // Move on to another file
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 3); // in file 2
 
-      List<String> files5 = fileFactory.listFiles(fileExtension);
-
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_PREPARE_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD,
-                                                 1,
-                                                 recordLength) + 2, files5.size());
-
       Assert.assertEquals(1, journal.getOpenedFilesCount());
 
-      Assert.assertEquals(calculateNumberOfFiles(fileSize,
-                                                 journal.getAlignment(),
-                                                 2,
-                                                 recordLength,
-                                                 1,
-                                                 JournalImpl.SIZE_PREPARE_RECORD,
-                                                 1,
-                                                 JournalImpl.SIZE_DELETE_RECORD,
-                                                 1,
-                                                 recordLength), journal.getDataFilesCount());
-
       Assert.assertEquals(0, journal.getFreeFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
@@ -1943,7 +1828,7 @@
       Assert.assertEquals(1, journal.getOpenedFilesCount());
       Assert.assertEquals(1, journal.getIDMapSize());
 
-      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3
+      addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 4); // in file 3
 
       List<String> files7 = fileFactory.listFiles(fileExtension);
 
@@ -2415,7 +2300,7 @@
 
          journal.appendAddRecord(i, (byte)0, record, false);
 
-         records.add(new RecordInfo(i, (byte)0, record, false));
+         records.add(new RecordInfo(i, (byte)0, record, false, (short)0));
       }
 
       for (int i = 0; i < 100; i++)
@@ -2424,7 +2309,7 @@
 
          journal.appendUpdateRecord(i, (byte)0, record, false);
 
-         records.add(new RecordInfo(i, (byte)0, record, true));
+         records.add(new RecordInfo(i, (byte)0, record, true, (short)0));
       }
 
       for (int i = 0; i < 100; i++)

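calculateRecordsPerFile() above now subtracts the aligned file header before dividing, so the expected counts no longer over-count by the header's worth of space. A worked example with purely illustrative numbers (the header and record sizes below are assumptions, not values taken from JournalImpl):

   int fileSize = 10 * 1024;
   int headerSize = 512;   // aligned header size, assumed for illustration
   int recordSize = 1024;  // aligned record size, assumed for illustration

   int recordsPerFile = (fileSize - headerSize) / recordSize;   // 9, not 10
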
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2010-08-24 14:21:23 UTC (rev 9591)
@@ -22,6 +22,7 @@
 
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.Reclaimer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.util.UnitTestCase;
@@ -357,23 +358,6 @@
       assertCantDelete(2);
    }
 
-   public void testCleanup() throws Exception
-   {
-      setup(3);
-      setupPosNeg(0, 11, 0, 0, 0);
-      setupPosNeg(1, 1, 10, 0, 0);
-      setupPosNeg(2, 1, 0, 1, 0);
-
-      reclaimer.scan(files);
-
-      debugFiles();
-
-      assertCantDelete(0);
-      Assert.assertTrue(files[0].isNeedCleanup());
-      assertCantDelete(1);
-      assertCantDelete(2);
-   }
-
    public void testThreeFiles10() throws Exception
    {
       setup(3);
@@ -741,9 +725,7 @@
                             "]=" +
                             files[i].getPosCount() +
                             ", canDelete = " +
-                            files[i].isCanReclaim() +
-                            ", cleanup = " +
-                            files[i].isNeedCleanup());
+                            files[i].isCanReclaim());
          for (int j = 0; j <= i; j++)
          {
             System.out.println("..." + files[i].getNegCount(files[j]));
@@ -1002,5 +984,31 @@
       {
          return totalDep;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#getJournalVersion()
+       */
+      public int getJournalVersion()
+      {
+         return JournalImpl.FORMAT_VERSION;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#getTotNeg()
+       */
+      public int getTotNeg()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#setTotNeg(int)
+       */
+      public void setTotNeg(int totNeg)
+      {
+         // TODO Auto-generated method stub
+         
+      }
    }
 }

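The stub JournalFile used by this test now also implements three methods added to the org.hornetq.core.journal.impl.JournalFile interface; their signatures, as inferred from the overrides and javadoc references above, are:

   int getJournalVersion();

   int getTotNeg();

   void setTotNeg(int totNeg);
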

