[hornetq-commits] JBoss hornetq SVN: r9599 - in branches/Branch_2_1: src/main/org/hornetq/core/journal/impl and 3 other directories.

do-not-reply at jboss.org
Thu Aug 26 17:11:43 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-26 17:11:42 -0400 (Thu, 26 Aug 2010)
New Revision: 9599

Added:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-485 Improvements to compacting and file management in the journal

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -49,8 +49,6 @@
 
    void forceMoveNextFile() throws Exception;
 
-   void forceMoveNextFile(boolean synchronous) throws Exception;
-
    void setAutoReclaim(boolean autoReclaim);
 
    boolean isAutoReclaim();

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -54,6 +54,8 @@
    protected JournalFile currentFile;
 
    protected SequentialFile sequentialFile;
+   
+   protected final FilesRepository filesRepository;
 
    protected long nextOrderingID;
 
@@ -69,11 +71,13 @@
 
    protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
                                        final JournalImpl journal,
+                                       final FilesRepository filesRepository,
                                        final Set<Long> recordsSnapshot,
                                        final long nextOrderingID)
    {
       super();
       this.journal = journal;
+      this.filesRepository = filesRepository;
       this.fileFactory = fileFactory;
       this.nextOrderingID = nextOrderingID;
       this.recordsSnapshot.addAll(recordsSnapshot);
@@ -212,7 +216,7 @@
 
       writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
 
-      currentFile = journal.getFile(false, false, false, true);
+      currentFile = filesRepository.takeFile(false, false, false, true);
       
       sequentialFile = currentFile.getFile();
 

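A note on the takeFile(false, false, false, true) call above: it replaces the old journal.getFile(...) and its four booleans are easy to misread. The sketch below is not part of this commit; the parameter names and their meanings come from the FilesRepository.takeFile(...) and createFile(...) methods added further down, and the wrapper class is purely illustrative.

    import org.hornetq.core.journal.impl.FilesRepository;
    import org.hornetq.core.journal.impl.JournalFile;

    // Illustrative wrapper only; flag semantics follow FilesRepository.takeFile/createFile in this commit.
    final class TakeFileFlagsSketch
    {
       static JournalFile takeCompactTarget(final FilesRepository filesRepository) throws Exception
       {
          return filesRepository.takeFile(false, // keepOpened: do not open the file before returning it
                                          false, // multiAIO: only consulted when keepOpened is true
                                          false, // initFile: skip the FILL_CHARACTER pre-fill and header write
                                          true); // tmpCompactExtension: name the file with the temporary ".cmp" extension
       }
    }
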
Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java	                        (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal. It controls access to the dataFiles, openedFiles and freeFiles collections,
+ * guaranteeing that files are handed to the Journal in order.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class FilesRepository
+{
+
+   private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+   private static final boolean trace = false;
+
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging
+   // Journal
+   private static final void trace(final String message)
+   {
+      log.trace(message);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final SequentialFileFactory fileFactory;
+
+   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+   private final AtomicLong nextFileID = new AtomicLong(0);
+
+   private final int maxAIO;
+
+   private final int minFiles;
+
+   private final int fileSize;
+
+   private final String filePrefix;
+
+   private final String fileExtension;
+
+   private final int userVersion;
+   
+   private Executor filesExecutor;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public FilesRepository(final SequentialFileFactory fileFactory,
+                          final String filePrefix,
+                          final String fileExtension,
+                          final int userVersion,
+                          final int maxAIO,
+                          final int fileSize,
+                          final int minFiles)
+   {
+      this.fileFactory = fileFactory;
+      this.maxAIO = maxAIO;
+      this.filePrefix = filePrefix;
+      this.fileExtension = fileExtension;
+      this.minFiles = minFiles;
+      this.fileSize = fileSize;
+      this.userVersion = userVersion;
+   }
+
+   // Public --------------------------------------------------------
+
+   
+   public void setExecutor(Executor executor)
+   {
+      this.filesExecutor = executor;
+   }
+   
+   public void clear()
+   {
+      dataFiles.clear();
+
+      pendingCloseFiles.clear();
+
+      freeFiles.clear();
+
+      openedFiles.clear();
+   }
+
+   public int getMaxAIO()
+   {
+      return maxAIO;
+   }
+
+   public String getFileExtension()
+   {
+      return fileExtension;
+   }
+
+   public String getFilePrefix()
+   {
+      return filePrefix;
+   }
+
+   public void calculateNextfileID(List<JournalFile> files)
+   {
+
+      for (JournalFile file : files)
+      {
+         long fileID = file.getFileID();
+         if (nextFileID.get() < fileID)
+         {
+            nextFileID.set(fileID);
+         }
+
+         long fileNameID = getFileNameID(file.getFile().getFileName());
+
+         // The compactor could create a fileName but use a previously assigned ID.
+         // Because of that we need to take both parts into account
+         if (nextFileID.get() < fileNameID)
+         {
+            nextFileID.set(fileNameID);
+         }
+      }
+
+   }
+
+   public void ensureMinFiles() throws Exception
+   {
+      // FIXME - size() involves a scan
+      int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+      if (filesToCreate > 0)
+      {
+         for (int i = 0; i < filesToCreate; i++)
+         {
+            // Keeping all files opened can be very costly (mainly on AIO)
+            freeFiles.add(createFile(false, false, true, false));
+         }
+      }
+
+   }
+
+   public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+   {
+      if (multiAIO)
+      {
+         file.getFile().open();
+      }
+      else
+      {
+         file.getFile().open(1, false);
+      }
+
+      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+   }
+
+   // Data File Operations ==========================================
+
+   public JournalFile[] getDataFilesArray()
+   {
+      return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+   }
+
+   public JournalFile pollLastDataFile()
+   {
+      /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+      JournalFile currentFile = null;
+      while (iter.hasNext())
+      {
+         currentFile = iter.next();
+
+         if (!iter.hasNext())
+         {
+            iter.remove();
+         }
+      } 
+      
+      return currentFile; */
+
+      return dataFiles.pollLast();
+   }
+
+   public void removeDataFile(JournalFile file)
+   {
+      if (!dataFiles.remove(file))
+      {
+         log.warn("Could not remove file " + file + " from the list of data files");
+      }
+   }
+
+   public int getDataFilesCount()
+   {
+      return dataFiles.size();
+   }
+
+   public Collection<JournalFile> getDataFiles()
+   {
+      return dataFiles;
+   }
+
+   public void clearDataFiles()
+   {
+      dataFiles.clear();
+   }
+
+   public void addDataFileOnTop(JournalFile file)
+   {
+      dataFiles.addFirst(file);
+   }
+
+   public void addDataFileOnBottom(JournalFile file)
+   {
+      dataFiles.add(file);
+   }
+
+   // Free File Operations ==========================================
+
+   public int getFreeFilesCount()
+   {
+      return freeFiles.size();
+   }
+
+   /**
+    * Add directly to the freeFiles structure without reinitializing the file.
+    * used on load() only
+    */
+   public void addFreeFileNoInit(JournalFile file)
+   {
+      freeFiles.add(file);
+   }
+
+   /**
+    * @param file
+    * @throws Exception
+    */
+   public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+   {
+      if (file.getFile().size() != fileSize)
+      {
+         log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
+         file.getFile().delete();
+      }
+      else
+      // FIXME - size() involves a scan!!!
+      if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+      {
+         // Re-initialise it
+
+         if (trace)
+         {
+            trace("Adding free file " + file);
+         }
+
+         JournalFile jf = reinitializeFile(file);
+
+         if (renameTmp)
+         {
+            jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+         }
+
+         freeFiles.add(jf);
+      }
+      else
+      {
+         file.getFile().delete();
+      }
+   }
+
+   public Collection<JournalFile> getFreeFiles()
+   {
+      return freeFiles;
+   }
+
+   public JournalFile getFreeFile()
+   {
+      return freeFiles.remove();
+   }
+
+   // Opened files operations =======================================
+
+   public int getOpenedFilesCount()
+   {
+      return openedFiles.size();
+   }
+
+   public void drainClosedFiles()
+   {
+      JournalFile file;
+      try
+      {
+         while ((file = pendingCloseFiles.poll()) != null)
+         {
+            file.getFile().close();
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+
+   }
+
+   /** 
+    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
+    * <p>In case there are no cached opened files, this method will block until the file was opened,
+    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> 
+    * */
+   public JournalFile openFile() throws InterruptedException
+   {
+      if (trace)
+      {
+         trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      }
+
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               pushOpenedFile();
+            }
+            catch (Exception e)
+            {
+               log.error(e.getMessage(), e);
+            }
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+      JournalFile nextFile = null;
+
+      while (nextFile == null)
+      {
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+         if (nextFile == null)
+         {
+            log.warn("Couldn't open a file in 60 Seconds", new Exception("Warning: Couldn't open a file in 60 Seconds"));
+         }
+      }
+
+      if (trace)
+      {
+         trace("Returning file " + nextFile);
+      }
+
+      return nextFile;
+   }
+   
+   /** 
+    * 
+    * Open a file and place it into the openedFiles queue
+    * */
+   public void pushOpenedFile() throws Exception
+   {
+      JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+      if (trace)
+      {
+         trace("pushing openFile " + nextOpenedFile);
+      }
+
+      openedFiles.offer(nextOpenedFile);
+   }
+
+   
+   public void closeFile(final JournalFile file)
+   {
+      fileFactory.deactivateBuffer();
+      pendingCloseFiles.add(file);
+      dataFiles.add(file);
+
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            drainClosedFiles();
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+   }
+   
+   
+   /**
+    * This will get a File from freeFile without initializing it
+    * @return
+    * @throws Exception
+    */
+   public JournalFile takeFile(final boolean keepOpened,
+                       final boolean multiAIO,
+                       final boolean initFile,
+                       final boolean tmpCompactExtension) throws Exception
+   {
+      JournalFile nextOpenedFile = null;
+
+      nextOpenedFile = freeFiles.poll();
+
+      if (nextOpenedFile == null)
+      {
+         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+      }
+      else
+      {
+         if (tmpCompactExtension)
+         {
+            SequentialFile sequentialFile = nextOpenedFile.getFile();
+            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+         }
+
+         if (keepOpened)
+         {
+            openFile(nextOpenedFile, multiAIO);
+         }
+      }
+      return nextOpenedFile;
+   }
+
+   
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+    * @param keepOpened
+    * @return
+    * @throws Exception
+    */
+   private JournalFile createFile(final boolean keepOpened,
+                                  final boolean multiAIO,
+                                  final boolean init,
+                                  final boolean tmpCompact) throws Exception
+   {
+      long fileID = generateFileID();
+
+      String fileName;
+
+      fileName = createFileName(tmpCompact, fileID);
+
+      if (trace)
+      {
+         trace("Creating file " + fileName);
+      }
+
+      String tmpFileName = fileName + ".tmp";
+
+      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+      sequentialFile.open(1, false);
+
+      if (init)
+      {
+         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+         JournalImpl.initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
+      }
+
+      long position = sequentialFile.position();
+
+      sequentialFile.close();
+
+      if (trace)
+      {
+         trace("Renaming file " + tmpFileName + " as " + fileName);
+      }
+
+      sequentialFile.renameTo(fileName);
+
+      if (keepOpened)
+      {
+         if (multiAIO)
+         {
+            sequentialFile.open();
+         }
+         else
+         {
+            sequentialFile.open(1, false);
+         }
+         sequentialFile.position(position);
+      }
+
+      return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+   }
+
+   /**
+    * @param tmpCompact
+    * @param fileID
+    * @return
+    */
+   private String createFileName(final boolean tmpCompact, long fileID)
+   {
+      String fileName;
+      if (tmpCompact)
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+      }
+      else
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension;
+      }
+      return fileName;
+   }
+
+   private long generateFileID()
+   {
+      return nextFileID.incrementAndGet();
+   }
+
+   /** Get the ID part of the name */
+   private long getFileNameID(final String fileName)
+   {
+      try
+      {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+      }
+      catch (Throwable e)
+      {
+         log.warn("Impossible to get the ID part of the file name " + fileName, e);
+         return 0;
+      }
+   }
+
+   // Discard the old JournalFile and set it with a new ID
+   private JournalFile reinitializeFile(final JournalFile file) throws Exception
+   {
+      long newFileID = generateFileID();
+
+      SequentialFile sf = file.getFile();
+
+      sf.open(1, false);
+
+      int position = JournalImpl.initFileHeader(this.fileFactory, sf, userVersion, newFileID);
+
+      JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+      sf.position(position);
+
+      sf.close();
+
+      return jf;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
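
A minimal usage sketch of the class above, not part of this commit: it shows how a caller such as JournalImpl is expected to wire and drive a FilesRepository. The factory is supplied by the caller, and the prefix, extension, version and size arguments are illustrative values only.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.hornetq.core.journal.SequentialFileFactory;
    import org.hornetq.core.journal.impl.FilesRepository;
    import org.hornetq.core.journal.impl.JournalFile;

    final class FilesRepositoryUsageSketch
    {
       static void lifecycle(final SequentialFileFactory fileFactory) throws Exception
       {
          // Constructor arguments: factory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles.
          FilesRepository repository = new FilesRepository(fileFactory, "hornetq-data", "hq", 1, 500, 10 * 1024 * 1024, 2);

          // Without an executor every open/close runs inline; JournalImpl.start() installs its filesExecutor.
          ExecutorService executor = Executors.newSingleThreadExecutor();
          repository.setExecutor(executor);

          // Pre-create free files up to minFiles so the first appends never wait on file initialization.
          repository.ensureMinFiles();

          // openFile() schedules the opening of the next file and returns a pre-opened one,
          // blocking only while no opened file is cached.
          JournalFile current = repository.openFile();

          // ... append records to 'current'; once it is full, hand it back:
          repository.closeFile(current);   // queued for closing and appended to the dataFiles list
          repository.drainClosedFiles();   // normally performed by the executor

          executor.shutdown();
       }
    }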

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -144,10 +144,11 @@
 
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
+                           final FilesRepository filesRepository,
                            final Set<Long> recordsSnapshot,
                            final long firstFileID)
    {
-      super(fileFactory, journal, recordsSnapshot, firstFileID);
+      super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
    }
 
    /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -531,7 +532,14 @@
       void execute() throws Exception
       {
          JournalRecord deleteRecord = journal.getRecords().remove(id);
-         deleteRecord.delete(usedFile);
+         if (deleteRecord == null)
+         {
+            log.warn("Can't find record " + id + " during compact replay");
+         }
+         else
+         {
+            deleteRecord.delete(usedFile);
+         }
       }
    }
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -24,16 +24,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,6 +94,10 @@
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
    private static final boolean trace = log.isTraceEnabled();
+ 
+   // This is useful at debug time...
+   // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
+   private static final boolean TRACE_RECORDS = false;
 
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging
@@ -107,6 +106,11 @@
    {
       log.trace(message);
    }
+   
+   private static final void traceRecord(final String message)
+   {
+      System.out.println(message);
+   }
 
    // The sizes of primitive types
 
@@ -167,12 +171,8 @@
 
    private volatile boolean autoReclaim = true;
 
-   private final AtomicLong nextFileID = new AtomicLong(0);
-
    private final int userVersion;
 
-   private final int maxAIO;
-
    private final int fileSize;
 
    private final int minFiles;
@@ -183,18 +183,9 @@
 
    private final SequentialFileFactory fileFactory;
 
-   public final String filePrefix;
 
-   public final String fileExtension;
-
-   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
-   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
-   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
-   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
+   private final FilesRepository filesRepository;
+   
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
 
@@ -300,13 +291,9 @@
       this.minFiles = minFiles;
 
       this.fileFactory = fileFactory;
+      
+      filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
 
-      this.filePrefix = filePrefix;
-
-      this.fileExtension = fileExtension;
-
-      this.maxAIO = maxAIO;
-
       this.userVersion = userVersion;
    }
 
@@ -390,13 +377,13 @@
     *  It won't be part of the interface as the tools should be specific to the implementation */
    public List<JournalFile> orderFiles() throws Exception
    {
-      List<String> fileNames = fileFactory.listFiles(fileExtension);
+      List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
 
       List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
 
       for (String fileName : fileNames)
       {
-         SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+         SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
 
          file.open(1, false);
 
@@ -421,29 +408,6 @@
       return orderedFiles;
    }
 
-   private void calculateNextfileID(List<JournalFile> files)
-   {
-
-      for (JournalFile file : files)
-      {
-         long fileID = file.getFileID();
-         if (nextFileID.get() < fileID)
-         {
-            nextFileID.set(fileID);
-         }
-
-         long fileNameID = getFileNameID(file.getFile().getFileName());
-
-         // The compactor could create a fileName but use a previously assigned ID.
-         // Because of that we need to take both parts into account
-         if (nextFileID.get() < fileNameID)
-         {
-            nextFileID.set(fileNameID);
-         }
-      }
-
-   }
-
    /** this method is used internally only however tools may use it to maintenance.  */
    public static int readJournalFile(final SequentialFileFactory fileFactory,
                                      final JournalFile file,
@@ -856,6 +820,8 @@
          {
             JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
 
+            if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+
             records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
          }
          finally
@@ -932,6 +898,8 @@
          {
             JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
+            if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
             if (jrnRecord == null)
@@ -1008,6 +976,8 @@
          {
             JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
 
+            if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
             if (record == null)
@@ -1060,6 +1030,8 @@
          {
             JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
+            if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+
             tx.addPositive(usedFile, id, addRecord.getEncodeSize());
          }
          finally
@@ -1104,6 +1076,8 @@
          {
             JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
 
+            if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+
             tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
          }
          finally
@@ -1142,6 +1116,8 @@
          {
             JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
 
+            if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+
             tx.addNegative(usedFile, id);
          }
          finally
@@ -1230,6 +1206,8 @@
          try
          {
             JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+            
+            if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
 
             tx.prepare(usedFile);
          }
@@ -1248,7 +1226,7 @@
    public void appendCommitRecord(final long txID, final boolean sync) throws Exception
    {
       SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+      
       appendCommitRecord(txID, sync, syncCompletion);
 
       if (syncCompletion != null)
@@ -1303,6 +1281,8 @@
          try
          {
             JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+            
+            if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
 
             tx.commit(usedFile);
          }
@@ -1514,7 +1494,7 @@
          throw new IllegalStateException("There is pending compacting operation");
       }
 
-      ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+      ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
 
       boolean previousReclaimValue = autoReclaim;
 
@@ -1524,7 +1504,8 @@
          {
             JournalImpl.trace("Starting compacting operation on journal");
          }
-         JournalImpl.log.debug("Starting compacting operation on journal");
+         
+         if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
 
          onCompactStart();
 
@@ -1543,22 +1524,22 @@
             setAutoReclaim(false);
 
             // We need to move to the next file, as we need a clear start for negatives and positives counts
-            moveNextFile(true);
+            moveNextFile(false);
 
+            filesRepository.drainClosedFiles();
+            
             // Take the snapshots and replace the structures
 
-            dataFilesToProcess.addAll(dataFiles);
+            dataFilesToProcess.addAll(filesRepository.getDataFiles());
 
-            dataFiles.clear();
+            filesRepository.clearDataFiles();
 
-            drainClosedFiles();
-
             if (dataFilesToProcess.size() == 0)
             {
                return;
             }
 
-            compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
+            compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
@@ -1632,12 +1613,12 @@
                {
                   JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
                }
-               dataFiles.addFirst(fileToAdd);
+               filesRepository.addDataFileOnTop(fileToAdd);
             }
 
             if (trace)
             {
-               JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+               JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
             }
 
             // Replay pending commands (including updates, deletes and commits)
@@ -1683,6 +1664,8 @@
          {
             JournalImpl.log.debug("Finished compacting on journal");
          }
+         
+         if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
 
       }
       finally
@@ -1753,21 +1736,15 @@
 
       records.clear();
 
-      dataFiles.clear();
+      filesRepository.clear();
 
-      pendingCloseFiles.clear();
-
-      freeFiles.clear();
-
-      openedFiles.clear();
-
       transactions.clear();
 
       final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
 
-      calculateNextfileID(orderedFiles);
+      filesRepository.calculateNextfileID(orderedFiles);
 
       int lastDataPos = JournalImpl.SIZE_HEADER;
 
@@ -2030,43 +2007,23 @@
          if (hasData.get())
          {
             lastDataPos = resultLastPost;
-            dataFiles.add(file);
+            filesRepository.addDataFileOnBottom(file);
          }
          else
          {
             // Empty dataFiles with no data
-            freeFiles.add(file);
+            filesRepository.addFreeFileNoInit(file);
          }
       }
 
       // Create any more files we need
 
-      // FIXME - size() involves a scan
-      int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+      filesRepository.ensureMinFiles();
 
-      if (filesToCreate > 0)
-      {
-         for (int i = 0; i < filesToCreate; i++)
-         {
-            // Keeping all files opened can be very costly (mainly on AIO)
-            freeFiles.add(createFile(false, false, true, false));
-         }
-      }
+      // The current file is the last one that has data
+      
+      currentFile = filesRepository.pollLastDataFile();
 
-      // The current file is the last one
-
-      Iterator<JournalFile> iter = dataFiles.iterator();
-
-      while (iter.hasNext())
-      {
-         currentFile = iter.next();
-
-         if (!iter.hasNext())
-         {
-            iter.remove();
-         }
-      }
-
       if (currentFile != null)
       {
          currentFile.getFile().open();
@@ -2075,14 +2032,14 @@
       }
       else
       {
-         currentFile = freeFiles.remove();
+         currentFile = filesRepository.getFreeFile();
 
-         openFile(currentFile, true);
+         filesRepository.openFile(currentFile, true);
       }
 
       fileFactory.activateBuffer(currentFile.getFile());
 
-      pushOpenedFile();
+      filesRepository.pushOpenedFile();
 
       for (TransactionHolder transaction : loadTransactions.values())
       {
@@ -2152,7 +2109,7 @@
       {
          reclaimer.scan(getDataFiles());
 
-         for (JournalFile file : dataFiles)
+         for (JournalFile file : filesRepository.getDataFiles())
          {
             if (file.isCanReclaim())
             {
@@ -2161,13 +2118,10 @@
                {
                   JournalImpl.trace("Reclaiming file " + file);
                }
+               
+               filesRepository.removeDataFile(file);
 
-               if (!dataFiles.remove(file))
-               {
-                  JournalImpl.log.warn("Could not remove file " + file);
-               }
-
-               addFreeFile(file, false);
+               filesRepository.addFreeFile(file, false);
             }
          }
       }
@@ -2179,9 +2133,6 @@
       return false;
    }
 
-   
-   int deleteme = 0;
-
    private boolean needsCompact() throws Exception
    {
       JournalFile[] dataFiles = getDataFiles();
@@ -2197,7 +2148,7 @@
 
       long compactMargin = (long)(totalBytes * compactPercentage);
       
-      boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+      boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
 
       return needCompact;
 
@@ -2218,33 +2169,38 @@
 
       if (needsCompact())
       {
-         if (!compactorRunning.compareAndSet(false, true))
-         {
-            return;
-         }
+         scheduleCompact();
+      }
+   }
 
-         // We can't use the executor for the compacting... or we would dead lock because of file open and creation
-         // operations (that will use the executor)
-         compactorExecutor.execute(new Runnable()
+   private void scheduleCompact()
+   {
+      if (!compactorRunning.compareAndSet(false, true))
+      {
+         return;
+      }
+
+      // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+      // operations (that will use the executor)
+      compactorExecutor.execute(new Runnable()
+      {
+         public void run()
          {
-            public void run()
-            {
 
-               try
-               {
-                  JournalImpl.this.compact();
-               }
-               catch (Throwable e)
-               {
-                  JournalImpl.log.error(e.getMessage(), e);
-               }
-               finally
-               {
-                  compactorRunning.set(false);
-               }
+            try
+            {
+               JournalImpl.this.compact();
             }
-         });
-      }
+            catch (Throwable e)
+            {
+               JournalImpl.log.error(e.getMessage(), e);
+            }
+            finally
+            {
+               compactorRunning.set(false);
+            }
+         }
+      });
    }
 
    // TestableJournal implementation
@@ -2266,7 +2222,7 @@
 
       StringBuilder builder = new StringBuilder();
 
-      for (JournalFile file : dataFiles)
+      for (JournalFile file : filesRepository.getDataFiles())
       {
          builder.append("DataFile:" + file +
                         " posCounter = " +
@@ -2283,7 +2239,7 @@
          }
       }
 
-      for (JournalFile file : freeFiles)
+      for (JournalFile file : filesRepository.getFreeFiles())
       {
          builder.append("FreeFile:" + file + "\n");
       }
@@ -2302,8 +2258,6 @@
          builder.append("CurrentFile: No current file at this point!");
       }
 
-      builder.append("#Opened Files:" + openedFiles.size());
-
       return builder.toString();
    }
 
@@ -2339,22 +2293,22 @@
 
    public int getDataFilesCount()
    {
-      return dataFiles.size();
+      return filesRepository.getDataFilesCount();
    }
 
    public JournalFile[] getDataFiles()
    {
-      return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+      return filesRepository.getDataFilesArray();
    }
 
    public int getFreeFilesCount()
    {
-      return freeFiles.size();
+      return filesRepository.getFreeFilesCount();
    }
 
    public int getOpenedFilesCount()
    {
-      return openedFiles.size();
+      return filesRepository.getOpenedFilesCount();
    }
 
    public int getIDMapSize()
@@ -2374,17 +2328,17 @@
 
    public String getFilePrefix()
    {
-      return filePrefix;
+      return filesRepository.getFilePrefix();
    }
 
    public String getFileExtension()
    {
-      return fileExtension;
+      return filesRepository.getFileExtension();
    }
 
    public int getMaxAIO()
    {
-      return maxAIO;
+      return filesRepository.getMaxAIO();
    }
 
    public int getUserVersion()
@@ -2395,20 +2349,14 @@
    // In some tests we need to force the journal to move to a next file
    public void forceMoveNextFile() throws Exception
    {
-      forceMoveNextFile(true);
-   }
-
-   // In some tests we need to force the journal to move to a next file
-   public void forceMoveNextFile(final boolean synchronous) throws Exception
-   {
       compactingLock.readLock().lock();
       try
       {
          lockAppend.lock();
          try
          {
-            moveNextFile(synchronous);
-            if (autoReclaim && synchronous)
+            moveNextFile(false);
+            if (autoReclaim)
             {
                checkReclaimStatus();
             }
@@ -2462,6 +2410,8 @@
             return new Thread(r, "JournalImpl::CompactorExecutor");
          }
       });
+      
+      filesRepository.setExecutor(filesExecutor);
 
       fileFactory.start();
 
@@ -2493,6 +2443,8 @@
 
          filesExecutor.shutdown();
 
+         filesRepository.setExecutor(null);
+
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
          {
             JournalImpl.log.warn("Couldn't stop journal executor after 60 seconds");
@@ -2505,20 +2457,13 @@
             currentFile.getFile().close();
          }
 
-         for (JournalFile file : openedFiles)
-         {
-            file.getFile().close();
-         }
+         filesRepository.drainClosedFiles();
 
          fileFactory.stop();
 
          currentFile = null;
 
-         dataFiles.clear();
-
-         freeFiles.clear();
-
-         openedFiles.clear();
+         filesRepository.clear();
       }
       finally
       {
@@ -2578,7 +2523,7 @@
             {
                try
                {
-                  addFreeFile(file, false);
+                  filesRepository.addFreeFile(file, false);
                }
                catch (Throwable e)
                {
@@ -2606,7 +2551,7 @@
     * @param name
     * @return
     */
-   private String renameExtensionFile(String name, String extension)
+   protected static String renameExtensionFile(String name, String extension)
    {
       name = name.substring(0, name.lastIndexOf(extension));
       return name;
@@ -2632,63 +2577,6 @@
    // -----------------------------------------------------------------------------
 
    /**
-    * @param file
-    * @throws Exception
-    */
-   private void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
-   {
-      if (file.getFile().size() != this.getFileSize())
-      {
-         log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
-         file.getFile().delete();
-      }
-      else
-      // FIXME - size() involves a scan!!!
-      if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
-      {
-         // Re-initialise it
-
-         if (trace)
-         {
-            trace("Adding free file " + file);
-         }
-
-         JournalFile jf = reinitializeFile(file);
-
-         if (renameTmp)
-         {
-            jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
-         }
-
-         freeFiles.add(jf);
-      }
-      else
-      {
-         file.getFile().delete();
-      }
-   }
-
-   // Discard the old JournalFile and set it with a new ID
-   private JournalFile reinitializeFile(final JournalFile file) throws Exception
-   {
-      long newFileID = generateFileID();
-
-      SequentialFile sf = file.getFile();
-
-      sf.open(1, false);
-
-      int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
-
-      JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
-
-      sf.position(position);
-
-      sf.close();
-
-      return jf;
-   }
-
-   /**
     * <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
     * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
     *     
@@ -2890,7 +2778,7 @@
 
       if (!currentFile.getFile().fits(size))
       {
-         moveNextFile(false);
+         moveNextFile(true);
 
          // The same check needs to be done at the new file also
          if (!currentFile.getFile().fits(size))
@@ -2904,7 +2792,7 @@
       {
          throw new NullPointerException("Current file = null");
       }
-
+      
       if (tx != null)
       {
          // The callback of a transaction has to be taken inside the lock,
@@ -2960,196 +2848,26 @@
       return currentFile;
    }
 
-   /** Get the ID part of the name */
-   private long getFileNameID(final String fileName)
+   // You need to guarantee lock.acquire() before calling this method
+   private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
    {
-      try
-      {
-         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
-      }
-      catch (Throwable e)
-      {
-         JournalImpl.log.warn("Impossible to get the ID part of the file name " + fileName, e);
-         return 0;
-      }
-   }
+      filesRepository.closeFile(currentFile);
 
-   /**
-    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
-    * @param keepOpened
-    * @return
-    * @throws Exception
-    */
-   private JournalFile createFile(final boolean keepOpened,
-                                  final boolean multiAIO,
-                                  final boolean init,
-                                  final boolean tmpCompact) throws Exception
-   {
-      long fileID = generateFileID();
-
-      String fileName;
-
-      fileName = createFileName(tmpCompact, fileID);
-
-      if (JournalImpl.trace)
+      currentFile = filesRepository.openFile();
+      
+      if (scheduleReclaim)
       {
-         JournalImpl.trace("Creating file " + fileName);
+         scheduleReclaim();
       }
 
-      String tmpFileName = fileName + ".tmp";
-
-      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
-      sequentialFile.open(1, false);
-
-      if (init)
-      {
-         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
-         initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
-      }
-
-      long position = sequentialFile.position();
-
-      sequentialFile.close();
-
       if (JournalImpl.trace)
       {
-         JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+         JournalImpl.trace("moveNextFile: " + currentFile);
       }
 
-      sequentialFile.renameTo(fileName);
-
-      if (keepOpened)
-      {
-         if (multiAIO)
-         {
-            sequentialFile.open();
-         }
-         else
-         {
-            sequentialFile.open(1, false);
-         }
-         sequentialFile.position(position);
-      }
-
-      return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
-   }
-
-   /**
-    * @param tmpCompact
-    * @param fileID
-    * @return
-    */
-   private String createFileName(final boolean tmpCompact, long fileID)
-   {
-      String fileName;
-      if (tmpCompact)
-      {
-         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
-      }
-      else
-      {
-         fileName = filePrefix + "-" + fileID + "." + fileExtension;
-      }
-      return fileName;
-   }
-
-   private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
-   {
-      if (multiAIO)
-      {
-         file.getFile().open();
-      }
-      else
-      {
-         file.getFile().open(1, false);
-      }
-
-      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
-   }
-
-   private long generateFileID()
-   {
-      return nextFileID.incrementAndGet();
-   }
-
-   // You need to guarantee lock.acquire() before calling this method
-   private void moveNextFile(final boolean synchronous) throws InterruptedException
-   {
-      closeFile(currentFile, synchronous);
-
-      currentFile = enqueueOpenFile(synchronous);
-
-      if (JournalImpl.trace)
-      {
-         JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
-      }
-
       fileFactory.activateBuffer(currentFile.getFile());
    }
 
-   /** 
-    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
-    * <p>In case there are no cached opened files, this method will block until the file was opened,
-    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> 
-    * */
-   private JournalFile enqueueOpenFile(final boolean synchronous) throws InterruptedException
-   {
-      if (JournalImpl.trace)
-      {
-         JournalImpl.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
-      }
-
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               pushOpenedFile();
-            }
-            catch (Exception e)
-            {
-               JournalImpl.log.error(e.getMessage(), e);
-            }
-         }
-      };
-
-      if (synchronous)
-      {
-         run.run();
-      }
-      else
-      {
-         filesExecutor.execute(run);
-      }
-
-      if (!synchronous)
-      {
-         scheduleReclaim();
-      }
-
-      JournalFile nextFile = null;
-
-      while (nextFile == null)
-      {
-         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
-         if (nextFile == null)
-         {
-            JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
-                                 new Exception("Warning: Couldn't open a file in 60 Seconds"));
-         }
-      }
-
-      if (trace)
-      {
-         JournalImpl.trace("Returning file " + nextFile);
-      }
-
-      return nextFile;
-   }
-
    private void scheduleReclaim()
    {
       if (state != JournalImpl.STATE_LOADED)
@@ -3165,7 +2883,7 @@
             {
                try
                {
-                  drainClosedFiles();
+                  filesRepository.drainClosedFiles();
                   if (!checkReclaimStatus())
                   {
                      checkCompact();
@@ -3180,97 +2898,7 @@
       }
    }
 
-   /** 
-    * 
-    * Open a file and place it into the openedFiles queue
-    * */
-   private void pushOpenedFile() throws Exception
-   {
-      JournalFile nextOpenedFile = getFile(true, true, true, false);
 
-      if (trace)
-      {
-         JournalImpl.trace("pushing openFile " + nextOpenedFile);
-      }
-
-      openedFiles.offer(nextOpenedFile);
-   }
-
-   /**
-    * @return
-    * @throws Exception
-    */
-   JournalFile getFile(final boolean keepOpened,
-                       final boolean multiAIO,
-                       final boolean initFile,
-                       final boolean tmpCompactExtension) throws Exception
-   {
-      JournalFile nextOpenedFile = null;
-
-      nextOpenedFile = freeFiles.poll();
-
-      if (nextOpenedFile == null)
-      {
-         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
-      }
-      else
-      {
-         if (tmpCompactExtension)
-         {
-            SequentialFile sequentialFile = nextOpenedFile.getFile();
-            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
-         }
-
-         if (keepOpened)
-         {
-            openFile(nextOpenedFile, multiAIO);
-         }
-      }
-      return nextOpenedFile;
-   }
-
-   private void closeFile(final JournalFile file, final boolean synchronous)
-   {
-      fileFactory.deactivateBuffer();
-      pendingCloseFiles.add(file);
-      dataFiles.add(file);
-
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            drainClosedFiles();
-         }
-      };
-
-      if (synchronous)
-      {
-         run.run();
-      }
-      else
-      {
-         filesExecutor.execute(run);
-      }
-
-   }
-
-   private void drainClosedFiles()
-   {
-      JournalFile file;
-      try
-      {
-         while ((file = pendingCloseFiles.poll()) != null)
-         {
-            file.getFile().close();
-         }
-      }
-      catch (Exception e)
-      {
-         JournalImpl.log.warn(e.getMessage(), e);
-      }
-
-   }
-
    private JournalTransaction getTransactionInfo(final long txID)
    {
       JournalTransaction tx = transactions.get(txID);

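For context on the needsCompact() hunk above: the compactorRunning check moves into scheduleCompact()'s compareAndSet, and only the size-based condition remains. The worked example below is not from the source; totalLiveSize and totalBytes are computed earlier in needsCompact() (outside this hunk), so the numbers here are invented purely to show when compaction gets scheduled.

    // Illustration only: the trigger condition retained in needsCompact() after this change.
    final class CompactTriggerSketch
    {
       static boolean needCompact(final long totalLiveSize, final long totalBytes,
                                  final double compactPercentage,
                                  final int dataFileCount, final int compactMinFiles)
       {
          long compactMargin = (long)(totalBytes * compactPercentage);
          // The "compactor already running" guard now lives in scheduleCompact()'s compareAndSet.
          return totalLiveSize < compactMargin && dataFileCount > compactMinFiles;
       }

       public static void main(final String[] args)
       {
          // 12 data files totalling 120 MiB with only 30 MiB still live, a 30% threshold and
          // compactMinFiles = 10: 30 MiB < 36 MiB and 12 > 10, so compaction would be scheduled.
          System.out.println(needCompact(30L << 20, 120L << 20, 0.30, 12, 10)); // prints true
       }
    }
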
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -40,7 +40,7 @@
 {
    private final boolean isCommit;
 
-   private final long txID;
+   public final long txID;
 
    private final EncodingSupport transactionData;
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -25,7 +25,7 @@
  */
 public class JournalRollbackRecordTX extends JournalInternalRecord
 {
-   private final long txID;
+   public final long txID;
 
    public JournalRollbackRecordTX(final long txID)
    {

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -28,6 +28,7 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
+import org.hornetq.core.journal.impl.ExportJournal;
 import org.hornetq.core.journal.impl.JournalCompactor;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalFileImpl;
@@ -38,7 +39,6 @@
 import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 import org.hornetq.utils.IDGenerator;
 import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
 
 /**
  * 
@@ -53,7 +53,7 @@
 
    private static final int NUMBER_OF_RECORDS = 1000;
 
-   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+   IDGenerator idGenerator = new SimpleIDGenerator(100000);
 
    // General tests
    // =============
@@ -277,13 +277,13 @@
       journal.forceMoveNextFile();
 
       addTx(1, 5, 6, 7, 8);
-      
+
       commit(1);
-      
+
       journal.forceMoveNextFile();
-      
+
       journal.compact();
-      
+
       add(10);
 
       stopJournal();
@@ -806,7 +806,7 @@
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
 
       createJournal();
-      
+
       startJournal();
       load();
 
@@ -931,7 +931,7 @@
       }
 
       startCompact();
- 
+
       // Delete part of the live records while cleanup still working
       for (int i = 1; i < 5; i++)
       {
@@ -939,7 +939,7 @@
       }
 
       finishCompact();
- 
+
       // Delete part of the live records after cleanup is done
       for (int i = 5; i < 10; i++)
       {
@@ -963,7 +963,7 @@
       setup(2, 60 * 1024, false);
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-      
+
       createJournal();
 
       startJournal();
@@ -978,7 +978,7 @@
       addTx(appendTX, appendOne);
 
       startCompact();
-      
+
       addTx(appendTX, appendTwo);
 
       commit(appendTX);
@@ -1161,7 +1161,7 @@
       }
 
    }
-   
+
    public void testCompactFirstFileWithPendingCommits() throws Exception
    {
       setup(2, 60 * 1024, true);
@@ -1175,10 +1175,165 @@
       {
          addTx(tx, idGenerator.generateID());
       }
-      
+
       journal.forceMoveNextFile();
+
+      ArrayList<Long> listToDelete = new ArrayList<Long>();
+      for (int i = 0; i < 10; i++)
+      {
+         if (i == 5)
+         {
+            commit(tx);
+         }
+         long id = idGenerator.generateID();
+         listToDelete.add(id);
+         add(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      for (Long id : listToDelete)
+      {
+         delete(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+      journal.compact();
+
+      journal.checkReclaimStatus();
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits3() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long tx = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         addTx(tx, idGenerator.generateID());
+      }
+
+      journal.forceMoveNextFile();
+
+      ArrayList<Long> listToDelete = new ArrayList<Long>();
+      for (int i = 0; i < 10; i++)
+      {
+         long id = idGenerator.generateID();
+         listToDelete.add(id);
+         add(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      for (Long id : listToDelete)
+      {
+         delete(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out1.dmp");
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out2.dmp");
+
+      rollback(tx);
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out3.dmp");
+
+      journal.forceMoveNextFile();
+      journal.checkReclaimStatus();
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits2() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long tx = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         addTx(tx, idGenerator.generateID());
+      }
+
+      journal.forceMoveNextFile();
+
+      ArrayList<Long> listToDelete = new ArrayList<Long>();
+      for (int i = 0; i < 10; i++)
+      {
+         long id = idGenerator.generateID();
+         listToDelete.add(id);
+         add(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      for (Long id : listToDelete)
+      {
+         delete(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      startCompact();
+      System.out.println("Committing TX " + tx);
       commit(tx);
+      finishCompact();
+
+      journal.checkReclaimStatus();
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits4() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long ids[] = new long[10];
+
+      long tx0 = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         ids[i] = idGenerator.generateID();
+         addTx(tx0, ids[i]);
+      }
+
+      long tx1 = idGenerator.generateID();
       
+      journal.forceMoveNextFile();
 
       ArrayList<Long> listToDelete = new ArrayList<Long>();
       for (int i = 0; i < 10; i++)
@@ -1187,21 +1342,157 @@
          listToDelete.add(id);
          add(id);
       }
-      
+
       journal.forceMoveNextFile();
 
       for (Long id : listToDelete)
       {
          delete(id);
       }
+
+      journal.forceMoveNextFile();
+
+      startCompact();
+      System.out.println("Committing TX " + tx1);
+      rollback(tx0);
+      for (int i = 0 ; i < 10; i++)
+      {
+         addTx(tx1, ids[i]);
+      }
+
+      journal.forceMoveNextFile();
+      commit(tx1);
+      finishCompact();
+
+      journal.checkReclaimStatus();
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits5() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long ids[] = new long[10];
+
+      long tx0 = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         ids[i] = idGenerator.generateID();
+         addTx(tx0, ids[i]);
+      }
+
+      long tx1 = idGenerator.generateID();
       
       journal.forceMoveNextFile();
-      
-      // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+
+      ArrayList<Long> listToDelete = new ArrayList<Long>();
+      for (int i = 0; i < 10; i++)
+      {
+         long id = idGenerator.generateID();
+         listToDelete.add(id);
+         add(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      for (Long id : listToDelete)
+      {
+         delete(id);
+      }
+
+      journal.forceMoveNextFile();
+
+      startCompact();
+      System.out.println("Committing TX " + tx1);
+      rollback(tx0);
+      for (int i = 0 ; i < 10; i++)
+      {
+         addTx(tx1, ids[i]);
+      }
+
+      journal.forceMoveNextFile();
+      commit(tx1);
+      finishCompact();
+
+      journal.checkReclaimStatus();
+
       journal.compact();
 
-      journal.checkReclaimStatus();
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits6() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long ids[] = new long[10];
+
+      long tx0 = idGenerator.generateID();
+      for (int i = 0; i < 10; i++)
+      {
+         ids[i] = idGenerator.generateID();
+         addTx(tx0, ids[i]);
+      }
       
+      commit(tx0);
+
+      startCompact();
+      for (int i = 0 ; i < 10; i++)
+      {
+         delete(ids[i]);
+      }
+      finishCompact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   public void testCompactFirstFileWithPendingCommits7() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+      long tx0 = idGenerator.generateID();
+      add(idGenerator.generateID());
+
+      long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+
+      addTx(tx0, ids[0]);
+      addTx(tx0, ids[1]);
+      
+      journal.forceMoveNextFile();
+      
+      commit(tx0);
+      
+      journal.forceMoveNextFile();
+      
+      delete(ids[0]);
+      delete(ids[1]);
+      
+      journal.forceMoveNextFile();
+      
       journal.compact();
 
       stopJournal();

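The new tests above snapshot the journal with ExportJournal.exportJournal(...) between steps. The helper below is not from the source; it simply mirrors those calls for ad-hoc debugging, and the prefix, extension and size values are placeholders for whatever the journal under inspection actually uses.

    import org.hornetq.core.journal.impl.ExportJournal;

    final class JournalDumpSketch
    {
       // Arguments mirror the test calls above: directory, file prefix, file extension,
       // minFiles and fileSize (as passed by the test's setup), and the output path for the dump.
       static void dump(final String journalDir, final String step) throws Exception
       {
          ExportJournal.exportJournal(journalDir, "hornetq-data", "hq", 2, 60 * 1024, "/tmp/journal-" + step + ".dmp");
       }
    }
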
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-26 21:11:42 UTC (rev 9599)
@@ -119,7 +119,7 @@
       }
       else
       {
-         factory = new NIOSequentialFileFactory(dir.getPath());
+         factory = new NIOSequentialFileFactory(dir.getPath(), true);
          maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
       }
 
@@ -134,9 +134,6 @@
       {
          protected void onCompactLock() throws Exception
          {
-            // System.out.println("OnCompactLock");
-            journal.forceMoveNextFile(false);
-            // System.out.println("OnCompactLock done");
          }
 
          protected void onCompactStart() throws Exception
@@ -152,7 +149,7 @@
                      {
                         long id = idGen.generateID();
                         journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
-                        journal.forceMoveNextFile(false);
+                        journal.forceMoveNextFile();
                         journal.appendDeleteRecord(id, id == 20);
                      }
                      // System.out.println("OnCompactStart leave");


