[jboss-cvs] JBoss Messaging SVN: r4924 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 9 18:41:34 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-09-09 18:41:33 -0400 (Tue, 09 Sep 2008)
New Revision: 4924

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Journal tweaks / doc changes

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -22,10 +22,9 @@
 
 package org.jboss.messaging.core.journal;
 
+import java.util.List;
+
 import org.jboss.messaging.core.server.MessagingComponent;
-
-import javax.transaction.xa.Xid;
-import java.util.List;
 /**
  * 
  * A Journal
@@ -40,10 +39,6 @@
    
    void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception;
    
-   void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
-   
-   void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
-   
    void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
    
    void appendDeleteRecord(long id) throws Exception;
@@ -54,17 +49,24 @@
    
    void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
    
-   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-   
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-   
    void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
    
    void appendDeleteRecordTransactional(long txID, long id) throws Exception;
    
    void appendCommitRecord(long txID) throws Exception;
    
-   void appendPrepareRecord(long txID, EncodingSupport transactionIdentifier) throws Exception;
+   /** 
+    * 
+    * <p>Appends a prepare record. It should store the information required to bring the transaction, 
+    *     if the system crashes after the prepare was called, back to a state in which it can be committed. </p>
+    * 
+    * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+    * 
+    * @param txID
+    * @param transactionData Information to support the system replaying the transaction
+    * @throws Exception
+    */
+   void appendPrepareRecord(long txID, EncodingSupport transactionData) throws Exception;
    
    void appendRollbackRecord(long txID) throws Exception;
    

Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -71,4 +71,17 @@
    void setAutoReclaim(boolean autoReclaim);
    
    boolean isAutoReclaim();
+   
+   
+   // These byte[] add/update methods are only used by test cases
+   
+   void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
+   
+   void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+   
+   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+   
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+   
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -22,19 +22,48 @@
 
 package org.jboss.messaging.core.journal.impl;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.*;
+import org.jboss.messaging.core.journal.BufferCallback;
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.LoadManager;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.VariableLatch;
 
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * 
  * <p>A JournalImpl</p
@@ -151,10 +180,9 @@
    
    private final int reuseBufferSize;
    
-   private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+   /** Object that will control buffer's callback and getting buffers from the queue */
+   private final ReuseBuffersController buffersControl = new ReuseBuffersController();
    
-   private final BufferCallback bufferCallback = new LocalBufferCallback();
-   
    /*
     * We use a semaphore rather than synchronized since it performs better when
     * contended
@@ -279,81 +307,6 @@
       }
    }
    
-   public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
-   {
-      if (state != STATE_LOADED)
-      {
-         throw new IllegalStateException("Journal must be loaded first");
-      }
-            
-      int size = SIZE_ADD_RECORD + record.length;
-      
-      ByteBuffer bb = newBuffer(size);
-      
-      bb.put(ADD_RECORD);
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
-      bb.putLong(id);
-      bb.putInt(record.length);     
-      bb.put(recordType);
-      bb.put(record);		
-      bb.putInt(size);			
-      bb.rewind();
-           
-      try
-      {                 
-         lock.acquire();
-
-         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-         
-         posFilesMap.put(id, new PosFiles(usedFile));
-      }
-      finally
-      {
-         lock.release();
-      }
-      
-   }
-   
-   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
-   {
-      if (state != STATE_LOADED)
-      {
-         throw new IllegalStateException("Journal must be loaded first");
-      }
-      
-      PosFiles posFiles = posFilesMap.get(id);
-      
-      if (posFiles == null)
-      {
-         throw new IllegalStateException("Cannot find add info " + id);
-      }
-      
-      int size = SIZE_UPDATE_RECORD + record.length;
-      
-      ByteBuffer bb = newBuffer(size); 
-      
-      bb.put(UPDATE_RECORD);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
-      bb.putLong(id);      
-      bb.putInt(record.length);     
-      bb.put(recordType);
-      bb.put(record);      
-      bb.putInt(size);     
-      bb.rewind();
-      
-      lock.acquire();
-      try
-      {                          
-         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-         
-         posFiles.addUpdateFile(usedFile);
-      }
-      finally
-      {
-         lock.release();
-      }
-   }
-   
    public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
@@ -478,81 +431,6 @@
       }
    }
    
-   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
-   {
-      if (state != STATE_LOADED)
-      {
-         throw new IllegalStateException("Journal must be loaded first");
-      }
-      
-      int size = SIZE_ADD_RECORD_TX + record.length;
-      
-      ByteBuffer bb = newBuffer(size); 
-      
-      bb.put(ADD_RECORD_TX);
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
-      bb.putLong(txID);
-      bb.putLong(id);
-      bb.putInt(record.length);
-      bb.put(recordType);
-      bb.put(record);
-      bb.putInt(size);
-      bb.rewind();
-      
-      lock.acquire();
-
-      try
-      {                          
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-         
-         JournalTransaction tx = getTransactionInfo(txID);
-         
-         tx.addPositive(usedFile, id);
-      }
-      finally
-      {
-         lock.release();
-      }
-   }
-   
-   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
-   {
-      if (state != STATE_LOADED)
-      {
-         throw new IllegalStateException("Journal must be loaded first");
-      }
-      
-      int size = SIZE_UPDATE_RECORD_TX + record.length; 
-      
-      ByteBuffer bb = newBuffer(size); 
-      
-      bb.put(UPDATE_RECORD_TX);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
-      bb.putLong(txID);
-      bb.putLong(id);      
-      bb.putInt(record.length);     
-      bb.put(recordType);
-      bb.put(record);
-      bb.putInt(size);     
-      bb.rewind();
-      
-      lock.acquire();
-      
-      try
-      {                          
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-         
-         JournalTransaction tx = getTransactionInfo(txID);
-         
-         tx.addPositive(usedFile, id);
-      }
-      finally
-      {
-         lock.release();
-      }
-   }
-   
-   
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
@@ -624,7 +502,18 @@
       }
    }  
    
-   public void appendPrepareRecord(final long txID, EncodingSupport xid) throws Exception
+   /** 
+    * 
+    * <p>Appends a prepare record. It should store the information required to bring the transaction, 
+    *     if the system crashes after the prepare was called, back to a state in which it can be committed. </p>
+    * 
+    * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+    * 
+    * @param txID
+    * @param transactionData Information to support the system replaying the transaction
+    * @throws Exception
+    */
+   public void appendPrepareRecord(final long txID, EncodingSupport transactionData) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -633,7 +522,7 @@
       
       JournalTransaction tx = getTransactionInfo(txID);
       
-      ByteBuffer bb = writePrepareTransaction(PREPARE_RECORD, txID, tx, xid);
+      ByteBuffer bb = writePrepareTransaction(PREPARE_RECORD, txID, tx, transactionData);
       
       lock.acquire();
       
@@ -804,16 +693,21 @@
             byte recordType = bb.get();
             if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
             {
+               // I - We scan for any valid record on the file. If a hole happened on the middle of the file we keep looking until all the possibilities are gone
                continue;
             }
 
             if (bb.position() + SIZE_INT > fileSize)
             {
+               // II - Ignore this record, lets keep looking
                continue;
             }
 
+            // III - Every record has the file-id.
+            //       This is what lets us avoid re-filling the whole file
             int readFileId = bb.getInt();
             
+            // IV - This record is from a previous file-usage. The file was reused and we need to ignore this record
             if (readFileId != file.getOrderingID())
             {
                // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
@@ -825,7 +719,7 @@
             }
             
             long transactionID = 0;
-            
+
             if (isTransaction(recordType))
             {
                if (bb.position() + SIZE_LONG > fileSize)
@@ -846,7 +740,11 @@
                recordID = bb.getLong();
                maxID = Math.max(maxID, recordID);
             }
+
             
+            // We use the size of the record to validate the health of the record.
+            // (V) We verify the size of the record
+            
             // The variable record portion used on Updates and Appends
             int variableSize = 0;
             // Used to hold XIDs on PrepareTransactions 
@@ -884,9 +782,12 @@
             }
 
             int recordSize = getRecordSize(recordType);
-            
+
+            // VI - This completes V. We will validate the size at the end of the record,
+            //      but first we avoid buffer overflows caused by damaged data
             if (pos + recordSize + variableSize  + preparedTransactionDataSize > fileSize)
             {
+               // Avoid a buffer overflow caused by damaged data... continue scanning for more records...
                continue;
             }
             
@@ -896,6 +797,8 @@
             
             int checkSize = bb.getInt();
             
+            // VII - The checkSize at the end has to match the size stated at the beginning.
+            //       This is like testing a hash for the record. (We could replace the checkSize by some sort of calculated hash)
             if (checkSize != variableSize + recordSize  + preparedTransactionDataSize)
             {
                log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
@@ -907,6 +810,9 @@
             
             bb.position(oldPos);
             
+            
+            // At this point everything has already been checked. So relax and just load the data now.
+            
             switch(recordType)
             {
                case ADD_RECORD:
@@ -1023,8 +929,8 @@
                   byte xidData[] = new byte[preparedTransactionDataSize];
                   bb.get(xidData);
 
-                  // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+                  // Pair <FileID, NumberOfElements>
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
 
                   if (tx != null)
                   {                     
@@ -1040,7 +946,7 @@
                         transactionInfos.put(transactionID, journalTransaction);
                      }
                      
-                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
                      
                      if (healthy)
                      {
@@ -1063,7 +969,7 @@
                   
                   // We need to read it even if transaction was not found, or the reading checks would fail
                   // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
 
                   if (tx != null)
                   {                     
@@ -1074,7 +980,7 @@
                         throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
 
-                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
                                           
                      if (healthy)
                      {
@@ -1109,6 +1015,7 @@
                }
                case ROLLBACK_RECORD:
                {
+                  
                   TransactionHolder tx = transactions.remove(transactionID);
                   
                   if (tx != null)
@@ -1119,7 +1026,8 @@
                      {
                         throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
-                     
+
+                     // There is no need to validate summaries on Rollbacks.. We will ignore the data anyway.
                      tnp.rollback(file);  
                      
                      hasData = true;         
@@ -1136,6 +1044,8 @@
             
             checkSize = bb.getInt();
             
+            // This is a sanity check about the loading code itself.
+            // If this checkSize doesn't match, it means the reading method is not doing what it was supposed to do
             if (checkSize != variableSize + recordSize  + preparedTransactionDataSize)
             {
                throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
@@ -1193,7 +1103,7 @@
          
          if (this.reuseBufferSize > 0)
          {
-            currentFile.getFile().setBufferCallback(bufferCallback);
+            currentFile.getFile().setBufferCallback(buffersControl.callback);
          }
          
          currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
@@ -1436,6 +1346,30 @@
       debugWait();
    }
 
+   // Add/update methods only used on testcases (using a byte[]). Those methods are now part of the Test interface ------------------------
+   
+   
+   public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record));
+   }
+   
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      appendUpdateRecord(id, recordType, new ByteArrayEncoding(record));
+   }
+   
+   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+   }
+   
+   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
+   {
+      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+   }
+   
+   
    // MessagingComponent implementation ---------------------------------------------------
    
    public synchronized boolean isStarted()
@@ -1526,10 +1460,15 @@
       return jf;
    }
    
-   private Pair<Integer, Integer>[] readReferencesOnTransaction(final int variableSize, final ByteBuffer bb)
+   /** It will read the elements-summary back from the commit/prepare transaction 
+    *  Pair<FileID, Counter> */
+   @SuppressWarnings("unchecked") // See comment on the method body 
+   private Pair<Integer, Integer>[] readTransactionalElementsSummary(final int variableSize, final ByteBuffer bb)
    {
       int numberOfFiles = variableSize / (SIZE_INT * 2);
       
+      // This line always produces a compilation warning, even though it is a completely legal operation.
+      // We could remove the SuppressWarnings as soon as we find a better expression for this line:
       Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
       
       for (int i = 0; i < numberOfFiles; i++)
@@ -1540,27 +1479,48 @@
       return values;
    }
 
+   
+   /**
+    * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
+    * <p> We record a summary about the records on the journal file on COMMIT and PREPARE. 
+    *     When we load the records we build a new summary and we check the original summary to the current summary.
+    *     This method is basically verifying if the entire transaction is being loaded </p> 
+    * @param journalTransaction
+    * @param orderedFiles
+    * @param recordedSummary
+    * @return
+    */
    private boolean checkTransactionHealth(final JournalTransaction journalTransaction,
                                           final List<JournalFile> orderedFiles,
-                                          final Pair<Integer, Integer>[] readReferences)
+                                          final Pair<Integer, Integer>[] recordedSummary)
    {
       boolean healthy = true;
       
-      Map<Integer, AtomicInteger> refMap = journalTransaction.getElementsSummary();
       
-      for (Pair<Integer, Integer> ref: readReferences)
+      // (I) First we get the summary of what we really have on the files now:
+      
+      // FileID, NumberOfElements
+      Map<Integer, AtomicInteger> currentSummary = journalTransaction.getElementsSummary();
+
+      // (II) We compare the summary recorded on the commit against the reality on the files
+      for (Pair<Integer, Integer> ref: recordedSummary)
       {
-         AtomicInteger counter = refMap.get(ref.a);
+         AtomicInteger counter = currentSummary.get(ref.a);
          
          if (counter == null)
          {
-            // Couldn't find the counter, but if part of the transaction was reclaimed it is ok!
+            // (III) One of the original files didn't show any record. This would still be okay if the file was reclaimed
             boolean found = false;
             
             for (JournalFile lookupFile: orderedFiles)
             {
                if (lookupFile.getOrderingID() == ref.a)
                {
+                  // (IV) oops, we were expecting at least one record on this file. 
+                  //      The file still exists and no records were found. 
+                  //      That means the transaction crashed before complete, 
+                  //      so this transaction is broken and needs to be ignored.
+                  //      This is probably a hole caused by a crash during commit.
                   found = true;
                }
             }
@@ -1572,6 +1532,8 @@
          }
          else
          {
+            // (V) Missing a record... Transaction was not completed as stated. we will ignore the whole transaction
+            //      This is probably a hole caused by a crash during commit/prepare.
             if (counter.get() != ref.b)
             {
                healthy = false;
@@ -1582,6 +1544,29 @@
       return healthy;
    }
 
+   /**
+    * 
+    * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+    * <p>For example, a transaction was spread along 3 journal files with 10 records on each file. 
+    *    (This could happen if there are too many records, or if a user event delayed records from arriving in time for a single file).</p>
+    * <p>The element-summary will then have</p>
+    * <p>FileID1, 10</p>
+    * <p>FileID2, 10</p>
+    * <p>FileID3, 10</p>
+    * 
+    * <br>
+    * <p> During the load the transaction needs to have 30 records, spread across the files as originally written.</p>
+    * <p> If for any reason there are missing records, that means the transaction was not completed and we should ignore the whole transaction </p>
+    * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed. 
+    *     That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+    * 
+    * @param recordType
+    * @param txID
+    * @param tx the transaction whose elements summary is recorded
+    *           (unlike writePrepareTransaction, this method takes no transactionData)
+    * @return
+    * @throws Exception
+    */
    private ByteBuffer writeCommitTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
    {
       int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
@@ -1606,9 +1591,13 @@
       return bb;
    }
 
-   private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, EncodingSupport xid) throws Exception
+   /**
+    * TODO: Merge this method back into writeCommitTransaction (as it was done originally).
+    * For more explanations read the javadoc on writeCommitTransaction
+    */
+   private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, EncodingSupport transactionData) throws Exception
    {
-      int xidSize = xid.getEncodeSize();
+      int xidSize = transactionData.getEncodeSize();
       int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2 + xidSize + SIZE_INT;
 
       ByteBuffer bb = newBuffer(size);
@@ -1618,7 +1607,7 @@
       bb.putLong(txID);
       bb.putInt(xidSize);
       bb.putInt(tx.getElementsSummary().size());
-      xid.encode(new ByteBufferWrapper(bb));
+      transactionData.encode(new ByteBufferWrapper(bb));
 
       for (Map.Entry<Integer, AtomicInteger> entry: tx.getElementsSummary().entrySet())
       {
@@ -1765,6 +1754,12 @@
    }
    
    
+   /**
+    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+    * @param keepOpened
+    * @return
+    * @throws Exception
+    */
    private JournalFile createFile(final boolean keepOpened) throws Exception
    {
       int orderingID = generateOrderingID();
@@ -1806,7 +1801,7 @@
       file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
       if (this.reuseBufferSize > 0)
       {
-         file.getFile().setBufferCallback(bufferCallback);
+         file.getFile().setBufferCallback(buffersControl.callback);
       }
    }
    
@@ -1844,7 +1839,12 @@
       currentFile = enqueueOpenFile();
    }
    
-   // You need to guarantee lock.acquire() before calling this method
+   /** 
+    * This method will immediately return the opened file, and schedule opening and reclaiming.
+    * In case there are no cached opened files, this method will block until a file is opened (which should happen only if the system is under load).
+    * 
+    * Warning: You need to guarantee lock.acquire() before calling this method
+    * */
    private JournalFile enqueueOpenFile() throws InterruptedException
    {
       if (trace) trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
@@ -1987,69 +1987,20 @@
          return null;
       }
    }
-   // -- Area reserved for the reuse buffer logic -----------------------------------------
    
-   private volatile long bufferReuseLastTime = System.currentTimeMillis();
-   private ByteBuffer newBuffer(int size)
+   public ByteBuffer newBuffer(int size)
    {
-      // if a new buffer wasn't requested in 10 seconds, we clear the queue
-      // This is being done this way as we don't need another Timeout Thread just to cleanup this
-      if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
-      {
-         log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
-         bufferReuseLastTime = System.currentTimeMillis();
-         reuseBuffers.clear();
-      }
-      
-      if (size > reuseBufferSize)
-      {
-         return fileFactory.newBuffer(size);
-      }
-      else
-      {
-
-         int alignedSize = fileFactory.calculateBlockSize(size);
-      
-         ByteBuffer buffer = this.reuseBuffers.poll();
-         if (buffer == null)
-         {
-            buffer = fileFactory.newBuffer(reuseBufferSize);
-            buffer.limit(alignedSize);
-         }
-         else
-         {
-            buffer.limit(alignedSize);
-
-            // we could gain some little performance if we could avoid clearing the buffer.
-            // On AIO this is being done with just a memset, what should be fairly quick
-            fileFactory.clearBuffer(buffer);
-         }
-         
-         buffer.rewind();
-
-         return buffer;         
-      }
+      return this.buffersControl.newBuffer(size);
    }
-   
-   private class LocalBufferCallback implements BufferCallback
-   {
 
-      public void bufferDone(ByteBuffer buffer)
-      {
-         bufferReuseLastTime = System.currentTimeMillis();
-         if (buffer.capacity() == reuseBufferSize)
-         {
-            reuseBuffers.offer(buffer);
-         }
-      }
-      
-   }
-   
    // ------------------------------------------------------------------------------------
    
    
    // Inner classes ---------------------------------------------------------------------------
    
+   
+   // Just encapsulates the VariableLatch waiting for transaction completions
+   // Used if the SequentialFile supports Callbacks
    private static class TransactionCallback implements IOCallback
    {      
       private final VariableLatch countLatch = new VariableLatch();
@@ -2087,6 +2038,7 @@
       
    }
    
+   /** Used on the ref-count for reclaiming */
    private static class PosFiles
    {
       private final JournalFile addFile;
@@ -2125,7 +2077,82 @@
          }
       }
    }
+
    
+   /** Class that will control buffer-reuse */
+   class ReuseBuffersController
+   {
+      private volatile long bufferReuseLastTime = System.currentTimeMillis();
+      
+      // This queue is fed by LocalBufferCallback, which is called directly by NIO or AIO.
+      // In the case of AIO, the callback is invoked more or less directly by the native layer as soon as the buffer is no longer in use
+      // and is ready to be reused or garbage-collected
+      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+      
+      final BufferCallback callback = new LocalBufferCallback();
+
+      public  ByteBuffer newBuffer(int size)
+      {
+         // If no buffer has been returned for reuse (and the queue has not been cleared) in the last 10 seconds, we clear the queue.
+         // This is done here, on the allocation path, so we don't need another timeout thread just to clean this up
+         if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+         {
+            log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+            bufferReuseLastTime = System.currentTimeMillis();
+            reuseBuffers.clear();
+         }
+         
+         // if a buffer is bigger than the configured-size, we just create a new buffer.
+         if (size > reuseBufferSize)
+         {
+            return fileFactory.newBuffer(size);
+         }
+         else
+         {
+
+            // We need to allocate buffers following the rules of the storage being used (AIO/NIO)
+            int alignedSize = fileFactory.calculateBlockSize(size);
+
+            // Try getting a buffer from the queue...
+            ByteBuffer buffer = this.reuseBuffers.poll();
+            if (buffer == null)
+            {
+               // if empty create a new one.
+               buffer = fileFactory.newBuffer(reuseBufferSize);
+               buffer.limit(alignedSize);
+            }
+            else
+            {
+               // set the limit of the buffer to the size being required 
+               buffer.limit(alignedSize);
+               fileFactory.clearBuffer(buffer);
+            }
+            
+            buffer.rewind();
+
+            return buffer;         
+         }
+      }
+      
+      private class LocalBufferCallback implements BufferCallback
+      {
+
+         public void bufferDone(ByteBuffer buffer)
+         {
+            bufferReuseLastTime = System.currentTimeMillis();
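+            // If a buffer has any capacity other than the configured size, it is not queued and is simply left to the GC.
+            // This could happen if the buffer was allocated for a request bigger than reuseBufferSize (see newBuffer) -- TODO confirm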
+            if (buffer.capacity() == reuseBufferSize)
+            {
+               reuseBuffers.offer(buffer);
+            }
+         }
+         
+      }
+      
+   }
+   
+   
    private class JournalTransaction
    {
       private List<Pair<JournalFile, Long>> pos;
@@ -2275,4 +2302,34 @@
       }
       
    }
+   
+   private static class ByteArrayEncoding implements EncodingSupport
+   {
+
+      final byte[] data;
+      
+      ByteArrayEncoding(final byte[] data)
+      {
+         this.data = data;
+      }
+      
+      public void decode(MessagingBuffer buffer)
+      {
+         throw new IllegalStateException("operation not supported");
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putBytes(data);
+      }
+
+      public int getEncodeSize()
+      {
+         return data.length;
+      }
+      
+   }
+   
+
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -91,7 +91,7 @@
     * 
     * Duplication detection for paging processing
     *  */
-   void loadLastPage(LastPageRecord lastPage) throws Exception;
+   void setLastPage(LastPageRecord lastPage) throws Exception;
    
    /** 
     * 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -246,7 +246,7 @@
       return pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes(); 
    }
    
-   public void loadLastPage(LastPageRecord lastPage) throws Exception
+   public void setLastPage(LastPageRecord lastPage) throws Exception
    {
       System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
       this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -179,7 +179,7 @@
    
 
    /**
-    * Depage one page-file, read it and send it to the pagingManager
+    * Depage one page-file, read it and send it to the pagingManager / postoffice
     * @return
     * @throws Exception
     */
@@ -197,10 +197,10 @@
       }
       page.open();
       PageMessage messages[] = page.read();
-      boolean needMorePages = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+      boolean addressFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
       page.delete();
       
-      return needMorePages;
+      return addressFull;
 
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -203,9 +203,7 @@
 
 	public void storeAcknowledge(final long queueID, final long messageID) throws Exception
 	{		
-	   EncodingSupport record = ackBytes(queueID, messageID);
-		
-		messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, record);					
+		messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new ACKEncoding(queueID));					
 	}
 	
 	public void storeDelete(final long messageID) throws Exception
@@ -244,9 +242,7 @@
 
    public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
    {
-   	EncodingSupport record = ackBytes(queueID, messageID);
-		
-		messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, record);	
+		messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new ACKEncoding(queueID));	
    }
    
    public void storeDeleteTransactional(long txID, long messageID) throws Exception
@@ -273,19 +269,11 @@
    
 	public void updateDeliveryCount(final MessageReference ref) throws Exception
 	{
-		byte[] bytes = new byte[SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT];
+		DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(), ref.getDeliveryCount());
 		
-		ByteBuffer bb = ByteBuffer.wrap(bytes);
-		
-		bb.putLong(ref.getQueue().getPersistenceID());
-		
-		bb.putLong(ref.getMessage().getMessageID());
-		
-		bb.putInt(ref.getDeliveryCount());
-		
-		messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, bytes);
+		messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, updateInfo);
 	}
-
+	
 	public void loadMessages(final PostOffice postOffice, final Map<Long, Queue> queues, ResourceManager resourceManager) throws Exception
 	{
 		List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -358,47 +346,15 @@
 			byte[] data = record.data;
 			
 			ByteBuffer bb = ByteBuffer.wrap(data);
+
+			MessagingBuffer buff = new ByteBufferWrapper(bb);
 			
 			byte recordType = record.getUserRecordType();
 			
 			switch (recordType)
 			{
-			   case PAGE_TRANSACTION:
-			   {
-               MessagingBuffer buff = new ByteBufferWrapper(bb);
-               
-               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-               
-               pageTransactionInfo.decode(buff);
-               
-               pageTransactionInfo.setRecordID(record.id);
-               
-               PagingManager pagingManager = postOffice.getPagingManager();
-               
-               pagingManager.addTransaction(pageTransactionInfo);
-
-               break;
-			   }
-			   case LAST_PAGE:
-			   {
-               MessagingBuffer buff = new ByteBufferWrapper(bb);
-               
-			      LastPageRecordImpl recordImpl = new LastPageRecordImpl();
-			      
-			      recordImpl.setRecordId(record.id);
-			      
-			      recordImpl.decode(buff);
-               
-               PagingManager pagingManager = postOffice.getPagingManager();
-               
-               pagingManager.loadLastPage(recordImpl);
-			      
-			      break;
-			   }
 				case ADD_MESSAGE:
 				{
-					MessagingBuffer buff = new ByteBufferWrapper(bb);
-
 					ServerMessage message = new ServerMessageImpl(record.id);
 					
 					message.decode(buff);
@@ -414,15 +370,17 @@
 				}
 				case ACKNOWLEDGE_REF:
 				{
-					long queueID = bb.getLong();
+               long messageID = record.id;
+
+               ACKEncoding encoding = new ACKEncoding();
+				   encoding.decode(buff);
+
 					
-					long messageID = bb.getLong();
+					Queue queue = queues.get(encoding.queueID);
 					
-					Queue queue = queues.get(queueID);
-					
 					if (queue == null)
 					{
-						throw new IllegalStateException("Cannot find queue with id " + queueID);
+						throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
 					}
 					
 					MessageReference removed = queue.removeReferenceWithID(messageID);
@@ -436,17 +394,17 @@
 				}
 				case UPDATE_DELIVERY_COUNT:
 				{
-					long queueID = bb.getLong();
+				   long messageID = record.id;
+				   
+				   DeliveryCountUpdateEncoding deliveryUpdate = new DeliveryCountUpdateEncoding();
+				   
+				   deliveryUpdate.decode(buff);
 					
-					long messageID = bb.getLong();
+					Queue queue = queues.get(deliveryUpdate.queueID);
 					
-					int deliveryCount = bb.getInt();
-					
-					Queue queue = queues.get(queueID);
-					
 					if (queue == null)
 					{
-						throw new IllegalStateException("Cannot find queue with id " + queueID);
+						throw new IllegalStateException("Cannot find queue with id " + deliveryUpdate.queueID);
 					}
 					
 					MessageReference reference = queue.getReference(messageID);
@@ -456,11 +414,40 @@
 						throw new IllegalStateException("Failed to find reference for " + messageID);
 					}
 					
-					reference.setDeliveryCount(deliveryCount);
+					reference.setDeliveryCount(deliveryUpdate.count);
 					
 					break;
 					
 				}
+            case PAGE_TRANSACTION:
+            {
+               
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+               
+               pageTransactionInfo.decode(buff);
+               
+               pageTransactionInfo.setRecordID(record.id);
+               
+               PagingManager pagingManager = postOffice.getPagingManager();
+               
+               pagingManager.addTransaction(pageTransactionInfo);
+
+               break;
+            }
+            case LAST_PAGE:
+            {
+               LastPageRecordImpl recordImpl = new LastPageRecordImpl();
+               
+               recordImpl.setRecordId(record.id);
+               
+               recordImpl.decode(buff);
+               
+               PagingManager pagingManager = postOffice.getPagingManager();
+               
+               pagingManager.setLastPage(recordImpl);
+               
+               break;
+            }
 				case SET_SCHEDULED_DELIVERY_TIME:
 				{
 					//TODO
@@ -477,19 +464,6 @@
 	
 	public void addBinding(Binding binding) throws Exception
 	{
-		 ByteArrayOutputStream baos = new ByteArrayOutputStream();
-	      
-		 DataOutputStream daos = new DataOutputStream(baos);
-
-		 /*
-	      We store:
-		  * 
-		  * Queue name
-		  * Address string
-		  * All nodes?
-		  * Filter string
-		  */
-
 		 Queue queue = binding.getQueue();
 
 		 //We generate the queue id here
@@ -497,39 +471,25 @@
 		 long queueID = bindingIDSequence.getAndIncrement();
 
 		 queue.setPersistenceID(queueID);
-
-		 byte[] nameBytes = queue.getName().getData();
 		 
-		 daos.writeInt(nameBytes.length);
+		 final SimpleString filterString;
 		 
-		 daos.write(nameBytes);
+		 final Filter filter = queue.getFilter();
 		 
-		 byte[] addressBytes = binding.getAddress().getData();
-
-		 daos.writeInt(addressBytes.length);
-		 
-		 daos.write(addressBytes);
-
-		 Filter filter = queue.getFilter();
-
-		 daos.writeBoolean(filter != null);
-
 		 if (filter != null)
 		 {
-			 byte[] filterBytes = queue.getFilter().getFilterString().getData();
-
-			 daos.writeInt(filterBytes.length);
-			 
-			 daos.write(filterBytes);
+		    filterString = filter.getFilterString();
 		 }
-
-		 daos.flush();
-
-		 byte[] data = baos.toByteArray();
+		 else
+		 {
+		    filterString = null;
+		 }
 		 
-		 bindingsJournal.appendAddRecord(queueID, BINDING_RECORD, data);
+		 BindingEncoding bindingEncoding = new BindingEncoding(binding.getQueue().getName(), binding.getAddress(), filterString);
+		 
+		 bindingsJournal.appendAddRecord(queueID, BINDING_RECORD, bindingEncoding);
 	}
-
+	
 	public void deleteBinding(Binding binding) throws Exception
 	{
 		long id = binding.getQueue().getPersistenceID();
@@ -553,22 +513,10 @@
 		}
 		else
 		{
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-	      
-			DataOutputStream daos = new DataOutputStream(baos);
+			DestinationEncoding destinationEnc = new DestinationEncoding(destination);
 			
-			byte[] destBytes = destination.getData();
+			bindingsJournal.appendAddRecord(destinationID, DESTINATION_RECORD, destinationEnc);
 			
-			daos.writeInt(destBytes.length);
-			
-			daos.write(destBytes);
-			
-			daos.flush();
-			
-			byte[] data = baos.toByteArray();
-			
-			bindingsJournal.appendAddRecord(destinationID, DESTINATION_RECORD, data);
-			
 			return true;
 		}		
 	}
@@ -600,57 +548,38 @@
 		{		  
 			long id = record.id;
 			
-			byte[] data = record.data;
+         MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(record.data));
 
-			ByteArrayInputStream bais = new ByteArrayInputStream(data);
-
-			DataInputStream dais = new DataInputStream(bais);
-			
 			byte rec = record.getUserRecordType();
 			
 			if (rec == BINDING_RECORD)
 			{
-				int len = dais.readInt();
-				byte[] queueNameBytes = new byte[len];
-				dais.read(queueNameBytes);
-				SimpleString queueName = new SimpleString(queueNameBytes);
-
-				len = dais.readInt();
-				byte[] addressBytes = new byte[len];
-				dais.read(addressBytes);
-				SimpleString address = new SimpleString(addressBytes);
-
-				Filter filter = null;
-
-				if (dais.readBoolean())
-				{
-					len = dais.readInt();
-					byte[] filterBytes = new byte[len];
-					dais.read(filterBytes);
-					SimpleString filterString = new SimpleString(filterBytes);
-										
-					filter = new FilterImpl(filterString);
-				}
-
-				Queue queue = queueFactory.createQueue(id, queueName, filter, true);
+			   
+			   BindingEncoding encodeBinding = new BindingEncoding();
+			   encodeBinding.decode(buffer);
+			   
+			   Filter filter = null;
+			   
+			   if (encodeBinding.filter != null)
+			   {
+			      filter = new FilterImpl(encodeBinding.filter);
+			   }
+			   
+				Queue queue = queueFactory.createQueue(id, encodeBinding.queueName, filter, true);
 			
-				Binding binding = new BindingImpl(address, queue);
+				Binding binding = new BindingImpl(encodeBinding.address, queue);
 				
 				bindings.add(binding);      
 			}
 			else if (rec == DESTINATION_RECORD)
 			{
-				int len = dais.readInt();
+			   
+			   DestinationEncoding destEnc = new DestinationEncoding();
+			   destEnc.decode(buffer);
+
+			   destinationIDMap.put(destEnc.destination, id);
 				
-				byte[] destData = new byte[len];
-				
-				dais.read(destData);
-				
-				SimpleString destinationName = new SimpleString(destData);
-				
-				destinationIDMap.put(destinationName, id);
-				
-				destinations.add(destinationName);
+				destinations.add(destEnc.destination);
 			}
 			else
 			{
@@ -661,6 +590,7 @@
 		bindingIDSequence.set(maxID + 1);
 	}
 	
+	
 	// MessagingComponent implementation ------------------------------------------------------
 
 	public synchronized void start() throws Exception
@@ -710,30 +640,6 @@
 	
 	// Private ----------------------------------------------------------------------------------
 	
-	private EncodingSupport ackBytes(final long queueID, final long messageID)
-   {
-	   // Using an EncodingSupport, to avoid some byteArrayCopy
-      return new EncodingSupport()
-      {
-
-         public void decode(MessagingBuffer buffer)
-         {
-            throw new UnsupportedOperationException();
-         }
-
-         public void encode(MessagingBuffer buffer)
-         {
-            buffer.putLong(queueID);
-            buffer.putLong(messageID);
-         }
-
-         public int getEncodeSize()
-         {
-            return SIZE_LONG * 2;
-         }
-       
-      };
-   }
 	
 	private void checkAndCreateDir(String dir, boolean create)
 	{
@@ -800,4 +706,154 @@
 	   
 	}
 
+   private static class BindingEncoding implements EncodingSupport
+   {
+      
+      SimpleString queueName;
+      SimpleString address;
+      SimpleString filter;
+
+      public BindingEncoding()
+      {
+         
+      }
+      
+      public BindingEncoding(SimpleString queueName,
+            SimpleString address, SimpleString filter)
+      {
+         super();
+         this.queueName = queueName;
+         this.address = address;
+         this.filter = filter;
+      }
+
+      public void decode(MessagingBuffer buffer)
+      {
+         queueName = buffer.getSimpleString();
+         address = buffer.getSimpleString();
+         filter = buffer.getNullableSimpleString();
+         
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putSimpleString(queueName);
+         buffer.putSimpleString(address);
+         buffer.putNullableSimpleString(filter);
+      }
+
+      public int getEncodeSize()
+      {
+         return SimpleString.sizeofString(queueName) +
+                SimpleString.sizeofString(address) +
+                1 + // HasFilter?
+                ((filter != null) ? SimpleString.sizeofString(filter) : 0);
+      }
+   }
+
+   private static class DestinationEncoding implements EncodingSupport
+   {
+
+      SimpleString destination;
+      
+      DestinationEncoding(SimpleString destination)
+      {
+         this.destination = destination;
+      }
+      
+      DestinationEncoding()
+      {
+      }
+      
+      public void decode(MessagingBuffer buffer)
+      {
+         this.destination = buffer.getSimpleString();
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putSimpleString(destination);
+      }
+
+      public int getEncodeSize()
+      {
+         return SimpleString.sizeofString(destination);
+      }
+      
+   }
+   
+   private static class DeliveryCountUpdateEncoding implements EncodingSupport
+   {
+      long queueID;
+      int count;
+      
+      public DeliveryCountUpdateEncoding()
+      {
+         super();
+      }
+      
+      public DeliveryCountUpdateEncoding(long queueID, int count)
+      {
+         super();
+         this.queueID = queueID;
+         this.count = count;
+      }
+
+      public void decode(MessagingBuffer buffer)
+      {
+         queueID = buffer.getLong();
+         count = buffer.getInt();
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putLong(queueID);
+         buffer.putInt(count);
+      }
+
+      public int getEncodeSize()
+      {
+         return 8 + 4;
+      }
+      
+   }
+   
+   
+   private class ACKEncoding implements EncodingSupport
+   {
+      long queueID;
+      
+      
+
+      public ACKEncoding(long queueID)
+      {
+         super();
+         this.queueID = queueID;
+      }
+
+      public ACKEncoding()
+      {
+         super();
+      }
+
+      public void decode(MessagingBuffer buffer)
+      {
+         this.queueID = buffer.getLong();
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putLong(queueID);
+      }
+
+      public int getEncodeSize()
+      {
+         return 8;
+      }
+      
+   }
+   
+   
+
+   
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 
 /**
  * 
@@ -211,7 +212,7 @@
       {
          final int numMessages = 50050;
          
-         byte[] data = new byte[1024];
+         SimpleEncoding data = new SimpleEncoding(1024, (byte)'j');
          
          long start = System.currentTimeMillis();
          
@@ -270,7 +271,7 @@
       journal.load(new ArrayList<RecordInfo>(), null);
             
       log.debug("Adding data");
-      byte[] data = new byte[700];
+      SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
       
       long start = System.currentTimeMillis();
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -88,12 +88,7 @@
       final long queueID = 1210981;
       final long messageID = 101921092;
       
-      byte[] record = new byte[16];      
-      ByteBuffer bb = ByteBuffer.wrap(record);      
-      bb.putLong(queueID);      
-      bb.putLong(messageID);
-      
-      messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));  
+      messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(autoEncode(queueID)));  
       EasyMock.replay(messageJournal, bindingsJournal);      
       jsm.storeAcknowledge(queueID, messageID);     
       EasyMock.verify(messageJournal, bindingsJournal);
@@ -141,13 +136,8 @@
       final long queueID = 1210981;
       final long messageID = 101921092;
       
-      byte[] record = new byte[16];      
-      ByteBuffer bb = ByteBuffer.wrap(record);      
-      bb.putLong(queueID);      
-      bb.putLong(messageID);
-      
       final long txID = 12091921;
-      messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));  
+      messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(autoEncode(queueID)));  
       EasyMock.replay(messageJournal, bindingsJournal);      
       jsm.storeAcknowledgeTransactional(txID, queueID, messageID);     
       EasyMock.verify(messageJournal, bindingsJournal);
@@ -228,12 +218,6 @@
       final long queueID = 1283743;
       final int deliveryCount = 4757;
       
-      byte[] bytes = new byte[21];      
-      ByteBuffer bb = ByteBuffer.wrap(bytes);      
-      bb.putLong(queueID);      
-      bb.putLong(msgID);      
-      bb.putInt(deliveryCount);
-      
       MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
       ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -243,7 +227,7 @@
       EasyMock.expect(msg.getMessageID()).andStubReturn(msgID);
       EasyMock.expect(ref.getDeliveryCount()).andReturn(deliveryCount);
       
-      messageJournal.appendUpdateRecord(EasyMock.eq(msgID), EasyMock.eq(JournalStorageManager.UPDATE_DELIVERY_COUNT), EasyMock.aryEq(bytes));
+      messageJournal.appendUpdateRecord(EasyMock.eq(msgID), EasyMock.eq(JournalStorageManager.UPDATE_DELIVERY_COUNT), compareEncodingSupport(autoEncode(queueID, deliveryCount)));
       EasyMock.replay(messageJournal, bindingsJournal, ref, msg, queue);      
       jsm.updateDeliveryCount(ref);
       EasyMock.verify(messageJournal, bindingsJournal, ref, msg, queue);
@@ -319,10 +303,9 @@
       RecordInfo record5 = new RecordInfo(msg2ID, JournalStorageManager.ACKNOWLEDGE_REF, ack3Bytes, true);
       
       final int deliveryCount = 4757;      
-      byte[] updateBytes = new byte[21];      
+      byte[] updateBytes = new byte[12];      
       ByteBuffer bb4 = ByteBuffer.wrap(updateBytes);      
       bb4.putLong(queue1ID);      
-      bb4.putLong(msg1ID);      
       bb4.putInt(deliveryCount);
       RecordInfo record6 = new RecordInfo(msg1ID, JournalStorageManager.UPDATE_DELIVERY_COUNT, updateBytes, true);
       
@@ -459,7 +442,7 @@
       log.debug("** data length is " + data.length);
       log.debug(UnitTestCase.dumpBytes(data));
        
-      bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.BINDING_RECORD), EasyMock.aryEq(data));
+      bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.BINDING_RECORD), compareEncodingSupport(data));
       
       if (useFilter)
       {
@@ -548,7 +531,7 @@
       daos.write(destBytes);      
       daos.flush();      
       byte[] data = baos.toByteArray();
-      bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), EasyMock.aryEq(data));
+      bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), compareEncodingSupport(data));
                   
       EasyMock.replay(messageJournal, bindingsJournal);
       
@@ -579,7 +562,7 @@
       daos.write(destBytes);      
       daos.flush();      
       data = baos.toByteArray();
-      bindingsJournal.appendAddRecord(EasyMock.eq(2L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), EasyMock.aryEq(data));
+      bindingsJournal.appendAddRecord(EasyMock.eq(2L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), compareEncodingSupport(data));
 
       EasyMock.replay(messageJournal, bindingsJournal);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-09-09 22:41:33 UTC (rev 4924)
@@ -43,7 +43,9 @@
 import org.easymock.EasyMock;
 import org.easymock.IArgumentMatcher;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -275,6 +277,50 @@
       return null;
    }
 
+   protected EncodingSupport compareEncodingSupport(final byte expectedArray[])
+   {
+      
+      EasyMock.reportMatcher(new IArgumentMatcher()
+      {
+
+         public void appendTo(StringBuffer buffer)
+         {
+            buffer.append("EncodingSupport buffer didn't match");
+         }
+
+         public boolean matches(Object argument)
+         {
+            EncodingSupport encoding = (EncodingSupport) argument;
+
+            final int size = encoding.getEncodeSize();
+            
+            if (size != expectedArray.length)
+            {
+               System.out.println(size + " != " + expectedArray.length);
+               return false;
+            }
+            
+            byte[] compareArray = new byte[size];
+            
+            MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(compareArray));
+            encoding.encode(buffer);
+            
+            for (int i = 0; i < expectedArray.length; i++)
+            {
+               if (expectedArray[i] != compareArray[i])
+               {
+                  return false;
+               }
+            }
+            
+            return true;
+         }
+         
+      });
+      
+      return null;
+   }
+
    
 
    protected boolean deleteDirectory(File directory)




More information about the jboss-cvs-commits mailing list