[hornetq-commits] JBoss hornetq SVN: r8435 - in trunk: src/main/org/hornetq/core/journal/impl and 4 other directories.

do-not-reply at jboss.org
Fri Nov 27 21:30:52 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-27 21:30:52 -0500 (Fri, 27 Nov 2009)
New Revision: 8435

Added:
   trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
   trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
Modified:
   trunk/src/main/org/hornetq/core/journal/SequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Optimization on journal - removing one unnecessary buffer copy that was introduced after the TimedBuffer was added
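
A condensed sketch of the change (paraphrased from the JournalImpl and TimedBuffer hunks
below; size calculation, locking and callback line-up omitted). Before, each record was
encoded into its own temporary HornetQBuffer, and that buffer was then copied again into
the TimedBuffer:

   // before: encode into a temporary buffer, then copy that buffer into the timed buffer
   HornetQBuffer bb = newBuffer(size);
   writeAddRecord(-1, id, recordType, record, size, bb);    // first write of the bytes
   currentFile.getFile().write(bb, sync, callback);         // TimedBuffer.addBytes copies bb again

After, the record travels as an InternalEncoder/EncodingSupport object and encodes itself
directly into the shared timed buffer, so the intermediate copy disappears:

   // after: the record object writes its bytes once, straight into the timed buffer
   InternalEncoder addRecord = new JournalAddRecord(true, id, recordType, record);
   addRecord.setFileID(currentFile.getFileID());            // no more buffer-offset patching
   currentFile.getFile().write(addRecord, sync, callback);  // -> timedBuffer.addBytes(addRecord, ...)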

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -60,6 +60,10 @@
 
    void write(HornetQBuffer bytes, boolean sync) throws Exception;
 
+   void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception;
+
+   void write(EncodingSupport bytes, boolean sync) throws Exception;
+
    /** Write directly to the file without using any buffer */
    void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
 

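A minimal usage sketch for the two overloads added above (file, record and callback are
placeholder variables, not part of this patch): the callback variant reports completion
asynchronously through the IOAsyncTask, while the two-argument variant waits internally
before returning when sync is true.

   SequentialFile file = ...;             // any SequentialFile implementation
   EncodingSupport record = ...;          // e.g. one of the new dataformat records
   file.write(record, true, callback);    // completion signalled through the IOAsyncTask
   file.write(record, true);              // blocks on a SimpleWaitIOCallback before returning
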
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -22,6 +22,7 @@
 import org.hornetq.core.buffers.HornetQBuffers;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.Pair;
@@ -151,12 +152,16 @@
             }
          }
 
-         JournalImpl.writeAddRecord(-1,
-                                    1,
-                                    (byte)0,
-                                    new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer().array()),
-                                    JournalImpl.SIZE_ADD_RECORD + filesToRename.toByteBuffer().array().length,
-                                    renameBuffer);
+         
+         InternalEncoder controlRecord = new JournalAddRecord(true,
+                                                              1,
+                                                              (byte)0,
+                                                              new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer()
+                                                                                                             .array()));
+         
+         controlRecord.setFileID(-1);
+         
+         controlRecord.encode(renameBuffer);
 
          ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
 
@@ -229,7 +234,21 @@
    {
       return writingChannel;
    }
+   
+   protected void writeEncoder(InternalEncoder record) throws Exception
+   {
+      record.setFileID(fileID);
+      record.encode(getWritingChannel());
+   }
 
+   protected void writeEncoder(InternalEncoder record, int txcounter) throws Exception
+   {
+      record.setNumberOfRecords(txcounter);
+      writeEncoder(record);
+   }
+
+   
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -21,7 +21,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -223,7 +225,44 @@
          write(bytes, false, DummyCallback.getInstance());
       }
    }
+   
+   public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.addBytes(bytes, sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
+         
+         // If not using the TimedBuffer, a final copy is necessary
+         // Because AIO will need a specific Buffer
+         // And NIO will also need a whole buffer to perform the write
+         
+         HornetQBuffer outBuffer = HornetQBuffers.wrappedBuffer(buffer);
+         bytes.encode(outBuffer);
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
 
+   public void write(final EncodingSupport bytes, final boolean sync) throws Exception
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Added: trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A InternalEncoder
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class InternalEncoder implements EncodingSupport
+{
+
+   protected int fileID;
+   
+   public int getFileID()
+   {
+      return fileID;
+   }
+
+   public void setFileID(int fileID)
+   {
+      this.fileID = fileID;
+   }
+
+   public void decode(HornetQBuffer buffer)
+   {
+   }
+   
+   public void setNumberOfRecords(int records)
+   {
+   }
+   
+   public int getNumberOfRecords()
+   {
+      return 0;
+   }
+
+   public abstract int getEncodeSize();
+}
+

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,12 @@
 
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
 
 /**
  * A JournalCleaner
@@ -72,14 +77,10 @@
    {
       if (lookupRecord(info.id))
       {
-         int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
-
-         JournalImpl.writeAddRecord(fileID,
-                                    info.id,
-                                    info.getUserRecordType(),
-                                    new JournalImpl.ByteArrayEncoding(info.data),
-                                    size,
-                                    getWritingChannel());
+         writeEncoder(new JournalAddRecord(true,
+                                           info.id,
+                                           info.getUserRecordType(),
+                                           new JournalImpl.ByteArrayEncoding(info.data)));
       }
    }
 
@@ -92,15 +93,11 @@
       {
          incrementTransactionCounter(transactionID);
 
-         int size = JournalImpl.SIZE_ADD_RECORD_TX + recordInfo.data.length;
-
-         JournalImpl.writeAddRecordTX(fileID,
-                                      transactionID,
-                                      recordInfo.id,
-                                      recordInfo.getUserRecordType(),
-                                      new JournalImpl.ByteArrayEncoding(recordInfo.data),
-                                      size,
-                                      getWritingChannel());
+         writeEncoder(new JournalAddRecordTX(true,
+                                             transactionID,
+                                             recordInfo.id,
+                                             recordInfo.getUserRecordType(),
+                                             new JournalImpl.ByteArrayEncoding(recordInfo.data)));
       }
    }
 
@@ -111,14 +108,7 @@
    {
       int txcounter = getTransactionCounter(transactionID);
 
-      JournalImpl.writeTransaction(fileID,
-                                   JournalImpl.COMMIT_RECORD,
-                                   transactionID,
-                                   null,
-                                   JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD,
-                                   txcounter,
-                                   getWritingChannel());
-
+      writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
    }
 
    /* (non-Javadoc)
@@ -126,7 +116,7 @@
     */
    public void onReadDeleteRecord(final long recordID) throws Exception
    {
-      JournalImpl.writeDeleteRecord(fileID, recordID, JournalImpl.SIZE_DELETE_RECORD, getWritingChannel());
+      writeEncoder(new JournalDeleteRecord(recordID));
    }
 
    /* (non-Javadoc)
@@ -134,16 +124,11 @@
     */
    public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
    {
-      int size = JournalImpl.SIZE_DELETE_RECORD_TX + recordInfo.data.length;
-
       incrementTransactionCounter(transactionID);
 
-      JournalImpl.writeDeleteRecordTransactional(fileID,
-                                                 transactionID,
-                                                 recordInfo.id,
-                                                 new JournalImpl.ByteArrayEncoding(recordInfo.data),
-                                                 size,
-                                                 getWritingChannel());
+      writeEncoder(new JournalDeleteRecordTX(transactionID,
+                                             recordInfo.id,
+                                             new JournalImpl.ByteArrayEncoding(recordInfo.data)));
    }
 
    /* (non-Javadoc)
@@ -153,15 +138,8 @@
    {
       int txcounter = getTransactionCounter(transactionID);
 
-      int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
-
-      JournalImpl.writeTransaction(fileID,
-                                   JournalImpl.PREPARE_RECORD,
-                                   transactionID,
-                                   new JournalImpl.ByteArrayEncoding(extraData),
-                                   size,
-                                   txcounter,
-                                   getWritingChannel());
+      writeEncoder(new JournalCompleteRecordTX(false, transactionID, new JournalImpl.ByteArrayEncoding(extraData)),
+                   txcounter);
    }
 
    /* (non-Javadoc)
@@ -169,7 +147,7 @@
     */
    public void onReadRollbackRecord(final long transactionID) throws Exception
    {
-      JournalImpl.writeRollback(fileID, transactionID, getWritingChannel());
+      writeEncoder(new JournalRollbackRecordTX(transactionID));
    }
 
    /* (non-Javadoc)
@@ -179,13 +157,10 @@
    {
       if (lookupRecord(recordInfo.id))
       {
-         int size = JournalImpl.SIZE_UPDATE_RECORD + recordInfo.data.length;
-         JournalImpl.writeUpdateRecord(fileID,
-                                       recordInfo.id,
-                                       recordInfo.userRecordType,
-                                       new JournalImpl.ByteArrayEncoding(recordInfo.data),
-                                       size,
-                                       getWritingChannel());
+         writeEncoder(new JournalAddRecord(false,
+                                           recordInfo.id,
+                                           recordInfo.userRecordType,
+                                           new JournalImpl.ByteArrayEncoding(recordInfo.data)));
       }
    }
 
@@ -197,14 +172,12 @@
       if (lookupRecord(recordInfo.id))
       {
          incrementTransactionCounter(transactionID);
-         int size = JournalImpl.SIZE_UPDATE_RECORD_TX + recordInfo.data.length;
-         JournalImpl.writeUpdateRecordTX(fileID,
-                                         transactionID,
-                                         recordInfo.id,
-                                         recordInfo.userRecordType,
-                                         new JournalImpl.ByteArrayEncoding(recordInfo.data),
-                                         size,
-                                         getWritingChannel());
+         
+         writeEncoder(new JournalAddRecordTX(false,
+                                             transactionID,
+                                             recordInfo.id,
+                                             recordInfo.userRecordType,
+                                             new JournalImpl.ByteArrayEncoding(recordInfo.data)));
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -27,6 +27,10 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.Pair;
@@ -246,18 +250,16 @@
    {
       if (lookupRecord(info.id))
       {
-         int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+         InternalEncoder addRecord = new JournalAddRecord(true,
+                                                          info.id,
+                                                          info.getUserRecordType(),
+                                                          new JournalImpl.ByteArrayEncoding(info.data));
+         
+         checkSize(addRecord.getEncodeSize());
 
-         checkSize(size);
+         writeEncoder(addRecord);
 
-         JournalImpl.writeAddRecord(fileID,
-                                    info.id,
-                                    info.getUserRecordType(),
-                                    new JournalImpl.ByteArrayEncoding(info.data),
-                                    size,
-                                    getWritingChannel());
-
-         newRecords.put(info.id, new JournalRecord(currentFile, size));
+         newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
       }
    }
 
@@ -267,19 +269,17 @@
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         int size = JournalImpl.SIZE_ADD_RECORD_TX + info.data.length;
+         InternalEncoder record = new JournalAddRecordTX(true,
+                                                         transactionID,
+                                                         info.id,
+                                                         info.getUserRecordType(),
+                                                         new JournalImpl.ByteArrayEncoding(info.data));
+         
+         checkSize(record.getEncodeSize());
 
-         checkSize(size);
+         newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
 
-         newTransaction.addPositive(currentFile, info.id, size);
-
-         JournalImpl.writeAddRecordTX(fileID,
-                                      transactionID,
-                                      info.id,
-                                      info.getUserRecordType(),
-                                      new JournalImpl.ByteArrayEncoding(info.data),
-                                      size,
-                                      getWritingChannel());
+         writeEncoder(record);
       }
       else
       {
@@ -315,17 +315,14 @@
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         int size = JournalImpl.SIZE_DELETE_RECORD_TX + info.data.length;
+         InternalEncoder record = new JournalDeleteRecordTX(transactionID,
+                                                            info.id,
+                                                            new JournalImpl.ByteArrayEncoding(info.data));
 
-         checkSize(size);
+         checkSize(record.getEncodeSize());
+         
+         writeEncoder(record);
 
-         JournalImpl.writeDeleteRecordTransactional(fileID,
-                                                    transactionID,
-                                                    info.id,
-                                                    new JournalImpl.ByteArrayEncoding(info.data),
-                                                    size,
-                                                    getWritingChannel());
-
          newTransaction.addNegative(currentFile, info.id);
       }
       // else.. nothing to be done
@@ -343,17 +340,13 @@
 
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+         InternalEncoder prepareRecord = new JournalCompleteRecordTX(false,
+                                                                     transactionID,
+                                                                     new JournalImpl.ByteArrayEncoding(extraData));
 
-         checkSize(size);
+         checkSize(prepareRecord.getEncodeSize());
 
-         JournalImpl.writeTransaction(fileID,
-                                      JournalImpl.PREPARE_RECORD,
-                                      transactionID,
-                                      new JournalImpl.ByteArrayEncoding(extraData),
-                                      size,
-                                      newTransaction.getCounter(currentFile),
-                                      getWritingChannel());
+         writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
 
          newTransaction.prepare(currentFile);
 
@@ -374,9 +367,12 @@
    {
       if (lookupRecord(info.id))
       {
-         int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
+         InternalEncoder updateRecord = new JournalAddRecord(false,
+                                                             info.id,
+                                                             info.userRecordType,
+                                                             new JournalImpl.ByteArrayEncoding(info.data));
 
-         checkSize(size);
+         checkSize(updateRecord.getEncodeSize());
 
          JournalRecord newRecord = newRecords.get(info.id);
 
@@ -386,16 +382,10 @@
          }
          else
          {
-            newRecord.addUpdateFile(currentFile, size);
+            newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
          }
-
-         JournalImpl.writeUpdateRecord(fileID,
-                                       info.id,
-                                       info.userRecordType,
-                                       new JournalImpl.ByteArrayEncoding(info.data),
-                                       size,
-                                       getWritingChannel());
-
+         
+         writeEncoder(updateRecord);
       }
    }
 
@@ -405,19 +395,18 @@
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         int size = JournalImpl.SIZE_UPDATE_RECORD_TX + info.data.length;
+         InternalEncoder updateRecordTX = new JournalAddRecordTX(false,
+                                                                 transactionID,
+                                                                 info.id,
+                                                                 info.userRecordType,
+                                                                 new JournalImpl.ByteArrayEncoding(info.data));
 
-         checkSize(size);
+            
+         checkSize(updateRecordTX.getEncodeSize());
 
-         JournalImpl.writeUpdateRecordTX(fileID,
-                                         transactionID,
-                                         info.id,
-                                         info.userRecordType,
-                                         new JournalImpl.ByteArrayEncoding(info.data),
-                                         size,
-                                         getWritingChannel());
-
-         newTransaction.addPositive(currentFile, info.id, size);
+         writeEncoder(updateRecordTX);
+         
+         newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
       }
       else
       {
@@ -425,6 +414,8 @@
       }
    }
 
+
+   
    /**
     * @param transactionID
     * @return

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -54,6 +54,12 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TestableJournal;
 import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.Pair;
@@ -114,10 +120,6 @@
 
    public static final byte ADD_RECORD = 11;
 
-   public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
-                                                 DataConstants.SIZE_BYTE +
-                                                 DataConstants.SIZE_INT /* + record.length */;
-
    public static final byte UPDATE_RECORD = 12;
 
    public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -127,11 +129,6 @@
 
    public static final byte ADD_RECORD_TX = 13;
 
-   public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
-                                                   DataConstants.SIZE_BYTE +
-                                                   DataConstants.SIZE_LONG +
-                                                   DataConstants.SIZE_INT /* + record.length */;
-
    public static final byte UPDATE_RECORD_TX = 14;
 
    public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -292,207 +289,6 @@
       this.maxAIO = maxAIO;
    }
 
-   // Public methods (used by package members such as JournalCompactor) (these methods are not part of the JournalImpl
-   // interface)
-
-   /**
-    * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
-    * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
-    *    (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
-    * <p>The element-summary will then have</p>
-    * <p>FileID1, 10</p>
-    * <p>FileID2, 10</p>
-    * <p>FileID3, 10</p>
-    * 
-    * <br>
-    * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
-    * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
-    * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed. 
-    *     That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
-    * 
-    * @param recordType
-    * @param txID
-    * @param tx
-    * @param transactionData
-    * @return
-    * @throws Exception
-    */
-   public static void writeTransaction(final int fileID,
-                                       final byte recordType,
-                                       final long txID,
-                                       final EncodingSupport transactionData,
-                                       final int size,
-                                       final int numberOfRecords,
-                                       final HornetQBuffer bb) throws Exception
-   {
-      bb.writeByte(recordType);
-      bb.writeInt(fileID); // skip ID part
-      bb.writeLong(txID);
-      bb.writeInt(numberOfRecords);
-
-      if (transactionData != null)
-      {
-         bb.writeInt(transactionData.getEncodeSize());
-      }
-
-      if (transactionData != null)
-      {
-         transactionData.encode(bb);
-      }
-
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeUpdateRecordTX(final int fileID,
-                                          final long txID,
-                                          final long id,
-                                          final byte recordType,
-                                          final EncodingSupport record,
-                                          final int size,
-                                          final HornetQBuffer bb)
-   {
-      bb.writeByte(UPDATE_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param bb
-    */
-   public static void writeRollback(final int fileID, final long txID, HornetQBuffer bb)
-   {
-      bb.writeByte(ROLLBACK_RECORD);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeInt(SIZE_ROLLBACK_RECORD);
-   }
-
-   /**
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeUpdateRecord(final int fileId,
-                                        final long id,
-                                        final byte recordType,
-                                        final EncodingSupport record,
-                                        final int size,
-                                        final HornetQBuffer bb)
-   {
-      bb.writeByte(UPDATE_RECORD);
-      bb.writeInt(fileId);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeAddRecord(final int fileId,
-                                     final long id,
-                                     final byte recordType,
-                                     final EncodingSupport record,
-                                     final int size,
-                                     final HornetQBuffer bb)
-   {     
-      bb.writeByte(ADD_RECORD);
-      bb.writeInt(fileId);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);      
-      record.encode(bb);       
-      bb.writeInt(size);        
-   }
-
-   /**
-    * @param id
-    * @param size
-    * @param bb
-    */
-   public static void writeDeleteRecord(final int fileId, final long id, int size, HornetQBuffer bb)
-   {
-      bb.writeByte(DELETE_RECORD);
-      bb.writeInt(fileId);
-      bb.writeLong(id);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeDeleteRecordTransactional(final int fileID,
-                                                     final long txID,
-                                                     final long id,
-                                                     final EncodingSupport record,
-                                                     final int size,
-                                                     final HornetQBuffer bb)
-   {
-      bb.writeByte(DELETE_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record != null ? record.getEncodeSize() : 0);
-      if (record != null)
-      {
-         record.encode(bb);
-      }
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param recordType
-    * @param record
-    * @param recordLength
-    * @param size
-    * @param bb
-    */
-   public static void writeAddRecordTX(final int fileID,
-                                       final long txID,
-                                       final long id,
-                                       final byte recordType,
-                                       final EncodingSupport record,
-                                       final int size,
-                                       final HornetQBuffer bb)
-   {
-      bb.writeByte(ADD_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
    public Map<Long, JournalRecord> getRecords()
    {
       return records;
@@ -843,7 +639,7 @@
    {
       appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
-
+   
    public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
    {
       appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
@@ -876,12 +672,8 @@
 
       try
       {  
-         int size = SIZE_ADD_RECORD + record.getEncodeSize();
+         InternalEncoder addRecord = new JournalAddRecord(true, id, recordType, record);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
-
          if (callback != null)
          {
             callback.lineUp();
@@ -890,9 +682,9 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+            JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
 
-            records.put(id, new JournalRecord(usedFile, size));
+            records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
          }
          finally
          {
@@ -952,12 +744,8 @@
             }
          }
 
-         int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+         InternalEncoder updateRecord = new JournalAddRecord(false, id, recordType, record);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeUpdateRecord(-1, id, recordType, record, size, bb);
-
          if (callback != null)
          {
             callback.lineUp();
@@ -966,17 +754,17 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+            JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
             if (jrnRecord == null)
             {
-               compactor.addCommandUpdate(id, usedFile, size);
+               compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
             }
             else
             {
-               jrnRecord.addUpdateFile(usedFile, size);
+               jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
             }
          }
          finally
@@ -1029,12 +817,8 @@
             }
          }
          
-         int size = SIZE_DELETE_RECORD;
+         InternalEncoder deleteRecord = new JournalDeleteRecord(id);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeDeleteRecord(-1, id, size, bb);
-
          if (callback != null)
          {
             callback.lineUp();
@@ -1043,7 +827,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+            JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
@@ -1093,20 +877,16 @@
       try
       {
 
-         int size = SIZE_ADD_RECORD_TX + record.getEncodeSize();
+         InternalEncoder addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeAddRecordTX(-1, txID, id, recordType, record, size, bb);
-
          JournalTransaction tx = getTransactionInfo(txID);
 
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+            JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
-            tx.addPositive(usedFile, id, size);
+            tx.addPositive(usedFile, id, addRecord.getEncodeSize());
          }
          finally
          {
@@ -1146,20 +926,16 @@
       try
       {
 
-         int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+         InternalEncoder updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeUpdateRecordTX(-1, txID, id, recordType, record, size, bb);
-
          JournalTransaction tx = getTransactionInfo(txID);
 
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+            JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
 
-            tx.addPositive(usedFile, id, size);
+            tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
          }
          finally
          {
@@ -1193,18 +969,14 @@
 
       try
       {
-         int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
+         InternalEncoder deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
 
-         HornetQBuffer bb = newBuffer(size);
-
-         writeDeleteRecordTransactional(-1, txID, id, record, size, bb);
-
          JournalTransaction tx = getTransactionInfo(txID);
 
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+            JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
 
             tx.addNegative(usedFile, id);
          }
@@ -1282,11 +1054,8 @@
       try
       {
 
-         int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
-         HornetQBuffer bb = newBuffer(size);
+         InternalEncoder prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
 
-         writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
-
          if (callback != null)
          {
             callback.lineUp();
@@ -1295,7 +1064,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+            JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
 
             tx.prepare(usedFile);
          }
@@ -1365,16 +1134,8 @@
             throw new IllegalStateException("Cannot find tx with id " + txID);
          }
 
-         HornetQBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
+         InternalEncoder commitRecord = new JournalCompleteRecordTX(true, txID, null);
 
-         writeTransaction(-1,
-                          COMMIT_RECORD,
-                          txID,
-                          null,
-                          SIZE_COMPLETE_TRANSACTION_RECORD,
-                          -1 /* number of records on this transaction will be filled later inside append record */,
-                          bb);
-
          if (callback != null)
          {
             callback.lineUp();
@@ -1383,7 +1144,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+            JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
 
             tx.commit(usedFile);
          }
@@ -1433,10 +1194,8 @@
             throw new IllegalStateException("Cannot find tx with id " + txID);
          }
          
-         HornetQBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
+         InternalEncoder rollbackRecord = new JournalRollbackRecordTX(txID);
 
-         writeRollback(-1, txID, bb);
-
          if (callback != null)
          {
             callback.lineUp();
@@ -1445,7 +1204,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
+            JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
 
             tx.rollback(usedFile);
          }
@@ -1586,6 +1345,7 @@
 
       try
       {
+         trace("Starting compacting operation on journal");
          log.debug("Starting compacting operation on journal");
 
          // We need to guarantee that the journal is frozen for this short time
@@ -1867,7 +1627,7 @@
                   // have been deleted
                   // just leaving some updates in this file
 
-                  posFiles.addUpdateFile(file, info.data.length + SIZE_UPDATE_RECORD);
+                  posFiles.addUpdateFile(file, info.data.length + SIZE_ADD_RECORD);
                }
             }
 
@@ -2337,6 +2097,10 @@
          try
          {
 
+            if (trace)
+            {
+               trace("Cleaning up file " + file);
+            }
             log.debug("Cleaning up file " + file);
 
             if (file.getPosCount() == 0)
@@ -2847,13 +2611,13 @@
             recordSize = SIZE_ADD_RECORD;
             break;
          case UPDATE_RECORD:
-            recordSize = SIZE_UPDATE_RECORD;
+            recordSize = SIZE_ADD_RECORD;
             break;
          case ADD_RECORD_TX:
             recordSize = SIZE_ADD_RECORD_TX;
             break;
          case UPDATE_RECORD_TX:
-            recordSize = SIZE_UPDATE_RECORD_TX;
+            recordSize = SIZE_ADD_RECORD_TX;
             break;
          case DELETE_RECORD:
             recordSize = SIZE_DELETE_RECORD;
@@ -2933,7 +2697,7 @@
     * 
     * @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
     * */
-   private JournalFile appendRecord(final HornetQBuffer bb,
+   private JournalFile appendRecord(final InternalEncoder encoder,
                                     final boolean completeTransaction,
                                     final boolean sync,
                                     final JournalTransaction tx,
@@ -2948,7 +2712,7 @@
          
          final IOAsyncTask callback;
 
-         int size = bb.capacity();
+         int size = encoder.getEncodeSize();
 
          // We take into account the fileID used on the Header
          if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
@@ -3012,7 +2776,7 @@
             if (completeTransaction)
             {
                // Filling the number of pendingTransactions at the current file
-               tx.fillNumberOfRecords(currentFile, bb);
+               tx.fillNumberOfRecords(currentFile, encoder);
             }
          }
          else
@@ -3021,16 +2785,15 @@
          }
 
          // Adding fileID
-         bb.writerIndex(DataConstants.SIZE_BYTE);
-         bb.writeInt(currentFile.getFileID());
+         encoder.setFileID(currentFile.getFileID());
 
          if (callback != null)
          {
-            currentFile.getFile().write(bb, sync, callback);
+            currentFile.getFile().write(encoder, sync, callback);
          }
          else
          {
-            currentFile.getFile().write(bb, sync);
+            currentFile.getFile().write(encoder, sync);
          }
 
          return currentFile;
@@ -3615,7 +3378,7 @@
          return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
       }
    }
-
+   
    private class PerfBlast extends Thread
    {
       private final int pages;
@@ -3633,13 +3396,13 @@
          {
             lockAppend.lock();
 
-            HornetQBuffer bb = newBuffer(490 * 1024);
+//            HornetQBuffer bb = newBuffer(128 * 1024);
+//
+//            for (int i = 0; i < pages; i++)
+//            {
+//               appendRecord(bb, false, false, null, null);
+//            }
 
-            for (int i = 0; i < pages; i++)
-            {
-               appendRecord(bb, false, true, null, null);
-            }
-
             lockAppend.unlock();
          }
          catch (Exception e)
@@ -3648,5 +3411,11 @@
          }
       }
    }
+   
 
+   
+   
+   
+   
+   
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -184,12 +184,9 @@
     * @param currentFile
     * @param bb
     */
-   public void fillNumberOfRecords(final JournalFile currentFile, final HornetQBuffer bb)
+   public void fillNumberOfRecords(final JournalFile currentFile, final InternalEncoder data)
    {
-      bb.writerIndex(DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG);
-
-      bb.writeInt(getCounter(currentFile));
-
+      data.setNumberOfRecords(getCounter(currentFile));
    }
 
    /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -24,7 +24,9 @@
 
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.buffers.HornetQBuffers;
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.VariableLatch;
 
@@ -231,13 +233,18 @@
 
    public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback)
    {
+      addBytes(new JournalImpl.ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+   }
+
+   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
+   {
       if (buffer.writerIndex() == 0)
       {
          // Resume latch
          latchTimer.down();
       }
 
-      buffer.writeBytes(bytes, bytes.capacity());
+      bytes.encode(buffer);
 
       callbacks.add(callback);
 

Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecord
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecord extends InternalEncoder
+{
+
+   private final long id;
+
+   private final EncodingSupport record;
+
+   private final byte recordType;
+
+   private final boolean add;
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    */
+   public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record)
+   {
+      this.id = id;
+
+      this.record = record;
+
+      this.recordType = recordType;
+
+      this.add = add;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (add)
+      {
+         buffer.writeByte(JournalImpl.ADD_RECORD);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.UPDATE_RECORD);
+      }
+
+      buffer.writeInt(fileID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(record.getEncodeSize());
+
+      buffer.writeByte(recordType);
+
+      record.encode(buffer);
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+   }
+}
\ No newline at end of file

Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecordTX
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecordTX extends InternalEncoder
+{
+
+   private final long txID;
+
+   private final long id;
+
+   private final EncodingSupport record;
+
+   private final byte recordType;
+
+   private final boolean add;
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    */
+   public JournalAddRecordTX(final boolean add,
+                             final long txID,
+                             final long id,
+                             final byte recordType,
+                             final EncodingSupport record)
+   {
+
+      this.txID = txID;
+
+      this.id = id;
+
+      this.record = record;
+
+      this.recordType = recordType;
+
+      this.add = add;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (add)
+      {
+         buffer.writeByte(JournalImpl.ADD_RECORD_TX);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.UPDATE_RECORD_TX);
+      }
+
+      buffer.writeInt(fileID);
+
+      buffer.writeLong(txID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(record.getEncodeSize());
+
+      buffer.writeByte(recordType);
+
+      record.encode(buffer);
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+   }
+}

Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+ * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
+ *    (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
+ * <p>The element-summary will then have</p>
+ * <p>FileID1, 10</p>
+ * <p>FileID2, 10</p>
+ * <p>FileID3, 10</p>
+ * 
+ * <br>
+ * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
+ * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed. 
+ *     That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompleteRecordTX extends InternalEncoder
+{
+   private final boolean isCommit;
+
+   private final long txID;
+
+   private final EncodingSupport transactionData;
+
+   private int numberOfRecords;
+
+   public JournalCompleteRecordTX(final boolean isCommit, final long txID, final EncodingSupport transactionData)
+   {
+      this.isCommit = isCommit;
+
+      this.txID = txID;
+
+      this.transactionData = transactionData;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (isCommit)
+      {
+         buffer.writeByte(JournalImpl.COMMIT_RECORD);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.PREPARE_RECORD);
+      }
+
+      buffer.writeInt(fileID);
+
+      buffer.writeLong(txID);
+
+      buffer.writeInt(numberOfRecords);
+
+      if (transactionData != null)
+      {
+         buffer.writeInt(transactionData.getEncodeSize());
+      }
+
+      if (transactionData != null)
+      {
+         transactionData.encode(buffer);
+      }
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public void setNumberOfRecords(final int records)
+   {
+      numberOfRecords = records;
+   }
+
+   @Override
+   public int getNumberOfRecords()
+   {
+      return numberOfRecords;
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      if (isCommit)
+      {
+         return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+      }
+      else
+      {
+         return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+      }
+   }
+}

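For illustration only (not part of this revision), the per-file validation described in the JournalCompleteRecordTX javadoc could be sketched as below. The class and method names (TransactionCountCheck, expect, recordFound, isComplete) are hypothetical; JournalImpl's real reload logic differs in detail.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TransactionCountCheck
{
   /** fileID -> number of records the commit/prepare summary says the transaction has on that file */
   private final Map<Integer, Integer> expected = new HashMap<Integer, Integer>();

   /** fileID -> number of records actually found for the transaction while reloading */
   private final Map<Integer, Integer> found = new HashMap<Integer, Integer>();

   public void expect(final int fileID, final int count)
   {
      expected.put(fileID, count);
   }

   public void recordFound(final int fileID)
   {
      Integer current = found.get(fileID);
      found.put(fileID, current == null ? 1 : current + 1);
   }

   /**
    * The transaction is only taken as complete when every file still present on
    * reload contains all the records the summary promised for it. A file missing
    * entirely may simply have been reclaimed after the commit, so its absence
    * alone does not invalidate the transaction.
    */
   public boolean isComplete(final Set<Integer> filesPresentOnReload)
   {
      for (Map.Entry<Integer, Integer> entry : expected.entrySet())
      {
         if (!filesPresentOnReload.contains(entry.getKey()))
         {
            continue;
         }

         Integer actual = found.get(entry.getKey());

         if (actual == null || actual.intValue() < entry.getValue().intValue())
         {
            return false;
         }
      }
      return true;
   }
}
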
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecord
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecord extends InternalEncoder
+{
+
+   private final long id;
+
+   /**
+    * A delete record only needs the id being deleted; the record type,
+    * file id and trailing size are written by encode itself.
+    *
+    * @param id the id of the record being deleted
+    */
+   public JournalDeleteRecord(final long id)
+   {
+      this.id = id;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.DELETE_RECORD);
+
+      buffer.writeInt(fileID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_DELETE_RECORD;
+   }
+}

Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecordTX
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecordTX extends InternalEncoder
+{
+
+   private final long txID;
+
+   private final long id;
+
+   private final EncodingSupport record;
+
+   /**
+    * @param txID   the id of the transaction this delete belongs to
+    * @param id     the id of the record being deleted
+    * @param record optional extra data stored with the delete; may be null,
+    *               in which case a zero length is encoded
+    */
+   public JournalDeleteRecordTX(final long txID, final long id, final EncodingSupport record)
+   {
+      this.id = id;
+
+      this.txID = txID;
+
+      this.record = record;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
+
+      buffer.writeInt(fileID);
+
+      buffer.writeLong(txID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(record != null ? record.getEncodeSize() : 0);
+
+      if (record != null)
+      {
+         record.encode(buffer);
+      }
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+   }
+}

Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalRollbackRecordTX
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalRollbackRecordTX extends InternalEncoder
+{
+   private final long txID;
+
+   public JournalRollbackRecordTX(final long txID)
+   {
+      this.txID = txID;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
+      buffer.writeInt(fileID);
+      buffer.writeLong(txID);
+      buffer.writeInt(getEncodeSize());
+
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_ROLLBACK_RECORD;
+   }
+}

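All of the record classes above share one framing: a type byte, the file id, the record body, and a trailing int that repeats the record's own encode size. A loader can use that trailing int to spot a torn or partially written record, roughly as in the sketch below. This is illustrative only: RecordFrameCheck and hasValidTrailer are made-up names, the check assumes the SIZE_* constants count the trailing int itself, it assumes HornetQBuffer exposes the readerIndex/writerIndex/getInt accessors used here, and JournalImpl's actual reload checks are more involved.

import org.hornetq.core.buffers.HornetQBuffer;

public class RecordFrameCheck
{
   private static final int SIZE_INT = 4;

   /**
    * @param buffer     buffer whose readerIndex is positioned at the record's type byte
    * @param encodeSize the size computed for this record type (assumed to include the trailing int)
    * @return true if the record is fully present and its trailing size matches
    */
   public static boolean hasValidTrailer(final HornetQBuffer buffer, final int encodeSize)
   {
      int start = buffer.readerIndex();

      if (buffer.writerIndex() - start < encodeSize)
      {
         // the record claims more bytes than were actually written - a torn write
         return false;
      }

      // the last int of the record repeats its encode size
      int trailer = buffer.getInt(start + encodeSize - SIZE_INT);

      return trailer == encodeSize;
   }
}
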
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -616,7 +616,7 @@
          journal.forceMoveNextFile();
          update(id);
 
-         expectedSizes.add(recordLength + JournalImpl.SIZE_UPDATE_RECORD);
+         expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
          journal.forceMoveNextFile();
       }
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -719,7 +719,7 @@
    {
       final int JOURNAL_SIZE = 2000;
 
-      setupAndLoadJournal(JOURNAL_SIZE, 100);
+      setupAndLoadJournal(JOURNAL_SIZE, 1);
 
       assertEquals(2, factory.listFiles("tt").size());
 
@@ -759,6 +759,7 @@
       // reload will think the record came from a different journal usage)
       file.position(100);
 
+      buffer.rewind();
       file.writeDirect(buffer, true);
 
       file.close();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -279,7 +279,7 @@
 
       for (long element : arguments)
       {
-         byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
+         byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
 
          journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -3118,7 +3118,9 @@
 
 
       addTx(1, 1);
+      addTx(1, 2);
       updateTx(1, 1);
+      updateTx(1, 3);
       commit(1);
       update(1);
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,9 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -622,8 +624,32 @@
          bytes.readerIndex(0);
          writeDirect(bytes.toByteBuffer(), sync);
       }
+      
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOAsyncTask)
+       */
+      public void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception
+      {
+         ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+         HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+         bytes.encode(outbuffer);
+         write(outbuffer, sync, callback);
+      }
 
       /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean)
+       */
+      public void write(EncodingSupport bytes, boolean sync) throws Exception
+      {
+         ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+         HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+         bytes.encode(outbuffer);
+         write(outbuffer, sync);
+      }
+
+      
+
+      /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#exists()
        */
       public boolean exists()

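The new write(EncodingSupport, ...) overloads let the caller hand over the record object itself, so a SequentialFile implementation is free to encode it directly into its own buffer rather than having the caller encode into a temporary HornetQBuffer first (the test fake above still goes through a temporary buffer, which is fine for a fake). A hypothetical call site could look like the sketch below; appendDelete and its parameters are illustrative, the sketch assumes InternalEncoder implements EncodingSupport (as the @see references in the record classes suggest), and JournalImpl's real append path additionally stamps the record with the current file id and deals with locking and callbacks.

import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;

public class DirectWriteExample
{
   /**
    * Appends a delete record through the write(EncodingSupport, boolean, IOAsyncTask)
    * overload, so the record can encode straight into the file's own buffer.
    */
   public static void appendDelete(final SequentialFile currentFile,
                                   final long recordID,
                                   final boolean sync,
                                   final IOAsyncTask callback) throws Exception
   {
      JournalDeleteRecord deleteRecord = new JournalDeleteRecord(recordID);

      // JournalImpl would also set the record's fileID to the current file's id
      // before this call; that step is omitted from this sketch.

      currentFile.write(deleteRecord, sync, callback);
   }
}
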

