[hornetq-commits] JBoss hornetq SVN: r7963 - in trunk: src/main/org/hornetq/core/journal/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 16 17:29:29 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-16 17:29:29 -0400 (Wed, 16 Sep 2009)
New Revision: 7963

Added:
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
Modified:
   trunk/src/main/org/hornetq/core/journal/Journal.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
   trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
   trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
   trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
HORNETQ-35 - Journal cleanup

Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -81,8 +81,9 @@
 
    void perfBlast(int pages) throws Exception;
 
-   /** This method is called automatically when a new file is opened  */
-   void checkAndReclaimFiles() throws Exception;
+   /** This method is called automatically when a new file is opened.
+    * @return true if it needs to re-check due to cleanup or other factors  */
+   boolean checkReclaimStatus() throws Exception;
 
    /** This method check for the need of compacting based on the minCompactPercentage 
     * This method is usually called automatically when new files are opened

Added: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.core.buffers.ChannelBuffer;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.Pair;
+
+/**
+ * 
+ * Super class for journal maintenance tasks such as cleanup and compacting
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
+   private static final Logger log = Logger.getLogger(AbstractJournalUpdateTask.class);
+
+   protected final JournalImpl journal;
+
+   protected final SequentialFileFactory fileFactory;
+
+   protected JournalFile currentFile;
+
+   protected SequentialFile sequentialFile;
+
+   protected int fileID;
+
+   protected int nextOrderingID;
+
+   private ChannelBuffer writingChannel;
+
+   private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();
+
+   protected final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
+                                      final JournalImpl journal,
+                                      final Set<Long> recordsSnapshot,
+                                      final int nextOrderingID)
+   {
+      super();
+      this.journal = journal;
+      this.fileFactory = fileFactory;
+      this.nextOrderingID = nextOrderingID;
+      this.recordsSnapshot.addAll(recordsSnapshot);
+   }
+
+   // Public --------------------------------------------------------
+   
+   /**
+    * @param files the existing data files to be replaced (may be null)
+    * @param newFiles the newly created files (may be null)
+    * @param renames pending file renames from cleanup (may be null)
+    */
+   public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
+                                                 final List<JournalFile> files,
+                                                 final List<JournalFile> newFiles,
+                                                 final List<Pair<String, String>> renames) throws Exception
+   {
+
+      SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+      try
+      {
+         controlFile.open(1);
+
+         ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
+
+         renameBuffer.writeInt(-1);
+         renameBuffer.writeInt(-1);
+
+         HornetQBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
+
+         // DataFiles first
+
+         if (files == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(files.size());
+
+            for (JournalFile file : files)
+            {
+               filesToRename.writeUTF(file.getFile().getFileName());
+            }
+         }
+
+         // New Files second
+
+         if (newFiles == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(newFiles.size());
+
+            for (JournalFile file : newFiles)
+            {
+               filesToRename.writeUTF(file.getFile().getFileName());
+            }
+         }
+
+         // Renames from clean up third
+         if (renames == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(renames.size());
+            for (Pair<String, String> rename : renames)
+            {
+               filesToRename.writeUTF(rename.a);
+               filesToRename.writeUTF(rename.b);
+            }
+         }
+
+         JournalImpl.writeAddRecord(-1,
+                                    1,
+                                    (byte)0,
+                                    new JournalImpl.ByteArrayEncoding(filesToRename.array()),
+                                    JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
+                                    renameBuffer);
+
+         ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+
+         writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
+
+         writeBuffer.rewind();
+
+         controlFile.write(writeBuffer, true);
+
+         return controlFile;
+      }
+      finally
+      {
+         controlFile.close();
+      }
+   }
+
+   /** Write pending output into file */
+   public void flush() throws Exception
+   {
+      if (writingChannel != null)
+      {
+         sequentialFile.position(0);
+         sequentialFile.write(writingChannel.toByteBuffer(), true);
+         sequentialFile.close();
+         newDataFiles.add(currentFile);
+      }
+
+      writingChannel = null;
+   }
+
+   public boolean lookupRecord(final long id)
+   {
+      return recordsSnapshot.contains(id);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   /**
+    * Opens a new output file, flushing any pending writes first.
+    */
+
+   protected void openFile() throws Exception
+   {
+      flush();
+
+      ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+      writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+      currentFile = journal.getFile(false, false, false, true);
+      sequentialFile = currentFile.getFile();
+
+      sequentialFile.open(1);
+      fileID = nextOrderingID++;
+      currentFile = new JournalFileImpl(sequentialFile, fileID);
+
+      writingChannel.writeInt(fileID);
+   }
+
+   protected void addToRecordsSnaptsho(long id)
+   {
+      recordsSnapshot.add(id);
+   }
+
+   /**
+    * @return the writingChannel
+    */
+   protected ChannelBuffer getWritingChannel()
+   {
+      return writingChannel;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A JournalCleaner
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCleaner extends AbstractJournalUpdateTask
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   /**
+    * @param fileFactory
+    * @param journal
+    * @param nextOrderingID
+    */
+   protected JournalCleaner(final SequentialFileFactory fileFactory,
+                            final JournalImpl journal,
+                            final Set<Long> recordsSnapshot,
+                            final int nextOrderingID) throws Exception
+   {
+      super(fileFactory, journal, recordsSnapshot, nextOrderingID);
+      openFile();
+   }
+
+   // Public --------------------------------------------------------
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
+    */
+   public void markAsDataFile(final JournalFile file)
+   {
+      // nothing to be done here
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
+    */
+   public void onReadAddRecord(final RecordInfo info) throws Exception
+   {
+      if (lookupRecord(info.id))
+      {
+         int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+
+         JournalImpl.writeAddRecord(fileID,
+                                    info.id,
+                                    info.getUserRecordType(),
+                                    new JournalImpl.ByteArrayEncoding(info.data),
+                                    size,
+                                    getWritingChannel());
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
+    */
+   public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      if (lookupRecord(recordInfo.id))
+      {
+         incrementTransactionCounter(transactionID);
+
+         int size = JournalImpl.SIZE_ADD_RECORD_TX + recordInfo.data.length;
+
+         JournalImpl.writeAddRecordTX(fileID,
+                                      transactionID,
+                                      recordInfo.id,
+                                      recordInfo.getUserRecordType(),
+                                      new JournalImpl.ByteArrayEncoding(recordInfo.data),
+                                      size,
+                                      getWritingChannel());
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
+    */
+   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+   {
+      int txcounter = getTransactionCounter(transactionID);
+
+      JournalImpl.writeTransaction(fileID,
+                                   JournalImpl.COMMIT_RECORD,
+                                   transactionID,
+                                   null,
+                                   JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD,
+                                   txcounter,
+                                   getWritingChannel());
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
+    */
+   public void onReadDeleteRecord(final long recordID) throws Exception
+   {
+      JournalImpl.writeDeleteRecord(fileID, recordID, JournalImpl.SIZE_DELETE_RECORD, getWritingChannel());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
+    */
+   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      int size = JournalImpl.SIZE_DELETE_RECORD_TX + recordInfo.data.length;
+
+      incrementTransactionCounter(transactionID);
+
+      JournalImpl.writeDeleteRecordTransactional(fileID,
+                                                 transactionID,
+                                                 recordInfo.id,
+                                                 new JournalImpl.ByteArrayEncoding(recordInfo.data),
+                                                 size,
+                                                 getWritingChannel());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
+    */
+   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+   {
+      int txcounter = getTransactionCounter(transactionID);
+
+      int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+
+      JournalImpl.writeTransaction(fileID,
+                                   JournalImpl.PREPARE_RECORD,
+                                   transactionID,
+                                   new JournalImpl.ByteArrayEncoding(extraData),
+                                   size,
+                                   txcounter,
+                                   getWritingChannel());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
+    */
+   public void onReadRollbackRecord(final long transactionID) throws Exception
+   {
+      JournalImpl.writeRollback(fileID, transactionID, getWritingChannel());
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
+    */
+   public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+   {
+      if (lookupRecord(recordInfo.id))
+      {
+         int size = JournalImpl.SIZE_UPDATE_RECORD + recordInfo.data.length;
+         JournalImpl.writeUpdateRecord(fileID,
+                                       recordInfo.id,
+                                       recordInfo.userRecordType,
+                                       new JournalImpl.ByteArrayEncoding(recordInfo.data),
+                                       size,
+                                       getWritingChannel());
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
+    */
+   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      if (lookupRecord(recordInfo.id))
+      {
+         incrementTransactionCounter(transactionID);
+         int size = JournalImpl.SIZE_UPDATE_RECORD_TX + recordInfo.data.length;
+         JournalImpl.writeUpdateRecordTX(fileID,
+                                         transactionID,
+                                         recordInfo.id,
+                                         recordInfo.userRecordType,
+                                         new JournalImpl.ByteArrayEncoding(recordInfo.data),
+                                         size,
+                                         getWritingChannel());
+      }
+   }
+
+   /**
+    * Fixes up files that depend on the original file.
+    * Commits and rollbacks are also counted as negatives, so those need fixing as well.
+    * @param dependencies
+    */
+   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies)  throws Exception
+   {
+      for (JournalFile dependency : dependencies)
+      {
+         fixDependency(originalFile, dependency);
+      }
+      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected int incrementTransactionCounter(final long transactionID)
+   {
+      AtomicInteger counter = transactionCounter.get(transactionID);
+      if (counter == null)
+      {
+         counter = new AtomicInteger(0);
+         transactionCounter.put(transactionID, counter);
+      }
+
+      return counter.incrementAndGet();
+   }
+
+   protected int getTransactionCounter(final long transactionID)
+   {
+      AtomicInteger counter = transactionCounter.get(transactionID);
+      if (counter == null)
+      {
+         return 0;
+      }
+      else
+      {
+         return counter.intValue();
+      }
+   }
+
+   // Private -------------------------------------------------------
+   private void fixDependency(final JournalFile originalFile, final JournalFile dependency) throws Exception
+   {
+      JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
+      {
+         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         {
+            if (transactionCounter.containsKey(transactionID))
+            {
+               dependency.incNegCount(originalFile);
+            }
+         }
+         
+         public void onReadRollbackRecord(long transactionID) throws Exception
+         {
+            if (transactionCounter.containsKey(transactionID))
+            {
+               dependency.incNegCount(originalFile);
+            }
+         }
+      };
+      
+      JournalImpl.readJournalFile(fileFactory, dependency, txfix);
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.journal.impl;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -29,9 +28,8 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.Pair;
 
 /**
  * A JournalCompactor
@@ -40,31 +38,11 @@
  *
  *
  */
-public class JournalCompactor implements JournalReaderCallback
+public class JournalCompactor extends AbstractJournalUpdateTask
 {
 
-   private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
-
    private static final Logger log = Logger.getLogger(JournalCompactor.class);
-
-   private final JournalImpl journal;
-
-   private final SequentialFileFactory fileFactory;
-
-   private JournalFile currentFile;
-
-   private SequentialFile sequentialFile;
-
-   private int fileID;
-
-   private ChannelBuffer writingChannel;
-
-   private int nextOrderingID;
-
-   private final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
-
-   private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
-
+   
    // Snapshot of transactions that were pending when the compactor started
    private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
 
@@ -77,71 +55,10 @@
     *  we cache those updates. As soon as we are done we take the right account. */
    private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
 
-   /**
-    * @param tmpRenameFile
-    * @param files
-    * @param newFiles
-    */
-   public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
-                                                 final List<JournalFile> files,
-                                                 final List<JournalFile> newFiles) throws Exception
-   {
-
-      SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
-
-      try
-      {
-         controlFile.open(1);
-
-         ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
-
-         renameBuffer.writeInt(-1);
-         renameBuffer.writeInt(-1);
-
-         HornetQBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
-
-         // DataFiles first
-
-         filesToRename.writeInt(files.size());
-
-         for (JournalFile file : files)
-         {
-            filesToRename.writeUTF(file.getFile().getFileName());
-         }
-
-         filesToRename.writeInt(newFiles.size());
-
-         for (JournalFile file : newFiles)
-         {
-            filesToRename.writeUTF(file.getFile().getFileName());
-         }
-
-         JournalImpl.writeAddRecord(-1,
-                                    1,
-                                    (byte)0,
-                                    new JournalImpl.ByteArrayEncoding(filesToRename.array()),
-                                    JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
-                                    renameBuffer);
-
-         ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
-         writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
-
-         writeBuffer.rewind();
-
-         controlFile.write(writeBuffer, true);
-
-         return controlFile;
-      }
-      finally
-      {
-         controlFile.close();
-      }
-   }
-
    public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
                                                 final List<String> dataFiles,
-                                                final List<String> newFiles) throws Exception
+                                                final List<String> newFiles,
+                                                final List<Pair<String, String>> renameFile) throws Exception
    {
       SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
 
@@ -182,6 +99,14 @@
                newFiles.add(input.readUTF());
             }
 
+            int numberRenames = input.readInt();
+            for (int i = 0; i < numberRenames; i++)
+            {
+               String from = input.readUTF();
+               String to = input.readUTF();
+               renameFile.add(new Pair<String, String>(from, to));
+            }
+
          }
 
          return controlFile;
@@ -212,10 +137,7 @@
                            final Set<Long> recordsSnapshot,
                            final int firstFileID)
    {
-      this.fileFactory = fileFactory;
-      this.journal = journal;
-      this.recordsSnapshot.addAll(recordsSnapshot);
-      nextOrderingID = firstFileID;
+      super(fileFactory, journal, recordsSnapshot, firstFileID);
    }
 
    /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -248,7 +170,7 @@
       {
          for (long id : ids)
          {
-            recordsSnapshot.add(id);
+            addToRecordsSnaptsho(id);
          }
       }
 
@@ -256,7 +178,7 @@
       {
          for (long id : ids2)
          {
-            recordsSnapshot.add(id);
+            addToRecordsSnaptsho(id);
          }
       }
    }
@@ -284,40 +206,20 @@
       pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
    }
 
-   public boolean lookupRecord(final long id)
-   {
-      return recordsSnapshot.contains(id);
-   }
-
    private void checkSize(final int size) throws Exception
    {
-      if (writingChannel == null)
+      if (getWritingChannel() == null)
       {
          openFile();
       }
       else
       {
-         if (writingChannel.writerIndex() + size > writingChannel.capacity())
+         if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
          {
             openFile();
          }
       }
    }
-
-   /** Write pending output into file */
-   public void flush() throws Exception
-   {
-      if (writingChannel != null)
-      {
-         sequentialFile.position(0);
-         sequentialFile.write(writingChannel.toByteBuffer(), true);
-         sequentialFile.close();
-         newDataFiles.add(currentFile);
-      }
-
-      writingChannel = null;
-   }
-
    /**
     * Replay pending counts that happened during compacting
     */
@@ -342,7 +244,7 @@
 
    public void onReadAddRecord(final RecordInfo info) throws Exception
    {
-      if (recordsSnapshot.contains(info.id))
+      if (lookupRecord(info.id))
       {
          int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
 
@@ -353,7 +255,7 @@
                                     info.getUserRecordType(),
                                     new JournalImpl.ByteArrayEncoding(info.data),
                                     size,
-                                    writingChannel);
+                                    getWritingChannel());
 
          newRecords.put(info.id, new JournalRecord(currentFile, size));
       }
@@ -377,7 +279,7 @@
                                       info.getUserRecordType(),
                                       new JournalImpl.ByteArrayEncoding(info.data),
                                       size,
-                                      writingChannel);
+                                      getWritingChannel());
       }
       else
       {
@@ -388,6 +290,7 @@
 
    public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
    {
+      
       if (pendingTransactions.get(transactionID) != null)
       {
          // Sanity check, this should never happen
@@ -421,7 +324,7 @@
                                                     info.id,
                                                     new JournalImpl.ByteArrayEncoding(info.data),
                                                     size,
-                                                    writingChannel);
+                                                    getWritingChannel());
 
          newTransaction.addNegative(currentFile, info.id);
       }
@@ -447,11 +350,10 @@
          JournalImpl.writeTransaction(fileID,
                                       JournalImpl.PREPARE_RECORD,
                                       transactionID,
-                                      newTransaction,
                                       new JournalImpl.ByteArrayEncoding(extraData),
                                       size,
                                       newTransaction.getCounter(currentFile),
-                                      writingChannel);
+                                      getWritingChannel());
 
          newTransaction.prepare(currentFile);
 
@@ -470,7 +372,7 @@
 
    public void onReadUpdateRecord(final RecordInfo info) throws Exception
    {
-      if (recordsSnapshot.contains(info.id))
+      if (lookupRecord(info.id))
       {
          int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
 
@@ -492,7 +394,7 @@
                                        info.userRecordType,
                                        new JournalImpl.ByteArrayEncoding(info.data),
                                        size,
-                                       writingChannel);
+                                       getWritingChannel());
 
       }
    }
@@ -513,7 +415,7 @@
                                          info.userRecordType,
                                          new JournalImpl.ByteArrayEncoding(info.data),
                                          size,
-                                         writingChannel);
+                                         getWritingChannel());
 
          newTransaction.addPositive(currentFile, info.id, size);
       }
@@ -538,26 +440,6 @@
       return newTransaction;
    }
 
-   /**
-    * @throws Exception
-    */
-   private void openFile() throws Exception
-   {
-      flush();
-
-      ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
-      writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
-
-      currentFile = journal.getFile(false, false, false, true);
-      sequentialFile = currentFile.getFile();
-            
-      sequentialFile.open(1);
-      fileID = nextOrderingID++;
-      currentFile = new JournalFileImpl(sequentialFile, fileID);
-
-      writingChannel.writeInt(fileID);
-   }
-
    private static abstract class CompactCommand
    {
       abstract void execute() throws Exception;
@@ -599,7 +481,7 @@
       private long id;
 
       private JournalFile usedFile;
-      
+
       private final int size;
 
       public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size)

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -19,8 +19,6 @@
  * 
  * A JournalFile
  * 
- * TODO combine this with JournalFileImpl
- * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
@@ -33,6 +31,8 @@
    int getNegCount(JournalFile file);
 
    void incNegCount(JournalFile file);
+   
+   boolean resetNegCount(JournalFile file);
 
    int getPosCount();
 
@@ -49,6 +49,10 @@
    void setCanReclaim(boolean canDelete);
 
    boolean isCanReclaim();
+   
+   void setNeedCleanup(boolean needCleanup);
+   
+   boolean isNeedCleanup();
 
    long getOffset();
    

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -44,6 +44,8 @@
    private final AtomicInteger liveBytes = new AtomicInteger(0);
 
    private boolean canReclaim;
+   
+   private boolean needCleanup;   
 
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
    
@@ -70,7 +72,18 @@
    {
       return canReclaim;
    }
+   
+   public boolean isNeedCleanup()
+   {
+      return needCleanup;
+   }
 
+   public void setNeedCleanup(boolean needCleanup)
+   {
+      this.needCleanup = needCleanup;
+   }
+   
+
    public void setCanReclaim(final boolean canReclaim)
    {
       this.canReclaim = canReclaim;
@@ -94,6 +107,11 @@
          return count.intValue();
       }
    }
+   
+   public boolean resetNegCount(JournalFile file)
+   {
+      return negCounts.remove(file) != null;
+   }
 
    public void incPosCount()
    {

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -33,11 +33,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.core.buffers.ChannelBuffer;
@@ -81,7 +81,7 @@
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = log.isTraceEnabled();
+   private static final boolean trace = false;
 
    /** This is to be set to true at DEBUG & development only */
    private static final boolean LOAD_TRACE = false;
@@ -197,13 +197,16 @@
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
 
-   // Latch used to wait compactor finish, to make sure we won't stop the journal with the compactor running
    private final AtomicBoolean compactorRunning = new AtomicBoolean();
 
    private ExecutorService filesExecutor = null;
+   
+   private ExecutorService compactorExecutor = null;
 
    // Lock used during the append of records
-   private final Semaphore lockAppend = new Semaphore(1);
+   // This lock doesn't represent a global lock.
+   // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
+   private final ReentrantLock lockAppend = new ReentrantLock();
 
    /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
    private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
@@ -313,7 +316,6 @@
    public static void writeTransaction(final int fileID,
                                        final byte recordType,
                                        final long txID,
-                                       final JournalTransaction tx,
                                        final EncodingSupport transactionData,
                                        final int size,
                                        final int numberOfRecords,
@@ -322,7 +324,7 @@
       bb.writeByte(recordType);
       bb.writeInt(fileID); // skip ID part
       bb.writeLong(txID);
-      bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
+      bb.writeInt(numberOfRecords);
 
       if (transactionData != null)
       {
@@ -364,6 +366,18 @@
    }
 
    /**
+    * @param txID
+    * @param bb
+    */
+   public static void writeRollback(final int fileID, final long txID, ChannelBuffer bb)
+   {
+      bb.writeByte(ROLLBACK_RECORD);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeInt(SIZE_ROLLBACK_RECORD);
+   }
+
+   /**
     * @param id
     * @param recordType
     * @param record
@@ -410,6 +424,19 @@
    }
 
    /**
+    * @param id
+    * @param size
+    * @param bb
+    */
+   public static void writeDeleteRecord(final int fileId, final long id, int size, ChannelBuffer bb)
+   {
+      bb.writeByte(DELETE_RECORD);
+      bb.writeInt(fileId);
+      bb.writeLong(id);
+      bb.writeInt(size);
+   }
+
+   /**
     * @param txID
     * @param id
     * @param record
@@ -836,7 +863,7 @@
 
          callback = getSyncCallback(sync);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -845,7 +872,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -896,7 +923,7 @@
 
          callback = getSyncCallback(sync);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -914,7 +941,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -956,14 +983,11 @@
 
          ChannelBuffer bb = newBuffer(size);
 
-         bb.writeByte(DELETE_RECORD);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(id);
-         bb.writeInt(size);
+         writeDeleteRecord(-1, id, size, bb);
 
          callback = getSyncCallback(sync);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -982,7 +1006,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -1025,7 +1049,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1034,7 +1058,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -1074,7 +1098,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1083,7 +1107,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -1116,7 +1140,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1125,7 +1149,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -1174,9 +1198,9 @@
          int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
          ChannelBuffer bb = newBuffer(size);
 
-         writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
+         writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1185,7 +1209,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
 
       }
@@ -1236,9 +1260,15 @@
 
          ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
 
-         writeTransaction(-1, COMMIT_RECORD, txID, tx, null, SIZE_COMPLETE_TRANSACTION_RECORD, -1, bb);
+         writeTransaction(-1,
+                          COMMIT_RECORD,
+                          txID,
+                          null,
+                          SIZE_COMPLETE_TRANSACTION_RECORD,
+                          -1 /* number of records on this transaction will be filled later inside append record */,
+                          bb);
 
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1247,7 +1277,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
 
       }
@@ -1283,16 +1313,11 @@
             throw new IllegalStateException("Cannot find tx with id " + txID);
          }
 
-         int size = SIZE_ROLLBACK_RECORD;
+         ChannelBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
 
-         ChannelBuffer bb = newBuffer(size);
+         writeRollback(-1, txID, bb);
 
-         bb.writeByte(ROLLBACK_RECORD);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(txID);
-         bb.writeInt(size);
-
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
@@ -1301,7 +1326,7 @@
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
 
       }
@@ -1464,7 +1489,7 @@
 
          // This is where most of the work is done, taking most of the time of the compacting routine.
          // Notice there are no locks while this is being done.
-         
+
          // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
          // well
          for (final JournalFile file : dataFilesToProcess)
@@ -1479,12 +1504,12 @@
          // Usually tests will use this to hold the compacting while other structures are being updated.
          onCompactDone();
 
-         SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles());
-
          List<JournalFile> newDatafiles = null;
 
          JournalCompactor localCompactor = compactor;
 
+         SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
+
          compactingLock.writeLock().lock();
          try
          {
@@ -2005,18 +2030,21 @@
 
       state = STATE_LOADED;
 
-      checkAndReclaimFiles();
+      checkReclaimStatus();
 
       return maxID;
    }
 
-   public void checkAndReclaimFiles() throws Exception
+   /** 
+    * @return true if cleanup was called
+    */
+   public boolean checkReclaimStatus() throws Exception
    {
-      // We can't start compacting while compacting is working
+      // We can't start reclaim while compacting is working
       compactingLock.readLock().lock();
       try
       {
-         checkReclaimStatus();
+         reclaimer.scan(getDataFiles());
 
          for (JournalFile file : dataFiles)
          {
@@ -2037,13 +2065,145 @@
                addFreeFile(file);
             }
          }
+
+         int nCleanup = 0;
+         for (JournalFile file : dataFiles)
+         {
+            if (file.isNeedCleanup())
+            {
+               nCleanup++;
+            }
+         }
+
+         // TODO: make this configurable
+         if (nCleanup > 5)
+         {
+            for (JournalFile file : dataFiles)
+            {
+               if (file.isNeedCleanup())
+               {
+                  final JournalFile cleanupFile = file;
+                  
+                  if (compactorRunning.compareAndSet(false, true))
+                  {
+                     // The cleanup should happen rarely,
+                     // but when it does it needs to run on a different thread;
+                     // otherwise opening new files (or any other use of the executor)
+                     // would be blocked while the cleanup is being processed.
+                     
+                     compactorExecutor.execute(new Runnable()
+                     {
+                        public void run()
+                        {
+                           try
+                           {
+                              cleanUp(cleanupFile);
+                           }
+                           catch (Exception e)
+                           {
+                              log.warn(e.getMessage(), e);
+                           }
+                           finally
+                           {
+                              compactorRunning.set(false);
+                              if (autoReclaim)
+                              {
+                                 scheduleReclaim();
+                              }
+                           }
+                        }
+                     });
+                  }
+                  return true;
+               }
+            }
+         }
       }
       finally
       {
          compactingLock.readLock().unlock();
       }
+
+      return false;
    }
 
+   public synchronized void cleanUp(final JournalFile file) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         return;
+      }
+
+      compactingLock.readLock().lock();
+
+      try
+      {
+         JournalCleaner cleaner = null;
+         ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
+         lockAppend.lock();
+         
+         try
+         {
+
+            log.info("Cleaning up file "  + file);
+            
+            if (file.getPosCount() == 0)
+            {
+               // nothing to be done
+               return;
+            }
+            
+            // We don't want this file to be reclaimed during the cleanup
+            file.incPosCount();
+
+            // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
+            // could be reset
+            for (JournalFile jrnFile : dataFiles)
+            {
+               if (jrnFile.resetNegCount(file))
+               {
+                  dependencies.add(jrnFile);
+                  jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done  
+               }
+            }
+
+            cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+
+         readJournalFile(fileFactory, file, cleaner);
+
+         cleaner.flush();
+
+         cleaner.fixDependencies(file, dependencies);
+         
+         for (JournalFile jrnfile : dependencies)
+         {
+            jrnfile.decPosCount();
+         }
+         file.decPosCount();
+
+         SequentialFile tmpFile = cleaner.currentFile.getFile();
+         String tmpFileName = tmpFile.getFileName();
+         String cleanedFileName = file.getFile().getFileName();
+
+         SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
+                                                                                             cleanedFileName));
+         file.getFile().delete();
+         tmpFile.renameTo(cleanedFileName);
+         controlFile.delete();
+      }
+      finally
+      {
+         compactingLock.readLock().unlock();
+         log.info("Clean up on file "  + file + " done");
+      }
+
+    }
+
    public void checkCompact() throws Exception
    {
       if (compactMinFiles == 0)
@@ -2072,10 +2232,9 @@
             return;
          }
 
-         // We can't use the executor for the compacting... or we would lock files opening and creation (besides other
-         // operations)
-         // that would freeze the journal while compacting
-         Thread t = new Thread()
+         // We can't use the executor for the compacting... or we would deadlock because of file open and creation
+         // operations (which also use the executor)
+         compactorExecutor.execute(new Runnable()
          {
             public void run()
             {
@@ -2093,9 +2252,7 @@
                   compactorRunning.set(false);
                }
             }
-         };
-
-         t.start();
+         });
       }
    }
 
@@ -2114,7 +2271,7 @@
 
    public String debug() throws Exception
    {
-      checkReclaimStatus();
+      reclaimer.scan(getDataFiles());
 
       StringBuilder builder = new StringBuilder();
 
@@ -2245,19 +2402,19 @@
       compactingLock.readLock().lock();
       try
       {
-         lockAppend.acquire();
+         lockAppend.lock();
          try
          {
             moveNextFile(true);
             if (autoReclaim)
             {
-               checkAndReclaimFiles();
+               checkReclaimStatus();
             }
             debugWait();
          }
          finally
          {
-            lockAppend.release();
+            lockAppend.unlock();
          }
       }
       finally
@@ -2287,6 +2444,8 @@
       }
 
       filesExecutor = Executors.newSingleThreadExecutor();
+      
+      compactorExecutor = Executors.newCachedThreadPool();
 
       fileFactory.start();
 
@@ -2302,7 +2461,7 @@
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      lockAppend.acquire();
+      lockAppend.lock();
 
       try
       {
@@ -2337,7 +2496,7 @@
       }
       finally
       {
-         lockAppend.release();
+         lockAppend.unlock();
       }
    }
 
@@ -2347,6 +2506,23 @@
    // Protected
    // -----------------------------------------------------------------------------
 
+   protected SequentialFile createControlFile(List<JournalFile> files,
+                                              List<JournalFile> newFiles,
+                                              Pair<String, String> cleanupRename) throws Exception
+   {
+      ArrayList<Pair<String, String>> cleanupList;
+      if (cleanupRename == null)
+      {
+         cleanupList = null;
+      }
+      else
+      {
+         cleanupList = new ArrayList<Pair<String, String>>();
+         cleanupList.add(cleanupRename);
+      }
+      return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
+   }
+
    protected void deleteControlFile(final SequentialFile controlFile) throws Exception
    {
       controlFile.delete();
@@ -2374,14 +2550,6 @@
    {
    }
 
-   /**
-    * @throws Exception
-    */
-   protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
-   {
-      return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
-   }
-
    // Private
    // -----------------------------------------------------------------------------
 
@@ -2406,11 +2574,6 @@
       }
    }
 
-   private void checkReclaimStatus() throws Exception
-   {
-      reclaimer.scan(getDataFiles());
-   }
-
    // Discard the old JournalFile and set it with a new ID
    private JournalFile reinitializeFile(final JournalFile file) throws Exception
    {
@@ -2535,7 +2698,7 @@
          file.read(bb);
 
          int fileID = bb.getInt();
-         
+
          fileFactory.releaseBuffer(bb);
 
          bb = null;
@@ -2546,7 +2709,7 @@
          }
 
          int fileNameID = getFileNameID(fileName);
-         
+
          // The compactor could create a fileName but use a previously assigned ID.
          // Because of that we need to take both parts into account
          if (nextFileID.get() < fileNameID)
@@ -2554,7 +2717,6 @@
             nextFileID.set(fileNameID);
          }
 
-
          orderedFiles.add(new JournalFileImpl(file, fileID));
 
          file.close();
@@ -2594,7 +2756,7 @@
             throw new IllegalArgumentException("Record is too large to store " + size);
          }
 
-         // Disable auto flush on the timer. The Timer should'nt flush anything 
+         // Disable auto flush on the timer. The Timer shouldn't flush anything
          currentFile.getFile().disableAutoFlush();
 
          if (!currentFile.getFile().fits(size))
@@ -2669,13 +2831,13 @@
       }
 
    }
-   
+
    /** Get the ID part of the name */
    private int getFileNameID(String fileName)
    {
       try
       {
-         return Integer.parseInt(fileName.substring(filePrefix.length()+1, fileName.indexOf('.')));
+         return Integer.parseInt(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
       }
       catch (Throwable e)
       {
@@ -2819,34 +2981,7 @@
 
       if (autoReclaim && !synchronous)
       {
-         filesExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  checkAndReclaimFiles();
-               }
-               catch (Exception e)
-               {
-                  log.error(e.getMessage(), e);
-               }
-            }
-         });
-         filesExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  checkCompact();
-               }
-               catch (Exception e)
-               {
-                  log.error(e.getMessage(), e);
-               }
-            }
-         });
+         scheduleReclaim();
       }
 
       JournalFile nextFile = null;
@@ -2863,6 +2998,32 @@
       return nextFile;
    }
 
+   private void scheduleReclaim()
+   {
+      if (state != STATE_LOADED)
+      {
+         return;
+      }
+
+      filesExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               if (!checkReclaimStatus())
+               {
+                  checkCompact();
+               }
+            }
+            catch (Exception e)
+            {
+               log.error(e.getMessage(), e);
+            }
+         }
+      });
+   }
+
    /** 
     * 
     * Open a file and place it into the openedFiles queue
@@ -3002,8 +3163,9 @@
    {
       ArrayList<String> dataFiles = new ArrayList<String>();
       ArrayList<String> newFiles = new ArrayList<String>();
+      ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
 
-      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);
       if (controlFile != null)
       {
          for (String dataFile : dataFiles)
@@ -3026,6 +3188,19 @@
             }
          }
 
+         for (Pair<String, String> rename : renames)
+         {
+            SequentialFile fileTmp = fileFactory.createSequentialFile(rename.a, 1);
+            SequentialFile fileTo = fileFactory.createSequentialFile(rename.b, 1);
+            // We should do the rename only if the tmp file still exist, or else we could
+            // delete a valid file depending on where the crash occurred during the control file delete
+            if (fileTmp.exists())
+            {
+               fileTo.delete();
+               fileTmp.renameTo(rename.b);
+            }
+         }
+
          controlFile.delete();
       }
 
@@ -3236,7 +3411,7 @@
       {
          try
          {
-            lockAppend.acquire();
+            lockAppend.lock();
 
             HornetQBuffer bb = newBuffer(128 * 1024);
 
@@ -3245,7 +3420,7 @@
                appendRecord(bb, false, false, null, null);
             }
 
-            lockAppend.release();
+            lockAppend.unlock();
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -281,7 +281,7 @@
    }
 
    /** 
-    * The caller of this method needs to guarantee lock.acquire at the journal. (unless this is being called from load what is a single thread process).
+    * The caller of this method needs to guarantee lockAppend.lock() at the journal. (unless this is being called from load what is a single thread process).
     * */
    public void commit(final JournalFile file)
    {
@@ -371,7 +371,7 @@
    }
 
    /** 
-    * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+    * The caller of this method needs to guarantee lockAppend.lock() before calling this method if being used outside of the lock context.
     * or else potFilesMap could be affected
     * */
    public void rollback(final JournalFile file)
@@ -402,7 +402,7 @@
    }
 
    /** 
-    * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+    * The caller of this method needs to guarantee lockAppend.lock() before calling this method if being used outside of the lock context.
     * or else potFilesMap could be affected
     * */
    public void prepare(final JournalFile file)

Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -17,8 +17,6 @@
 
 /**
  * 
- * <p>A ReclaimerTest</p>
- * 
  * <p>The journal consists of an ordered list of journal files Fn where 0 <= n <= N</p>
  * 
  * <p>A journal file can contain either positives (pos) or negatives (neg)</p>
@@ -33,6 +31,7 @@
  * which are also marked for deletion in the same pass of the algorithm.</p>
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public class Reclaimer
@@ -54,6 +53,8 @@
 
          JournalFile currentFile = files[i];
 
+         currentFile.setNeedCleanup(false);
+
          int posCount = currentFile.getPosCount();
 
          int totNeg = 0;
@@ -101,6 +102,7 @@
                         trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
                      }
 
+                     file.setNeedCleanup(true);
                      currentFile.setCanReclaim(false);
 
                      break;

Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -25,6 +25,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.Message;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -67,6 +68,101 @@
 
    // Public --------------------------------------------------------
 
+   public void testCleanupAIO() throws Throwable
+   {
+      for (int i = 0; i < 3; i++)
+      {
+         System.out.println("Test # " + i);
+         internalTestCleanup(JournalType.ASYNCIO);
+         tearDown();
+         setUp();
+      }
+   }
+
+   public void testCleanupNIO() throws Throwable
+   {
+      for (int i = 0; i < 3; i++)
+      {
+         System.out.println("Test # " + i);
+         internalTestCleanup(JournalType.NIO);
+         tearDown();
+         setUp();
+      }
+   }
+
+   private void internalTestCleanup(JournalType journalType) throws Throwable
+   {
+      setupServer(journalType);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      ClientProducer prod = session.createProducer(AD1);
+
+      for (int i = 0; i < 500; i++)
+      {
+         prod.send(session.createClientMessage(true));
+      }
+
+      session.commit();
+
+      prod.close();
+
+      ClientConsumer cons = session.createConsumer(Q2);
+      prod = session.createProducer(AD2);
+
+      session.start();
+
+      for (int i = 0; i < 200; i++)
+      {
+         System.out.println("Iteration " + i);
+         for (int j = 0; j < 1000; j++)
+         {
+            Message msg = session.createClientMessage(true);
+            msg.getBody().writeBytes(new byte[1024]);
+
+            prod.send(msg);
+         }
+
+         session.commit();
+
+         for (int j = 0; j < 1000; j++)
+         {
+            ClientMessage msg = cons.receive(2000);
+            assertNotNull(msg);
+            msg.acknowledge();
+         }
+
+         session.commit();
+
+      }
+
+      assertNull(cons.receiveImmediate());
+
+      session.close();
+
+      server.stop();
+
+      server.start();
+
+      session = sf.createSession(false, true, true);
+      cons = session.createConsumer(Q1);
+      session.start();
+
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage msg = cons.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      assertNull(cons.receiveImmediate());
+
+      prod = session.createProducer(AD2);
+
+      session.close();
+
+   }
+
    public void testMultiProducerAndCompactAIO() throws Throwable
    {
       internalTestMultiProducer(JournalType.ASYNCIO);
@@ -107,11 +203,11 @@
       {
          session.close();
       }
-      
+
       server.stop();
-      
+
       setupServer(journalType);
-      
+
       final AtomicInteger numberOfMessages = new AtomicInteger(0);
       final int NUMBER_OF_FAST_MESSAGES = 100000;
       final int SLOW_INTERVAL = 100;
@@ -292,7 +388,7 @@
             assertNotNull(msg);
             msg.acknowledge();
          }
-         
+
          assertNull(cons.receiveImmediate());
 
       }
@@ -327,7 +423,7 @@
 
       config.setJournalType(journalType);
 
-      config.setJournalCompactMinFiles(3);
+      config.setJournalCompactMinFiles(10);
       config.setJournalCompactPercentage(50);
 
       server = createServer(true, config);
@@ -363,16 +459,27 @@
       }
 
       sess.close();
-
-      sf = createInVMFactory();
    }
 
    @Override
    protected void tearDown() throws Exception
    {
-      sf.close();
+      try
+      {
+         if (sf != null)
+         {
+            sf.close();
+         }
 
-      server.stop();
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace(); // print to System.out so tearDown failures surface in the JUnit report
+      }
 
       server = null;
 

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -68,17 +68,26 @@
          SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
          newFiles.add(new JournalFileImpl(file, 0));
       }
+      
+      ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
+      renames.add(new Pair<String, String>("a", "b"));
+      renames.add(new Pair<String, String>("c", "d"));
+      
+      
 
-      JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles);
+      JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles, renames);
 
       ArrayList<String> strDataFiles = new ArrayList<String>();
 
       ArrayList<String> strNewFiles = new ArrayList<String>();
+      
+      ArrayList<Pair<String, String>> renamesRead = new ArrayList<Pair<String, String>>();
 
-      assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles));
+      assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles, renamesRead));
 
       assertEquals(dataFiles.size(), strDataFiles.size());
       assertEquals(newFiles.size(), strNewFiles.size());
+      assertEquals(renames.size(), renamesRead.size());
 
       Iterator<String> iterDataFiles = strDataFiles.iterator();
       for (JournalFile file : dataFiles)
@@ -94,6 +103,16 @@
       }
       assertFalse(iterNewFiles.hasNext());
 
+
+      Iterator<Pair<String,String>> iterRename = renames.iterator();
+      for (Pair<String,String> rename : renamesRead)
+      {
+         Pair<String, String> original = iterRename.next();
+         assertEquals(original.a, rename.a);
+         assertEquals(original.b, rename.b);
+      }
+      assertFalse(iterRename.hasNext());
+
    }
 
    public void testCrashRenamingFiles() throws Exception
@@ -197,11 +216,11 @@
       {
 
          @Override
-         protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+         protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles, Pair<String, String> pair) throws Exception
          {
             if (createControlFile)
             {
-               return super.createControlFile(files, newFiles);
+               return super.createControlFile(files, newFiles, pair);
             }
             else
             {
@@ -517,7 +536,7 @@
 
          journal.forceMoveNextFile();
 
-         journal.checkAndReclaimFiles();
+         journal.checkReclaimStatus();
       }
 
       long transactionID = 0;
@@ -666,9 +685,9 @@
          long id = idGenerator.generateID();
          listToDelete.add(id);
 
+         // Appending the record transactionally makes the record size exactly recordLength (discounting SIZE_ADD_RECORD_TX)
          addTx(tx, id);
 
-         // Append Record Transaction will make the recordSize as exactly recordLength (discounting SIZE_ADD_RECORD_TX)
          expectedSizes.add(recordLength);
          journal.forceMoveNextFile();
 

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -204,7 +204,7 @@
       }
 
       impl.forceMoveNextFile();
-      impl.checkAndReclaimFiles();
+      impl.checkReclaimStatus();
       
       impl.stop();
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -156,7 +156,7 @@
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       setupAndLoadJournal(JOURNAL_SIZE, 10);
 
@@ -270,7 +270,7 @@
 
       journalImpl.setAutoReclaim(false);
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       journalImpl.debugWait();
 
@@ -316,7 +316,7 @@
 
       journalImpl.setAutoReclaim(false);
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       journalImpl.debugWait();
 
@@ -356,7 +356,7 @@
 
       assertEquals(1000, records.get(0).id);
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       log.debug(journalImpl.debug());
 
@@ -484,7 +484,7 @@
       assertEquals(10, records.size());
       assertEquals(0, transactions.size());
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(10, journalImpl.getDataFilesCount());
 
@@ -504,7 +504,7 @@
 
       journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1), false);
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(1, journalImpl.getDataFilesCount());
 
@@ -595,7 +595,7 @@
 
       assertEquals(0, records.size());
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(0, journalImpl.getDataFilesCount());
 
@@ -660,7 +660,7 @@
 
       assertEquals(20, records.size());
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
    }
 
@@ -695,7 +695,7 @@
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       setupAndLoadJournal(JOURNAL_SIZE, 100, 2);
 
@@ -756,7 +756,7 @@
 
       assertEquals(0, records.size());
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(0, journalImpl.getDataFilesCount());
 
@@ -798,7 +798,7 @@
       // the
       // file
       journalImpl.forceMoveNextFile();
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       setupAndLoadJournal(JOURNAL_SIZE, 100);
 
@@ -836,7 +836,7 @@
 
       journalImpl.appendCommitRecord(2l, false);
       journalImpl.forceMoveNextFile();
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       setupAndLoadJournal(JOURNAL_SIZE, 100);
 
@@ -931,7 +931,7 @@
          assertEquals((byte)1, transactions.get(0).extraData[i]);
       }
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(10, journalImpl.getDataFilesCount());
 
@@ -943,7 +943,7 @@
 
       assertEquals(10, records.size());
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       for (int i = 0; i < 10; i++)
       {
@@ -980,7 +980,7 @@
 
       // Reclaiming should still be able to reclaim a file if a transaction was
       // ignored
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(2, factory.listFiles("tt").size());
 
@@ -1053,7 +1053,7 @@
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(0, journalImpl.getDataFilesCount());
 
@@ -1147,7 +1147,7 @@
 
       journalImpl.forceMoveNextFile();
       journalImpl.debugWait();
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(0, transactions.size());
       assertEquals(0, journalImpl.getDataFilesCount());
@@ -1238,7 +1238,7 @@
 
       journalImpl.debugWait();
 
-      journalImpl.checkAndReclaimFiles();
+      journalImpl.checkReclaimStatus();
 
       assertEquals(0, journalImpl.getDataFilesCount());
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -104,7 +104,7 @@
    protected void checkAndReclaimFiles() throws Exception
    {
       journal.debugWait();
-      journal.checkAndReclaimFiles();
+      journal.checkReclaimStatus();
       journal.debugWait();
    }
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -1057,7 +1057,7 @@
 
       // Now restart
 
-      journal.checkAndReclaimFiles();
+      journal.checkReclaimStatus();
 
       System.out.println("journal:" + journal.debug());
 
@@ -3094,7 +3094,7 @@
       System.out.println("*****************************************");
 
       journal.forceMoveNextFile();
-      journal.checkAndReclaimFiles();
+      journal.checkReclaimStatus();
 
       assertEquals(0, journal.getDataFilesCount());
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -354,6 +354,23 @@
       assertCanDelete(1);
       assertCantDelete(2);
    }
+   
+   public void testCleanup() throws Exception
+   {
+      setup(3);
+      setupPosNeg(0, 11, 0, 0, 0);
+      setupPosNeg(1, 1, 10, 0, 0);
+      setupPosNeg(2, 1, 0, 1, 0);
+      
+      reclaimer.scan(files);
+      
+      debugFiles();
+      
+      assertCantDelete(0);
+      assertTrue(files[0].isNeedCleanup());
+      assertCantDelete(1);
+      assertCantDelete(2);
+   }
 
    public void testThreeFiles10() throws Exception
    {
@@ -708,6 +725,18 @@
          }
       }
    }
+   
+   private void debugFiles()
+   {
+      for (int i = 0 ; i < files.length; i++)
+      {
+         System.out.println("[" + i + "]=" + files[i].getPosCount() + ", canDelete = " + files[i].isCanReclaim() + ", cleanup = " + files[i].isNeedCleanup());
+         for (int j = 0 ; j <= i; j++)
+         {
+            System.out.println("..." + files[i].getNegCount(files[j]));
+         }
+      }
+   }
 
    private void assertCanDelete(final int... fileNumber)
    {
@@ -738,7 +767,10 @@
       private int posCount;
 
       private boolean canDelete;
+      
+      private boolean needCleanup;
 
+
       public void extendOffset(final int delta)
       {
       }
@@ -913,5 +945,30 @@
       {
          return 0;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#isNeedCleanup()
+       */
+      public boolean isNeedCleanup()
+      {
+         return this.needCleanup;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#resetNegCount(org.hornetq.core.journal.impl.JournalFile)
+       */
+      public boolean resetNegCount(JournalFile file)
+      {
+         return false;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#setNeedCleanup(boolean)
+       */
+      public void setNeedCleanup(boolean needCleanup)
+      {
+         this.needCleanup = needCleanup;
+         
+      }
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2009-09-16 21:29:29 UTC (rev 7963)
@@ -83,7 +83,7 @@
             System.out.println("user record: " + record);
          }
 
-         journal.checkAndReclaimFiles();
+         journal.checkReclaimStatus();
 
          System.out.println("Data = " + journal.debug());
 



More information about the hornetq-commits mailing list