[hornetq-commits] JBoss hornetq SVN: r9603 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.

do-not-reply at jboss.org
Fri Aug 27 11:56:15 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-27 11:56:15 -0400 (Fri, 27 Aug 2010)
New Revision: 9603

Added:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Removed:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Renaming FilesRepository class & cleanup
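
For readers tracking only the API impact: the helper class keeps the same constructor and methods; only its name changes from FilesRepository to JournalFilesRepository. The sketch below (not part of the commit) shows a caller updated for the rename, mirroring how JournalImpl builds the repository in the diff further down. NIOSequentialFileFactory, the directory, and the numeric arguments are illustrative assumptions, not values taken from this change.

import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;

public class RenameUsageSketch
{
   public static void main(final String[] args) throws Exception
   {
      // Illustrative factory and directory; any SequentialFileFactory works here.
      SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/var/data/journal");

      // Same constructor arguments as before r9603; only the class name changed.
      JournalFilesRepository filesRepository = new JournalFilesRepository(fileFactory,
                                                                          "hornetq-data",   // filePrefix (illustrative)
                                                                          "hq",             // fileExtension (illustrative)
                                                                          1,                // userVersion
                                                                          500,              // maxAIO
                                                                          10 * 1024 * 1024, // fileSize
                                                                          2);               // minFiles

      filesRepository.ensureMinFiles();              // pre-create the minimum number of journal files
      JournalFile file = filesRepository.openFile(); // take an opened file from the pool
      filesRepository.closeFile(file);               // return it as a data file and schedule the close
   }
}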

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-27 15:56:15 UTC (rev 9603)
@@ -55,7 +55,7 @@
 
    protected SequentialFile sequentialFile;
    
-   protected final FilesRepository filesRepository;
+   protected final JournalFilesRepository filesRepository;
 
    protected long nextOrderingID;
 
@@ -71,7 +71,7 @@
 
    protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
                                        final JournalImpl journal,
-                                       final FilesRepository filesRepository,
+                                       final JournalFilesRepository filesRepository,
                                        final Set<Long> recordsSnapshot,
                                        final long nextOrderingID)
    {

Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java	2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java	2010-08-27 15:56:15 UTC (rev 9603)
@@ -1,609 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-
-/**
- * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
- * Guaranteeing that they will be delivered in order to the Journal
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class FilesRepository
-{
-
-   private static final Logger log = Logger.getLogger(FilesRepository.class);
-
-   private static final boolean trace = FilesRepository.log.isTraceEnabled();
-
-   // This method exists just to make debug easier.
-   // I could replace log.trace by log.info temporarily while I was debugging
-   // Journal
-   private static final void trace(final String message)
-   {
-      FilesRepository.log.trace(message);
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private final SequentialFileFactory fileFactory;
-
-   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
-   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
-   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
-   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
-   private final AtomicLong nextFileID = new AtomicLong(0);
-
-   private final int maxAIO;
-
-   private final int minFiles;
-
-   private final int fileSize;
-
-   private final String filePrefix;
-
-   private final String fileExtension;
-
-   private final int userVersion;
-
-   private Executor filesExecutor;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public FilesRepository(final SequentialFileFactory fileFactory,
-                          final String filePrefix,
-                          final String fileExtension,
-                          final int userVersion,
-                          final int maxAIO,
-                          final int fileSize,
-                          final int minFiles)
-   {
-      this.fileFactory = fileFactory;
-      this.maxAIO = maxAIO;
-      this.filePrefix = filePrefix;
-      this.fileExtension = fileExtension;
-      this.minFiles = minFiles;
-      this.fileSize = fileSize;
-      this.userVersion = userVersion;
-   }
-
-   // Public --------------------------------------------------------
-
-   public void setExecutor(final Executor executor)
-   {
-      filesExecutor = executor;
-   }
-
-   public void clear()
-   {
-      dataFiles.clear();
-
-      drainClosedFiles();
-
-      freeFiles.clear();
-
-      for (JournalFile file : openedFiles)
-      {
-         try
-         {
-            file.getFile().close();
-         }
-         catch (Exception e)
-         {
-            FilesRepository.log.warn(e.getMessage(), e);
-         }
-      }
-      openedFiles.clear();
-   }
-
-   public int getMaxAIO()
-   {
-      return maxAIO;
-   }
-
-   public String getFileExtension()
-   {
-      return fileExtension;
-   }
-
-   public String getFilePrefix()
-   {
-      return filePrefix;
-   }
-
-   public void calculateNextfileID(final List<JournalFile> files)
-   {
-
-      for (JournalFile file : files)
-      {
-         long fileID = file.getFileID();
-         if (nextFileID.get() < fileID)
-         {
-            nextFileID.set(fileID);
-         }
-
-         long fileNameID = getFileNameID(file.getFile().getFileName());
-
-         // The compactor could create a fileName but use a previously assigned ID.
-         // Because of that we need to take both parts into account
-         if (nextFileID.get() < fileNameID)
-         {
-            nextFileID.set(fileNameID);
-         }
-      }
-
-   }
-
-   public void ensureMinFiles() throws Exception
-   {
-      // FIXME - size() involves a scan
-      int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
-
-      if (filesToCreate > 0)
-      {
-         for (int i = 0; i < filesToCreate; i++)
-         {
-            // Keeping all files opened can be very costly (mainly on AIO)
-            freeFiles.add(createFile(false, false, true, false));
-         }
-      }
-
-   }
-
-   public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
-   {
-      if (multiAIO)
-      {
-         file.getFile().open();
-      }
-      else
-      {
-         file.getFile().open(1, false);
-      }
-
-      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
-   }
-
-   // Data File Operations ==========================================
-
-   public JournalFile[] getDataFilesArray()
-   {
-      return dataFiles.toArray(new JournalFile[dataFiles.size()]);
-   }
-
-   public JournalFile pollLastDataFile()
-   {
-      /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
-
-      JournalFile currentFile = null;
-      while (iter.hasNext())
-      {
-         currentFile = iter.next();
-
-         if (!iter.hasNext())
-         {
-            iter.remove();
-         }
-      } 
-      
-      return currentFile; */
-
-      return dataFiles.pollLast();
-   }
-
-   public void removeDataFile(final JournalFile file)
-   {
-      if (!dataFiles.remove(file))
-      {
-         FilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
-      }
-   }
-
-   public int getDataFilesCount()
-   {
-      return dataFiles.size();
-   }
-
-   public Collection<JournalFile> getDataFiles()
-   {
-      return dataFiles;
-   }
-
-   public void clearDataFiles()
-   {
-      dataFiles.clear();
-   }
-
-   public void addDataFileOnTop(final JournalFile file)
-   {
-      dataFiles.addFirst(file);
-   }
-
-   public void addDataFileOnBottom(final JournalFile file)
-   {
-      dataFiles.add(file);
-   }
-
-   // Free File Operations ==========================================
-
-   public int getFreeFilesCount()
-   {
-      return freeFiles.size();
-   }
-
-   /**
-    * Add directly to the freeFiles structure without reinitializing the file.
-    * used on load() only
-    */
-   public void addFreeFileNoInit(final JournalFile file)
-   {
-      freeFiles.add(file);
-   }
-
-   /**
-    * @param file
-    * @throws Exception
-    */
-   public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
-   {
-      if (file.getFile().size() != fileSize)
-      {
-         FilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
-                                  new Exception("trace"));
-         file.getFile().delete();
-      }
-      else
-      // FIXME - size() involves a scan!!!
-      if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
-      {
-         // Re-initialise it
-
-         if (FilesRepository.trace)
-         {
-            FilesRepository.trace("Adding free file " + file);
-         }
-
-         JournalFile jf = reinitializeFile(file);
-
-         if (renameTmp)
-         {
-            jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
-         }
-
-         freeFiles.add(jf);
-      }
-      else
-      {
-         file.getFile().delete();
-      }
-   }
-
-   public Collection<JournalFile> getFreeFiles()
-   {
-      return freeFiles;
-   }
-
-   public JournalFile getFreeFile()
-   {
-      return freeFiles.remove();
-   }
-
-   // Opened files operations =======================================
-
-   public int getOpenedFilesCount()
-   {
-      return openedFiles.size();
-   }
-
-   public void drainClosedFiles()
-   {
-      JournalFile file;
-      try
-      {
-         while ((file = pendingCloseFiles.poll()) != null)
-         {
-            file.getFile().close();
-         }
-      }
-      catch (Exception e)
-      {
-         FilesRepository.log.warn(e.getMessage(), e);
-      }
-
-   }
-
-   /** 
-    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
-    * <p>In case there are no cached opened files, this method will block until the file was opened,
-    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> 
-    * */
-   public JournalFile openFile() throws InterruptedException
-   {
-      if (FilesRepository.trace)
-      {
-         FilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
-      }
-
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               pushOpenedFile();
-            }
-            catch (Exception e)
-            {
-               FilesRepository.log.error(e.getMessage(), e);
-            }
-         }
-      };
-
-      if (filesExecutor == null)
-      {
-         run.run();
-      }
-      else
-      {
-         filesExecutor.execute(run);
-      }
-
-      JournalFile nextFile = null;
-
-      while (nextFile == null)
-      {
-         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
-         if (nextFile == null)
-         {
-            FilesRepository.log.warn("Couldn't open a file in 60 Seconds",
-                                     new Exception("Warning: Couldn't open a file in 60 Seconds"));
-         }
-      }
-
-      if (FilesRepository.trace)
-      {
-         FilesRepository.trace("Returning file " + nextFile);
-      }
-
-      return nextFile;
-   }
-
-   /** 
-    * 
-    * Open a file and place it into the openedFiles queue
-    * */
-   public void pushOpenedFile() throws Exception
-   {
-      JournalFile nextOpenedFile = takeFile(true, true, true, false);
-
-      if (FilesRepository.trace)
-      {
-         FilesRepository.trace("pushing openFile " + nextOpenedFile);
-      }
-
-      openedFiles.offer(nextOpenedFile);
-   }
-
-   public void closeFile(final JournalFile file)
-   {
-      fileFactory.deactivateBuffer();
-      pendingCloseFiles.add(file);
-      dataFiles.add(file);
-
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            drainClosedFiles();
-         }
-      };
-
-      if (filesExecutor == null)
-      {
-         run.run();
-      }
-      else
-      {
-         filesExecutor.execute(run);
-      }
-
-   }
-
-   /**
-    * This will get a File from freeFile without initializing it
-    * @return
-    * @throws Exception
-    */
-   public JournalFile takeFile(final boolean keepOpened,
-                               final boolean multiAIO,
-                               final boolean initFile,
-                               final boolean tmpCompactExtension) throws Exception
-   {
-      JournalFile nextOpenedFile = null;
-
-      nextOpenedFile = freeFiles.poll();
-
-      if (nextOpenedFile == null)
-      {
-         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
-      }
-      else
-      {
-         if (tmpCompactExtension)
-         {
-            SequentialFile sequentialFile = nextOpenedFile.getFile();
-            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
-         }
-
-         if (keepOpened)
-         {
-            openFile(nextOpenedFile, multiAIO);
-         }
-      }
-      return nextOpenedFile;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   /**
-    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
-    * @param keepOpened
-    * @return
-    * @throws Exception
-    */
-   private JournalFile createFile(final boolean keepOpened,
-                                  final boolean multiAIO,
-                                  final boolean init,
-                                  final boolean tmpCompact) throws Exception
-   {
-      long fileID = generateFileID();
-
-      String fileName;
-
-      fileName = createFileName(tmpCompact, fileID);
-
-      if (FilesRepository.trace)
-      {
-         FilesRepository.trace("Creating file " + fileName);
-      }
-
-      String tmpFileName = fileName + ".tmp";
-
-      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
-      sequentialFile.open(1, false);
-
-      if (init)
-      {
-         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
-         JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
-      }
-
-      long position = sequentialFile.position();
-
-      sequentialFile.close();
-
-      if (FilesRepository.trace)
-      {
-         FilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
-      }
-
-      sequentialFile.renameTo(fileName);
-
-      if (keepOpened)
-      {
-         if (multiAIO)
-         {
-            sequentialFile.open();
-         }
-         else
-         {
-            sequentialFile.open(1, false);
-         }
-         sequentialFile.position(position);
-      }
-
-      return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
-   }
-
-   /**
-    * @param tmpCompact
-    * @param fileID
-    * @return
-    */
-   private String createFileName(final boolean tmpCompact, final long fileID)
-   {
-      String fileName;
-      if (tmpCompact)
-      {
-         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
-      }
-      else
-      {
-         fileName = filePrefix + "-" + fileID + "." + fileExtension;
-      }
-      return fileName;
-   }
-
-   private long generateFileID()
-   {
-      return nextFileID.incrementAndGet();
-   }
-
-   /** Get the ID part of the name */
-   private long getFileNameID(final String fileName)
-   {
-      try
-      {
-         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
-      }
-      catch (Throwable e)
-      {
-         FilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
-         return 0;
-      }
-   }
-
-   // Discard the old JournalFile and set it with a new ID
-   private JournalFile reinitializeFile(final JournalFile file) throws Exception
-   {
-      long newFileID = generateFileID();
-
-      SequentialFile sf = file.getFile();
-
-      sf.open(1, false);
-
-      int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
-
-      JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
-
-      sf.position(position);
-
-      sf.close();
-
-      return jf;
-   }
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-27 15:56:15 UTC (rev 9603)
@@ -144,7 +144,7 @@
 
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
-                           final FilesRepository filesRepository,
+                           final JournalFilesRepository filesRepository,
                            final Set<Long> recordsSnapshot,
                            final long firstFileID)
    {

Copied: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (from rev 9602, branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java)
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	                        (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2010-08-27 15:56:15 UTC (rev 9603)
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
+ * Guaranteeing that they will be delivered in order to the Journal
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalFilesRepository
+{
+
+   private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
+
+   private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging
+   // Journal
+   private static final void trace(final String message)
+   {
+      JournalFilesRepository.log.trace(message);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final SequentialFileFactory fileFactory;
+
+   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+   private final AtomicLong nextFileID = new AtomicLong(0);
+
+   private final int maxAIO;
+
+   private final int minFiles;
+
+   private final int fileSize;
+
+   private final String filePrefix;
+
+   private final String fileExtension;
+
+   private final int userVersion;
+
+   private Executor filesExecutor;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public JournalFilesRepository(final SequentialFileFactory fileFactory,
+                          final String filePrefix,
+                          final String fileExtension,
+                          final int userVersion,
+                          final int maxAIO,
+                          final int fileSize,
+                          final int minFiles)
+   {
+      this.fileFactory = fileFactory;
+      this.maxAIO = maxAIO;
+      this.filePrefix = filePrefix;
+      this.fileExtension = fileExtension;
+      this.minFiles = minFiles;
+      this.fileSize = fileSize;
+      this.userVersion = userVersion;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setExecutor(final Executor executor)
+   {
+      filesExecutor = executor;
+   }
+
+   public void clear()
+   {
+      dataFiles.clear();
+
+      drainClosedFiles();
+
+      freeFiles.clear();
+
+      for (JournalFile file : openedFiles)
+      {
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Exception e)
+         {
+            JournalFilesRepository.log.warn(e.getMessage(), e);
+         }
+      }
+      openedFiles.clear();
+   }
+
+   public int getMaxAIO()
+   {
+      return maxAIO;
+   }
+
+   public String getFileExtension()
+   {
+      return fileExtension;
+   }
+
+   public String getFilePrefix()
+   {
+      return filePrefix;
+   }
+
+   public void calculateNextfileID(final List<JournalFile> files)
+   {
+
+      for (JournalFile file : files)
+      {
+         long fileID = file.getFileID();
+         if (nextFileID.get() < fileID)
+         {
+            nextFileID.set(fileID);
+         }
+
+         long fileNameID = getFileNameID(file.getFile().getFileName());
+
+         // The compactor could create a fileName but use a previously assigned ID.
+         // Because of that we need to take both parts into account
+         if (nextFileID.get() < fileNameID)
+         {
+            nextFileID.set(fileNameID);
+         }
+      }
+
+   }
+
+   public void ensureMinFiles() throws Exception
+   {
+      // FIXME - size() involves a scan
+      int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+      if (filesToCreate > 0)
+      {
+         for (int i = 0; i < filesToCreate; i++)
+         {
+            // Keeping all files opened can be very costly (mainly on AIO)
+            freeFiles.add(createFile(false, false, true, false));
+         }
+      }
+
+   }
+
+   public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+   {
+      if (multiAIO)
+      {
+         file.getFile().open();
+      }
+      else
+      {
+         file.getFile().open(1, false);
+      }
+
+      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+   }
+
+   // Data File Operations ==========================================
+
+   public JournalFile[] getDataFilesArray()
+   {
+      return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+   }
+
+   public JournalFile pollLastDataFile()
+   {
+      /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+      JournalFile currentFile = null;
+      while (iter.hasNext())
+      {
+         currentFile = iter.next();
+
+         if (!iter.hasNext())
+         {
+            iter.remove();
+         }
+      } 
+      
+      return currentFile; */
+
+      return dataFiles.pollLast();
+   }
+
+   public void removeDataFile(final JournalFile file)
+   {
+      if (!dataFiles.remove(file))
+      {
+         JournalFilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
+      }
+   }
+
+   public int getDataFilesCount()
+   {
+      return dataFiles.size();
+   }
+
+   public Collection<JournalFile> getDataFiles()
+   {
+      return dataFiles;
+   }
+
+   public void clearDataFiles()
+   {
+      dataFiles.clear();
+   }
+
+   public void addDataFileOnTop(final JournalFile file)
+   {
+      dataFiles.addFirst(file);
+   }
+
+   public void addDataFileOnBottom(final JournalFile file)
+   {
+      dataFiles.add(file);
+   }
+
+   // Free File Operations ==========================================
+
+   public int getFreeFilesCount()
+   {
+      return freeFiles.size();
+   }
+
+   /**
+    * Add directly to the freeFiles structure without reinitializing the file.
+    * used on load() only
+    */
+   public void addFreeFileNoInit(final JournalFile file)
+   {
+      freeFiles.add(file);
+   }
+
+   /**
+    * @param file
+    * @throws Exception
+    */
+   public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+   {
+      if (file.getFile().size() != fileSize)
+      {
+         JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
+                                  new Exception("trace"));
+         file.getFile().delete();
+      }
+      else
+      // FIXME - size() involves a scan!!!
+      if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+      {
+         // Re-initialise it
+
+         if (JournalFilesRepository.trace)
+         {
+            JournalFilesRepository.trace("Adding free file " + file);
+         }
+
+         JournalFile jf = reinitializeFile(file);
+
+         if (renameTmp)
+         {
+            jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+         }
+
+         freeFiles.add(jf);
+      }
+      else
+      {
+         file.getFile().delete();
+      }
+   }
+
+   public Collection<JournalFile> getFreeFiles()
+   {
+      return freeFiles;
+   }
+
+   public JournalFile getFreeFile()
+   {
+      return freeFiles.remove();
+   }
+
+   // Opened files operations =======================================
+
+   public int getOpenedFilesCount()
+   {
+      return openedFiles.size();
+   }
+
+   public void drainClosedFiles()
+   {
+      JournalFile file;
+      try
+      {
+         while ((file = pendingCloseFiles.poll()) != null)
+         {
+            file.getFile().close();
+         }
+      }
+      catch (Exception e)
+      {
+         JournalFilesRepository.log.warn(e.getMessage(), e);
+      }
+
+   }
+
+   /** 
+    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
+    * <p>In case there are no cached opened files, this method will block until the file was opened,
+    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> 
+    * */
+   public JournalFile openFile() throws InterruptedException
+   {
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      }
+
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               pushOpenedFile();
+            }
+            catch (Exception e)
+            {
+               JournalFilesRepository.log.error(e.getMessage(), e);
+            }
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+      JournalFile nextFile = null;
+
+      while (nextFile == null)
+      {
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+         if (nextFile == null)
+         {
+            JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
+                                     new Exception("Warning: Couldn't open a file in 60 Seconds"));
+         }
+      }
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Returning file " + nextFile);
+      }
+
+      return nextFile;
+   }
+
+   /** 
+    * 
+    * Open a file and place it into the openedFiles queue
+    * */
+   public void pushOpenedFile() throws Exception
+   {
+      JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+      }
+
+      openedFiles.offer(nextOpenedFile);
+   }
+
+   public void closeFile(final JournalFile file)
+   {
+      fileFactory.deactivateBuffer();
+      pendingCloseFiles.add(file);
+      dataFiles.add(file);
+
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            drainClosedFiles();
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+   }
+
+   /**
+    * This will get a File from freeFile without initializing it
+    * @return
+    * @throws Exception
+    */
+   public JournalFile takeFile(final boolean keepOpened,
+                               final boolean multiAIO,
+                               final boolean initFile,
+                               final boolean tmpCompactExtension) throws Exception
+   {
+      JournalFile nextOpenedFile = null;
+
+      nextOpenedFile = freeFiles.poll();
+
+      if (nextOpenedFile == null)
+      {
+         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+      }
+      else
+      {
+         if (tmpCompactExtension)
+         {
+            SequentialFile sequentialFile = nextOpenedFile.getFile();
+            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+         }
+
+         if (keepOpened)
+         {
+            openFile(nextOpenedFile, multiAIO);
+         }
+      }
+      return nextOpenedFile;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+    * @param keepOpened
+    * @return
+    * @throws Exception
+    */
+   private JournalFile createFile(final boolean keepOpened,
+                                  final boolean multiAIO,
+                                  final boolean init,
+                                  final boolean tmpCompact) throws Exception
+   {
+      long fileID = generateFileID();
+
+      String fileName;
+
+      fileName = createFileName(tmpCompact, fileID);
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Creating file " + fileName);
+      }
+
+      String tmpFileName = fileName + ".tmp";
+
+      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+      sequentialFile.open(1, false);
+
+      if (init)
+      {
+         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+         JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+      }
+
+      long position = sequentialFile.position();
+
+      sequentialFile.close();
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
+      }
+
+      sequentialFile.renameTo(fileName);
+
+      if (keepOpened)
+      {
+         if (multiAIO)
+         {
+            sequentialFile.open();
+         }
+         else
+         {
+            sequentialFile.open(1, false);
+         }
+         sequentialFile.position(position);
+      }
+
+      return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+   }
+
+   /**
+    * @param tmpCompact
+    * @param fileID
+    * @return
+    */
+   private String createFileName(final boolean tmpCompact, final long fileID)
+   {
+      String fileName;
+      if (tmpCompact)
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+      }
+      else
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension;
+      }
+      return fileName;
+   }
+
+   private long generateFileID()
+   {
+      return nextFileID.incrementAndGet();
+   }
+
+   /** Get the ID part of the name */
+   private long getFileNameID(final String fileName)
+   {
+      try
+      {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+      }
+      catch (Throwable e)
+      {
+         JournalFilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
+         return 0;
+      }
+   }
+
+   // Discard the old JournalFile and set it with a new ID
+   private JournalFile reinitializeFile(final JournalFile file) throws Exception
+   {
+      long newFileID = generateFileID();
+
+      SequentialFile sf = file.getFile();
+
+      sf.open(1, false);
+
+      int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
+
+      JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+      sf.position(position);
+
+      sf.close();
+
+      return jf;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-27 15:56:15 UTC (rev 9603)
@@ -63,7 +63,6 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.DataConstants;
 
-
 /**
  * 
  * <p>A circular log implementation.</p
@@ -86,15 +85,15 @@
    private static final int STATE_LOADED = 2;
 
    public static final int FORMAT_VERSION = 2;
-   
-   private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
 
+   private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
+
    // Static --------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = log.isTraceEnabled();
- 
+   private static final boolean trace = JournalImpl.log.isTraceEnabled();
+
    // This is useful at debug time...
    // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
    private static final boolean TRACE_RECORDS = false;
@@ -104,9 +103,9 @@
    // Journal
    private static final void trace(final String message)
    {
-      log.trace(message);
+      JournalImpl.log.trace(message);
    }
-   
+
    private static final void traceRecord(final String message)
    {
       System.out.println(message);
@@ -183,9 +182,8 @@
 
    private final SequentialFileFactory fileFactory;
 
+   private final JournalFilesRepository filesRepository;
 
-   private final FilesRepository filesRepository;
-   
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
 
@@ -291,9 +289,15 @@
       this.minFiles = minFiles;
 
       this.fileFactory = fileFactory;
-      
-      filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
 
+      filesRepository = new JournalFilesRepository(fileFactory,
+                                                   filePrefix,
+                                                   fileExtension,
+                                                   userVersion,
+                                                   maxAIO,
+                                                   fileSize,
+                                                   minFiles);
+
       this.userVersion = userVersion;
    }
 
@@ -389,7 +393,7 @@
 
          try
          {
-            
+
             JournalFileImpl jrnFile = readFileHeader(file);
 
             orderedFiles.add(jrnFile);
@@ -469,20 +473,20 @@
             }
 
             short compactCount = 0;
-            
+
             if (file.getJournalVersion() >= 2)
             {
                if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
                {
                   reader.markAsDataFile(file);
-   
+
                   wholeFileBuffer.position(pos + 1);
                   continue;
                }
-               
+
                compactCount = wholeFileBuffer.get();
             }
-            
+
             long transactionID = 0;
 
             if (JournalImpl.isTransaction(recordType))
@@ -678,19 +682,31 @@
 
                case ADD_RECORD_TX:
                {
-                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
+                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+                                                                         userRecordType,
+                                                                         record,
+                                                                         false,
+                                                                         compactCount));
                   break;
                }
 
                case UPDATE_RECORD_TX:
                {
-                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
+                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+                                                                            userRecordType,
+                                                                            record,
+                                                                            true,
+                                                                            compactCount));
                   break;
                }
 
                case DELETE_RECORD_TX:
                {
-                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
+                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+                                                                            (byte)0,
+                                                                            record,
+                                                                            true,
+                                                                            compactCount));
                   break;
                }
 
@@ -744,7 +760,7 @@
       }
       catch (Throwable e)
       {
-         log.warn(e.getMessage(), e);
+         JournalImpl.log.warn(e.getMessage(), e);
          throw new Exception(e.getMessage(), e);
       }
       finally
@@ -820,7 +836,10 @@
          {
             JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
 
-            if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+            }
 
             records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
          }
@@ -898,7 +917,10 @@
          {
             JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
-            if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+            }
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
@@ -976,7 +998,10 @@
          {
             JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
 
-            if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+            }
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
@@ -1030,7 +1055,14 @@
          {
             JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
-            if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
+                                       ",id=" +
+                                       id +
+                                       ", usedFile = " +
+                                       usedFile);
+            }
 
             tx.addPositive(usedFile, id, addRecord.getEncodeSize());
          }
@@ -1076,7 +1108,14 @@
          {
             JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
 
-            if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
+                                       ",id=" +
+                                       id +
+                                       ", usedFile = " +
+                                       usedFile);
+            }
 
             tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
          }
@@ -1116,7 +1155,14 @@
          {
             JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
 
-            if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
+                                       ", id=" +
+                                       id +
+                                       ", usedFile = " +
+                                       usedFile);
+            }
 
             tx.addNegative(usedFile, id);
          }
@@ -1206,9 +1252,12 @@
          try
          {
             JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
-            
-            if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
 
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+            }
+
             tx.prepare(usedFile);
          }
          finally
@@ -1226,7 +1275,7 @@
    public void appendCommitRecord(final long txID, final boolean sync) throws Exception
    {
       SyncIOCompletion syncCompletion = getSyncCallback(sync);
-      
+
       appendCommitRecord(txID, sync, syncCompletion);
 
       if (syncCompletion != null)
@@ -1281,9 +1330,12 @@
          try
          {
             JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
-            
-            if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
 
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+            }
+
             tx.commit(usedFile);
          }
          finally
@@ -1413,9 +1465,9 @@
          private void checkDeleteSize()
          {
             // HORNETQ-482 - Flush deletes only if memory is critical
-            if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
+            if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
             {
-               log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+               JournalImpl.log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
                // Clean up when the list is too large, or it won't be possible to load large sets of files
                // Done as part of JBMESSAGING-1678
                Iterator<RecordInfo> iter = records.iterator();
@@ -1431,7 +1483,7 @@
 
                recordsToDelete.clear();
 
-               log.debug("flush delete done");
+               JournalImpl.log.debug("flush delete done");
             }
          }
 
@@ -1500,13 +1552,16 @@
 
       try
       {
-         if (trace)
+         if (JournalImpl.trace)
          {
             JournalImpl.trace("Starting compacting operation on journal");
          }
-         
-         if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
 
+         if (JournalImpl.TRACE_RECORDS)
+         {
+            JournalImpl.traceRecord("Starting compacting operation on journal");
+         }
+
          onCompactStart();
 
          // We need to guarantee that the journal is frozen for this short time
@@ -1527,7 +1582,7 @@
             moveNextFile(false);
 
             filesRepository.drainClosedFiles();
-            
+
             // Take the snapshots and replace the structures
 
             dataFilesToProcess.addAll(filesRepository.getDataFiles());
@@ -1539,7 +1594,11 @@
                return;
             }
 
-            compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
+            compactor = new JournalCompactor(fileFactory,
+                                             this,
+                                             filesRepository,
+                                             records.keySet(),
+                                             dataFilesToProcess.get(0).getFileID());
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
@@ -1571,7 +1630,7 @@
             }
             catch (Throwable e)
             {
-               log.warn("Error on reading compacting for " + file);
+               JournalImpl.log.warn("Error on reading compacting for " + file);
                throw new Exception("Error on reading compacting for " + file, e);
             }
          }
@@ -1616,7 +1675,7 @@
                filesRepository.addDataFileOnTop(fileToAdd);
             }
 
-            if (trace)
+            if (JournalImpl.trace)
             {
                JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
             }
@@ -1647,7 +1706,7 @@
                }
                else
                {
-                  log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+                  JournalImpl.log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
                }
             }
          }
@@ -1660,13 +1719,16 @@
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
-         if (trace)
+         if (JournalImpl.trace)
          {
             JournalImpl.log.debug("Finished compacting on journal");
          }
-         
-         if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
 
+         if (JournalImpl.TRACE_RECORDS)
+         {
+            JournalImpl.traceRecord("Finished compacting on journal");
+         }
+
       }
       finally
       {
@@ -1794,7 +1856,8 @@
                   // have been deleted
                   // just leaving some updates in this file
 
-                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
+                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+                                                                                                    // count
                }
             }
 
@@ -1844,7 +1907,8 @@
                   transactions.put(transactionID, tnp);
                }
 
-               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
+               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+                                                                                                      // count
             }
 
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2021,7 +2085,7 @@
       filesRepository.ensureMinFiles();
 
       // The current file is the last one that has data
-      
+
       currentFile = filesRepository.pollLastDataFile();
 
       if (currentFile != null)
@@ -2118,7 +2182,7 @@
                {
                   JournalImpl.trace("Reclaiming file " + file);
                }
-               
+
                filesRepository.removeDataFile(file);
 
                filesRepository.addFreeFile(file, false);
@@ -2147,9 +2211,9 @@
       long totalBytes = (long)dataFiles.length * (long)fileSize;
 
       long compactMargin = (long)(totalBytes * compactPercentage);
-      
-      boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
 
+      boolean needCompact = totalLiveSize < compactMargin && dataFiles.length > compactMinFiles;
+
       return needCompact;
 
    }
@@ -2392,7 +2456,7 @@
       filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
       {
 
-         public Thread newThread(Runnable r)
+         public Thread newThread(final Runnable r)
          {
             return new Thread(r, "JournalImpl::FilesExecutor");
          }
@@ -2401,12 +2465,12 @@
       compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
       {
 
-         public Thread newThread(Runnable r)
+         public Thread newThread(final Runnable r)
          {
             return new Thread(r, "JournalImpl::CompactorExecutor");
          }
       });
-      
+
       filesRepository.setExecutor(filesExecutor);
 
       fileFactory.start();
@@ -2523,7 +2587,7 @@
                }
                catch (Throwable e)
                {
-                  log.warn("Error reinitializing file " + file, e);
+                  JournalImpl.log.warn("Error reinitializing file " + file, e);
                }
             }
             done.countDown();
@@ -2537,7 +2601,7 @@
 
       for (JournalFile file : newFiles)
       {
-         String newName = renameExtensionFile(file.getFile().getFileName(), ".cmp");
+         String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
          file.getFile().renameTo(newName);
       }
 
@@ -2547,7 +2611,7 @@
     * @param name
     * @return
     */
-   protected static String renameExtensionFile(String name, String extension)
+   protected static String renameExtensionFile(String name, final String extension)
    {
       name = name.substring(0, name.lastIndexOf(extension));
       return name;
@@ -2663,29 +2727,30 @@
     * @return
     * @throws Exception
     */
-   private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
+   private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
    {
       ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
 
       file.read(bb);
 
       int journalVersion = bb.getInt();
-      
-      if (journalVersion != FORMAT_VERSION)
+
+      if (journalVersion != JournalImpl.FORMAT_VERSION)
       {
          boolean isCompatible = false;
-         
-         for (int v : COMPATIBLE_VERSIONS)
+
+         for (int v : JournalImpl.COMPATIBLE_VERSIONS)
          {
             if (v == journalVersion)
             {
                isCompatible = true;
             }
          }
-         
+
          if (!isCompatible)
          {
-            throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+            throw new HornetQException(HornetQException.IO_ERROR,
+                                       "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
          }
       }
 
@@ -2701,7 +2766,7 @@
       fileFactory.releaseBuffer(bb);
 
       bb = null;
-      
+
       return new JournalFileImpl(file, fileID, journalVersion);
    }
 
@@ -2720,7 +2785,7 @@
 
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
 
-      writeHeader(buffer, userVersion, fileID);
+      JournalImpl.writeHeader(buffer, userVersion, fileID);
 
       bb.rewind();
 
@@ -2738,9 +2803,9 @@
     * @param userVersion
     * @param fileID
     */
-   public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+   public static void writeHeader(final HornetQBuffer buffer, final int userVersion, final long fileID)
    {
-      buffer.writeInt(FORMAT_VERSION);
+      buffer.writeInt(JournalImpl.FORMAT_VERSION);
 
       buffer.writeInt(userVersion);
 
@@ -2788,7 +2853,7 @@
       {
          throw new NullPointerException("Current file = null");
       }
-      
+
       if (tx != null)
       {
          // The callback of a transaction has to be taken inside the lock,
@@ -2850,7 +2915,7 @@
       filesRepository.closeFile(currentFile);
 
       currentFile = filesRepository.openFile();
-      
+
       if (scheduleReclaim)
       {
          scheduleReclaim();
@@ -2894,7 +2959,6 @@
       }
    }
 
-
    private JournalTransaction getTransactionInfo(final long txID)
    {
       JournalTransaction tx = transactions.get(txID);

