[hornetq-commits] JBoss hornetq SVN: r9606 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 27 12:44:18 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-27 12:44:17 -0400 (Fri, 27 Aug 2010)
New Revision: 9606

Removed:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
auto cleanup only

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -279,8 +279,8 @@
 
       aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
    }
-   
-   public void writeInternal(ByteBuffer bytes) throws Exception
+
+   public void writeInternal(final ByteBuffer bytes) throws Exception
    {
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
 
@@ -289,7 +289,6 @@
       aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
    }
 
-
    // Protected methods
    // -----------------------------------------------------------------------------------------------------
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -146,7 +146,8 @@
       super.start();
 
       pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                              true, getThisClassLoader()));
+                                                                              true,
+                                                                              AIOSequentialFileFactory.getThisClassLoader()));
 
    }
 
@@ -295,18 +296,17 @@
          }
       }
    }
-   
+
    private static ClassLoader getThisClassLoader()
    {
       return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-                                    {
-                                       public ClassLoader run()
-                                       {
-                                          return ClientSessionFactoryImpl.class.getClassLoader();
-                                       }
-                                    });
-      
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+
    }
 
-
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -54,7 +54,7 @@
    protected JournalFile currentFile;
 
    protected SequentialFile sequentialFile;
-   
+
    protected final JournalFilesRepository filesRepository;
 
    protected long nextOrderingID;
@@ -154,16 +154,14 @@
                                                                     new ByteArrayEncoding(filesToRename.toByteBuffer()
                                                                                                        .array()));
 
-
-
          HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
 
          controlRecord.setFileID(0);
-         
+
          controlRecord.encode(renameBuffer);
 
          ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-         
+
          writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
 
          writeBuffer.rewind();
@@ -184,10 +182,10 @@
       if (writingChannel != null)
       {
          sequentialFile.position(0);
-         
+
          // To Fix the size of the file
          writingChannel.writerIndex(writingChannel.capacity());
-         
+
          sequentialFile.writeInternal(writingChannel.toByteBuffer());
          sequentialFile.close();
          newDataFiles.add(currentFile);
@@ -217,13 +215,13 @@
       writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
 
       currentFile = filesRepository.takeFile(false, false, false, true);
-      
+
       sequentialFile = currentFile.getFile();
 
       sequentialFile.open(1, false);
 
       currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
-      
+
       JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
    }
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -117,7 +117,8 @@
       if (isSupportsCallbacks())
       {
          writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
-                                                                                    true, getThisClassLoader()));
+                                                                                    true,
+                                                                                    AbstractSequentialFileFactory.getThisClassLoader()));
       }
 
    }
@@ -193,14 +194,13 @@
    private static ClassLoader getThisClassLoader()
    {
       return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-                                    {
-                                       public ClassLoader run()
-                                       {
-                                          return ClientSessionFactoryImpl.class.getClassLoader();
-                                       }
-                                    });
-      
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+
    }
 
-
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -19,8 +19,7 @@
 import java.io.PrintStream;
 import java.util.List;
 
-import org.hornetq.core.journal.*;
-
+import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.utils.Base64;
 
@@ -48,7 +47,7 @@
 
    // Public --------------------------------------------------------
 
-   public static void main(String arg[])
+   public static void main(final String arg[])
    {
       if (arg.length != 5)
       {
@@ -58,7 +57,7 @@
 
       try
       {
-         exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+         ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
       }
       catch (Exception e)
       {
@@ -67,32 +66,31 @@
 
    }
 
-   public static void exportJournal(String directory,
-                                    String journalPrefix,
-                                    String journalSuffix,
-                                    int minFiles,
-                                    int fileSize,
-                                    String fileOutput) throws Exception
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileOutput) throws Exception
    {
-      
+
       FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
 
       BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
 
       PrintStream out = new PrintStream(buffOut);
-      
-      exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
-      
+
+      ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
       out.close();
    }
 
-   
-   public static void exportJournal(String directory,
-                                    String journalPrefix,
-                                    String journalSuffix,
-                                    int minFiles,
-                                    int fileSize,
-                                    PrintStream out) throws Exception
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final PrintStream out) throws Exception
    {
       NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
 
@@ -104,7 +102,7 @@
       {
          out.println("#File," + file);
 
-         exportJournalFile(out, nio, file);
+         ExportJournal.exportJournalFile(out, nio, file);
       }
    }
 
@@ -114,67 +112,71 @@
     * @param file
     * @throws Exception
     */
-   public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+   public static void exportJournalFile(final PrintStream out,
+                                        final SequentialFileFactory fileFactory,
+                                        final JournalFile file) throws Exception
    {
       JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
       {
 
-         public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation at UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation at UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
          }
 
-         public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+         public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation at Update," + describeRecord(recordInfo));
+            out.println("operation at Update," + ExportJournal.describeRecord(recordInfo));
          }
 
-         public void onReadRollbackRecord(long transactionID) throws Exception
+         public void onReadRollbackRecord(final long transactionID) throws Exception
          {
             out.println("operation at Rollback,txID@" + transactionID);
          }
 
-         public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+         public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
          {
             out.println("operation at Prepare,txID@" + transactionID +
                         ",numberOfRecords@" +
                         numberOfRecords +
                         ",extraData@" +
-                        encode(extraData));
+                        ExportJournal.encode(extraData));
          }
 
-         public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation at DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation at DeleteRecordTX,txID@" + transactionID +
+                        "," +
+                        ExportJournal.describeRecord(recordInfo));
          }
 
-         public void onReadDeleteRecord(long recordID) throws Exception
+         public void onReadDeleteRecord(final long recordID) throws Exception
          {
             out.println("operation at DeleteRecord,id@" + recordID);
          }
 
-         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
          {
             out.println("operation at Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
          }
 
-         public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation at AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation at AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
          }
 
-         public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+         public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation at AddRecord," + describeRecord(recordInfo));
+            out.println("operation at AddRecord," + ExportJournal.describeRecord(recordInfo));
          }
 
-         public void markAsDataFile(JournalFile file)
+         public void markAsDataFile(final JournalFile file)
          {
          }
       });
    }
 
-   private static String describeRecord(RecordInfo recordInfo)
+   private static String describeRecord(final RecordInfo recordInfo)
    {
       return "id@" + recordInfo.id +
              ",userRecordType@" +
@@ -186,10 +188,10 @@
              ",compactCount@" +
              recordInfo.compactCount +
              ",data@" +
-             encode(recordInfo.data);
+             ExportJournal.encode(recordInfo.data);
    }
 
-   private static String encode(byte[] data)
+   private static String encode(final byte[] data)
    {
       return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
    }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -25,7 +25,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalRecord;
 import org.hornetq.utils.Base64;
 
 /**
@@ -52,7 +51,7 @@
 
    // Public --------------------------------------------------------
 
-   public static void main(String arg[])
+   public static void main(final String arg[])
    {
       if (arg.length != 5)
       {
@@ -62,7 +61,7 @@
 
       try
       {
-         importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+         ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
       }
       catch (Exception e)
       {
@@ -71,34 +70,35 @@
 
    }
 
-   public static void importJournal(String directory,
-                                    String journalPrefix,
-                                    String journalSuffix,
-                                    int minFiles,
-                                    int fileSize,
-                                    String fileInput) throws Exception
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileInput) throws Exception
    {
       FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
-      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
 
    }
-   public static void importJournal(String directory,
-                                    String journalPrefix,
-                                    String journalSuffix,
-                                    int minFiles,
-                                    int fileSize,
-                                    InputStream stream) throws Exception
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final InputStream stream) throws Exception
    {
       Reader reader = new InputStreamReader(stream);
-      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
    }
 
-   public static void importJournal(String directory,
-                                    String journalPrefix,
-                                    String journalSuffix,
-                                    int minFiles,
-                                    int fileSize,
-                                    Reader reader) throws Exception
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final Reader reader) throws Exception
    {
 
       File journalDir = new File(directory);
@@ -139,7 +139,7 @@
             continue;
          }
 
-         Properties lineProperties = parseLine(splitLine);
+         Properties lineProperties = ImportJournal.parseLine(splitLine);
 
          String operation = null;
          try
@@ -148,67 +148,67 @@
 
             if (operation.equals("AddRecord"))
             {
-               RecordInfo info = parseRecord(lineProperties);
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
                journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
             }
             else if (operation.equals("AddRecordTX"))
             {
-               long txID = parseLong("txID", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
                counter.incrementAndGet();
-               RecordInfo info = parseRecord(lineProperties);
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
                journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
             }
             else if (operation.equals("AddRecordTX"))
             {
-               long txID = parseLong("txID", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
                counter.incrementAndGet();
-               RecordInfo info = parseRecord(lineProperties);
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
                journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
             }
             else if (operation.equals("UpdateTX"))
             {
-               long txID = parseLong("txID", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
                counter.incrementAndGet();
-               RecordInfo info = parseRecord(lineProperties);
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
                journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
             }
             else if (operation.equals("Update"))
             {
-               RecordInfo info = parseRecord(lineProperties);
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
                journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
             }
             else if (operation.equals("DeleteRecord"))
             {
-               long id = parseLong("id", lineProperties);
+               long id = ImportJournal.parseLong("id", lineProperties);
 
                // If not found it means the append/update records were reclaimed already
-               if (journalRecords.get((Long)id) != null)
+               if (journalRecords.get(id) != null)
                {
                   journal.appendDeleteRecord(id, false);
                }
             }
             else if (operation.equals("DeleteRecordTX"))
             {
-               long txID = parseLong("txID", lineProperties);
-               long id = parseLong("id", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               long id = ImportJournal.parseLong("id", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
                counter.incrementAndGet();
 
                // If not found it means the append/update records were reclaimed already
-               if (journalRecords.get((Long)id) != null)
+               if (journalRecords.get(id) != null)
                {
                   journal.appendDeleteRecordTransactional(txID, id);
                }
             }
             else if (operation.equals("Prepare"))
             {
-               long txID = parseLong("txID", lineProperties);
-               int numberOfRecords = parseInt("numberOfRecords", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
-               byte[] data = parseEncoding("extraData", lineProperties);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
 
                if (counter.get() == numberOfRecords)
                {
@@ -227,9 +227,9 @@
             }
             else if (operation.equals("Commit"))
             {
-               long txID = parseLong("txID", lineProperties);
-               int numberOfRecords = parseInt("numberOfRecords", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
                if (counter.get() == numberOfRecords)
                {
                   journal.appendCommitRecord(txID, false);
@@ -247,7 +247,7 @@
             }
             else if (operation.equals("Rollback"))
             {
-               long txID = parseLong("txID", lineProperties);
+               long txID = ImportJournal.parseLong("txID", lineProperties);
                journal.appendRollbackRecord(txID, false);
             }
             else
@@ -264,7 +264,7 @@
       journal.stop();
    }
 
-   protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+   protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
    {
 
       AtomicInteger counter = txCounters.get(txID);
@@ -277,50 +277,50 @@
       return counter;
    }
 
-   protected static RecordInfo parseRecord(Properties properties) throws Exception
+   protected static RecordInfo parseRecord(final Properties properties) throws Exception
    {
-      long id = parseLong("id", properties);
-      byte userRecordType = parseByte("userRecordType", properties);
-      boolean isUpdate = parseBoolean("isUpdate", properties);
-      byte[] data = parseEncoding("data", properties);
+      long id = ImportJournal.parseLong("id", properties);
+      byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+      boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+      byte[] data = ImportJournal.parseEncoding("data", properties);
       return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
    }
 
-   private static byte[] parseEncoding(String name, Properties properties) throws Exception
+   private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
    {
-      String value = parseString(name, properties);
+      String value = ImportJournal.parseString(name, properties);
 
-      return decode(value);
+      return ImportJournal.decode(value);
    }
 
    /**
     * @param properties
     * @return
     */
-   private static int parseInt(String name, Properties properties) throws Exception
+   private static int parseInt(final String name, final Properties properties) throws Exception
    {
-      String value = parseString(name, properties);
+      String value = ImportJournal.parseString(name, properties);
 
       return Integer.parseInt(value);
    }
 
-   private static long parseLong(String name, Properties properties) throws Exception
+   private static long parseLong(final String name, final Properties properties) throws Exception
    {
-      String value = parseString(name, properties);
+      String value = ImportJournal.parseString(name, properties);
 
       return Long.parseLong(value);
    }
 
-   private static boolean parseBoolean(String name, Properties properties) throws Exception
+   private static boolean parseBoolean(final String name, final Properties properties) throws Exception
    {
-      String value = parseString(name, properties);
+      String value = ImportJournal.parseString(name, properties);
 
       return Boolean.parseBoolean(value);
    }
 
-   private static byte parseByte(String name, Properties properties) throws Exception
+   private static byte parseByte(final String name, final Properties properties) throws Exception
    {
-      String value = parseString(name, properties);
+      String value = ImportJournal.parseString(name, properties);
 
       return Byte.parseByte(value);
    }
@@ -331,7 +331,7 @@
     * @return
     * @throws Exception
     */
-   private static String parseString(String name, Properties properties) throws Exception
+   private static String parseString(final String name, final Properties properties) throws Exception
    {
       String value = properties.getProperty(name);
 
@@ -342,7 +342,7 @@
       return value;
    }
 
-   protected static Properties parseLine(String[] splitLine)
+   protected static Properties parseLine(final String[] splitLine)
    {
       Properties properties = new Properties();
 
@@ -362,7 +362,7 @@
       return properties;
    }
 
-   private static byte[] decode(String data)
+   private static byte[] decode(final String data)
    {
       return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
    }

Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,10 +47,10 @@
 {
 
    private static final Logger log = Logger.getLogger(JournalCompactor.class);
-   
+
    // We try to separate old record from new ones when doing the compacting
    // this is a split line
-   // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE 
+   // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
    private final short COMPACT_SPLIT_LINE = 2;
 
    // Snapshot of transactions that were pending when the compactor started
@@ -217,7 +217,7 @@
    {
       checkSize(size, -1);
    }
-   
+
    private void checkSize(final int size, final int compactCount) throws Exception
    {
       if (getWritingChannel() == null)
@@ -239,17 +239,19 @@
                return;
             }
          }
-         
+
          if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
          {
             openFile();
          }
       }
    }
-   
+
    int currentCount;
+
    // This means we will need to split when the compactCount is bellow the watermark
    boolean willNeedToSplit = false;
+
    boolean splitted = false;
 
    private boolean checkCompact(final int compactCount) throws Exception
@@ -258,7 +260,7 @@
       {
          willNeedToSplit = true;
       }
-      
+
       if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
       {
          willNeedToSplit = false;
@@ -271,8 +273,6 @@
          return false;
       }
    }
-   
-   
 
    /**
     * Replay pending counts that happened during compacting
@@ -305,7 +305,7 @@
                                                                 info.getUserRecordType(),
                                                                 new ByteArrayEncoding(info.data));
          addRecord.setCompactCount((short)(info.compactCount + 1));
-         
+
          checkSize(addRecord.getEncodeSize(), info.compactCount);
 
          writeEncoder(addRecord);
@@ -327,7 +327,7 @@
                                                                new ByteArrayEncoding(info.data));
 
          record.setCompactCount((short)(info.compactCount + 1));
-         
+
          checkSize(record.getEncodeSize(), info.compactCount);
 
          newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -347,15 +347,15 @@
       }
       else
       {
-         JournalTransaction newTransaction =  newTransactions.remove(transactionID);
+         JournalTransaction newTransaction = newTransactions.remove(transactionID);
          if (newTransaction != null)
          {
             JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
-   
+
             checkSize(commitRecord.getEncodeSize());
-   
+
             writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
-   
+
             newTransaction.commit(currentFile);
          }
       }
@@ -366,7 +366,8 @@
       if (newRecords.get(recordID) != null)
       {
          // Sanity check, it should never happen
-         throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
+         throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID +
+                                         ")");
       }
 
    }
@@ -428,16 +429,16 @@
          JournalTransaction newTransaction = newTransactions.remove(transactionID);
          if (newTransaction != null)
          {
-            
+
             JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
-            
+
             checkSize(rollbackRecord.getEncodeSize());
 
             writeEncoder(rollbackRecord);
-            
+
             newTransaction.rollback(currentFile);
          }
-         
+
       }
    }
 
@@ -451,7 +452,7 @@
                                                                    new ByteArrayEncoding(info.data));
 
          updateRecord.setCompactCount((short)(info.compactCount + 1));
-         
+
          checkSize(updateRecord.getEncodeSize(), info.compactCount);
 
          JournalRecord newRecord = newRecords.get(info.id);
@@ -483,7 +484,7 @@
                                                                        new ByteArrayEncoding(info.data));
 
          updateRecordTX.setCompactCount((short)(info.compactCount + 1));
-         
+
          checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
 
          writeEncoder(updateRecordTX);
@@ -534,7 +535,7 @@
          JournalRecord deleteRecord = journal.getRecords().remove(id);
          if (deleteRecord == null)
          {
-            log.warn("Can't find record " + id + " during compact replay");
+            JournalCompactor.log.warn("Can't find record " + id + " during compact replay");
          }
          else
          {

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -45,7 +45,7 @@
    void decSize(int bytes);
 
    int getLiveSize();
-   
+
    /** The total number of deletes this file has */
    int getTotalNegativeToOthers();
 
@@ -58,9 +58,9 @@
    /** This is a field to identify that records on this file actually belong to the current file.
     *  The possible implementation for this is fileID & Integer.MAX_VALUE */
    int getRecordID();
-   
+
    long getFileID();
-   
+
    int getJournalVersion();
 
    SequentialFile getFile();

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,7 +47,7 @@
 
    private boolean canReclaim;
 
-   private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+   private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
 
    private final int version;
 
@@ -61,7 +61,7 @@
 
       this.version = version;
 
-      this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
+      recordID = (int)(fileID & Integer.MAX_VALUE);
    }
 
    public void clearCounts()
@@ -165,7 +165,7 @@
    {
       try
       {
-         return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
+         return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
       }
       catch (Exception e)
       {

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -86,12 +86,12 @@
    // Constructors --------------------------------------------------
 
    public JournalFilesRepository(final SequentialFileFactory fileFactory,
-                          final String filePrefix,
-                          final String fileExtension,
-                          final int userVersion,
-                          final int maxAIO,
-                          final int fileSize,
-                          final int minFiles)
+                                 final String filePrefix,
+                                 final String fileExtension,
+                                 final int userVersion,
+                                 final int maxAIO,
+                                 final int fileSize,
+                                 final int minFiles)
    {
       this.fileFactory = fileFactory;
       this.maxAIO = maxAIO;
@@ -269,7 +269,7 @@
       if (file.getFile().size() != fileSize)
       {
          JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
-                                  new Exception("trace"));
+                                         new Exception("trace"));
          file.getFile().delete();
       }
       else
@@ -376,7 +376,7 @@
          if (nextFile == null)
          {
             JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
-                                     new Exception("Warning: Couldn't open a file in 60 Seconds"));
+                                            new Exception("Warning: Couldn't open a file in 60 Seconds"));
          }
       }
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -1857,7 +1857,7 @@
                   // just leaving some updates in this file
 
                   posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
-                                                                                                    // count
+                  // count
                }
             }
 
@@ -1908,7 +1908,7 @@
                }
 
                tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
-                                                                                                      // count
+               // count
             }
 
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -74,6 +74,7 @@
       }
    }
 
+   @Override
    public String toString()
    {
       StringBuffer buffer = new StringBuffer();

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -29,6 +29,6 @@
 public interface JournalRecordProvider
 {
    JournalCompactor getCompactor();
-   
+
    Map<Long, JournalRecord> getRecords();
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,10 +61,10 @@
       this.id = id;
       this.journal = journal;
    }
-   
-   public void replaceRecordProvider(JournalRecordProvider provider)
+
+   public void replaceRecordProvider(final JournalRecordProvider provider)
    {
-      this.journal = provider;
+      journal = provider;
    }
 
    /**
@@ -329,7 +329,7 @@
                else
                {
                   JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-   
+
                   if (posFiles != null)
                   {
                      posFiles.delete(trDelete.file);

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -140,7 +140,7 @@
    @Override
    public synchronized void close() throws Exception
    {
-       super.close();
+      super.close();
 
       if (maxIOSemaphore != null)
       {
@@ -260,14 +260,12 @@
    {
       internalWrite(bytes, sync, null);
    }
-   
-   
-   public void writeInternal(ByteBuffer bytes) throws Exception
+
+   public void writeInternal(final ByteBuffer bytes) throws Exception
    {
       internalWrite(bytes, true, null);
    }
 
-
    @Override
    protected ByteBuffer newBuffer(int size, final int limit)
    {
@@ -292,7 +290,7 @@
          }
          return;
       }
-      
+
       position.addAndGet(bytes.limit());
 
       if (maxIOSemaphore == null)

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,20 +61,24 @@
          {
             Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
          }
-         
+
          for (int j = i; j < files.length; j++)
          {
             if (Reclaimer.trace)
             {
                if (files[j].getNegCount(currentFile) != 0)
                {
-                  Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
+                  Reclaimer.trace("Negative from " + files[j] +
+                                  " into " +
+                                  currentFile +
+                                  " = " +
+                                  files[j].getNegCount(currentFile));
                }
             }
 
             totNeg += files[j].getNegCount(currentFile);
          }
-         
+
          currentFile.setCanReclaim(true);
 
          if (posCount <= totNeg)
@@ -99,7 +103,7 @@
                      {
                         Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
                      }
- 
+
                      currentFile.setCanReclaim(false);
 
                      break;

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-08-27 16:44:17 UTC (rev 9606)
@@ -127,7 +127,6 @@
          return;
       }
 
-
       // Need to start with the spin limiter acquired
       try
       {
@@ -207,7 +206,7 @@
       {
          throw new IllegalStateException("TimedBuffer is not started");
       }
-      
+
       if (sizeChecked > bufferSize)
       {
          throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
@@ -259,7 +258,7 @@
       {
          throw new IllegalStateException("TimedBuffer is not started");
       }
-      
+
       delayFlush = false;
 
       bytes.encode(buffer);
@@ -306,7 +305,7 @@
          {
             throw new IllegalStateException("TimedBuffer is not started");
          }
-         
+
          if ((force || !delayFlush) && buffer.writerIndex() > 0)
          {
             int pos = buffer.writerIndex();



More information about the hornetq-commits mailing list