[jboss-cvs] JBoss Messaging SVN: r4925 - in trunk/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 10 05:29:43 EDT 2008


Author: timfox
Date: 2008-09-10 05:29:43 -0400 (Wed, 10 Sep 2008)
New Revision: 4925

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
Log:
Some tweaks to the journal


Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	2008-09-10 09:29:43 UTC (rev 4925)
@@ -38,15 +38,17 @@
 public class PreparedTransactionInfo
 {
    public final long id;
-   public final byte[] xidData;
    
+   public final byte[] extraData;
+   
    public final List<RecordInfo> records = new ArrayList<RecordInfo>();
    
    public final Set<Long> recordsToDelete = new HashSet<Long>();
 
-   public PreparedTransactionInfo(final long id, final byte[] xidData)
+   public PreparedTransactionInfo(final long id, final byte[] extraData)
    {
       this.id = id;
-      this.xidData = xidData;
+      
+      this.extraData = extraData;
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-09-10 09:29:43 UTC (rev 4925)
@@ -75,6 +75,8 @@
    
    // These add methods are only used by testCases
    
+   //FIXME: These methods should be removed - they are only used by tests
+   
    void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
    
    void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-10 09:29:43 UTC (rev 4925)
@@ -71,6 +71,7 @@
  * <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
@@ -84,6 +85,19 @@
    
    private static final int STATE_LOADED = 2;
    
+   // Static --------------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(JournalImpl.class);
+   
+   private static final boolean trace = log.isTraceEnabled();
+   
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging Journal 
+   private static final void trace(String message)
+   {      
+      log.trace(message);
+   }
+   
    // The sizes of primitive types
    
    private static final int SIZE_LONG = 8;
@@ -139,11 +153,10 @@
    public static final byte ROLLBACK_RECORD = 19;
    
    public static final byte FILL_CHARACTER = 74; // Letter 'J' 
-   
-   
+      
    // Attributes ----------------------------------------------------
    
-   private boolean autoReclaim = true;
+   private volatile boolean autoReclaim = true;
    
    private final AtomicInteger nextOrderingId = new AtomicInteger(0);
    
@@ -183,11 +196,6 @@
    /** Object that will control buffer's callback and getting buffers from the queue */
    private final ReuseBuffersController buffersControl = new ReuseBuffersController();
    
-   /*
-    * We use a semaphore rather than synchronized since it performs better when
-    * contended
-    */
-   
    //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
    private final Semaphore lock = new Semaphore(1, true);
    
@@ -198,20 +206,7 @@
    private final AtomicLong transactionIDSequence = new AtomicLong(0);
    
    private final Reclaimer reclaimer = new Reclaimer();
-   
-   // Static --------------------------------------------------------
-   
-   private static final Logger log = Logger.getLogger(JournalImpl.class);
-   
-   private static final boolean trace = log.isTraceEnabled();
-   
-   // This method exists just to make debug easier.
-   // I could replace log.trace by log.info temporarily while I was debugging Journal 
-   private static final void trace(String message)
-   {      
-      log.trace(message);
-   }
-   
+      
    // Constructors --------------------------------------------------
    
    public JournalImpl(final int fileSize, final int minFiles,
@@ -285,13 +280,14 @@
       ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
       
       bb.putByte(ADD_RECORD);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(id);
       bb.putInt(recordLength);
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);        
-      bb.rewind();
+      bb.rewind();  // TODO is rewind necessary?
       
       try
       {                 
@@ -326,13 +322,14 @@
       ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
       
       bb.putByte(UPDATE_RECORD);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+    //  bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(id);      
       bb.putInt(record.getEncodeSize());
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);     
-      bb.rewind();
+      bb.rewind(); //Is this necessary?
       
       lock.acquire();
 
@@ -367,10 +364,11 @@
       ByteBuffer bb = newBuffer(size); 
       
       bb.put(DELETE_RECORD);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(id);      
       bb.putInt(size);     
-      bb.rewind();
+      bb.rewind();  //Is this necessary?
       
       lock.acquire();
       
@@ -406,14 +404,15 @@
       ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size)); 
       
       bb.putByte(ADD_RECORD_TX);
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+     // bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(txID);
       bb.putLong(id);
       bb.putInt(recordLength);
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);     
-      bb.rewind();
+      bb.rewind();  //Is this necessary?
       
       lock.acquire();
       
@@ -431,7 +430,7 @@
       }
    }
    
-   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
+   public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -443,14 +442,15 @@
       ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size)); 
             
       bb.putByte(UPDATE_RECORD_TX);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(txID);
       bb.putLong(id);      
       bb.putInt(record.getEncodeSize());
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);     
-      bb.rewind();
+      bb.rewind(); //Is this necessary?
       
       lock.acquire();
       
@@ -480,11 +480,12 @@
       ByteBuffer bb = newBuffer(size); 
       
       bb.put(DELETE_RECORD_TX);     
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1);
       bb.putLong(txID);    
       bb.putLong(id);      
       bb.putInt(size);     
-      bb.rewind();
+      bb.rewind(); //Is this necessary
       
       lock.acquire();
       
@@ -507,13 +508,13 @@
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
     *     back to a state it could be committed. </p>
     * 
-    * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+    * <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
     * 
     * @param txID
-    * @param transactionData Information to support the system replaying the transaction
+    * @param transactionData - extra user data for the prepare
     * @throws Exception
     */
-   public void appendPrepareRecord(final long txID, EncodingSupport transactionData) throws Exception
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -589,10 +590,11 @@
       ByteBuffer bb = newBuffer(size); 
       
       bb.put(ROLLBACK_RECORD);      
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(txID);
       bb.putInt(size);        
-      bb.rewind();
+      bb.rewind(); // Is this necessary?
       
       lock.acquire();
       
@@ -649,7 +651,7 @@
       
       return maxID;
    }
-   
+      
    public synchronized long load(final LoadManager loadManager) throws Exception
    {      
       if (state != STATE_STARTED)
@@ -677,6 +679,8 @@
          
          if (bytesRead != fileSize)
          {
+            //FIXME - shouldn't be just ignore the file and log a warning, rather than throw ISE?
+            //We don't want to leave the user with an unusable system
             throw new IllegalStateException("File is wrong size " + bytesRead +
                                             " expected " + fileSize + " : " + file.getFile().getFileName());
          }
@@ -691,6 +695,7 @@
             final int pos = bb.position();
             
             byte recordType = bb.get();
+            
             if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
             {
                // I - We scan for any valid record on the file. If a hole happened on the middle of the file we keep looking until all the possibilities are gone
@@ -726,30 +731,37 @@
                {
                   continue;
                }
+               
                transactionID = bb.getLong();
+               
                maxTransactionID = Math.max(maxTransactionID, transactionID); 
             }
 
             long recordID = 0;
+            
             if (!isCompleteTransaction(recordType))
             {
                if (bb.position() + SIZE_LONG > fileSize)
                {
                   continue;
                }
+               
                recordID = bb.getLong();
+               
                maxID = Math.max(maxID, recordID);
             }
-
             
             // We use the size of the record to validate the health of the record.
             // (V) We verify the size of the record
             
             // The variable record portion used on Updates and Appends
             int variableSize = 0;
-            // Used to hold XIDs on PrepareTransactions 
-            int preparedTransactionDataSize = 0;
+            
+            // Used to hold extra data on transaction prepares
+            int preparedTransactionExtraDataSize = 0;
+            
             byte userRecordType = 0;
+            
             byte record[] = null;
             
             if (isContainsBody(recordType))
@@ -763,21 +775,24 @@
                
                if (bb.position() + variableSize > fileSize)
                {
+                  //TODO - isn't this an error?
                   continue;
                }
                
                userRecordType = bb.get();
                
                record = new byte[variableSize];
+               
                bb.get(record);
             }
-            
+                                    
             if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
             {
-               if(recordType == PREPARE_RECORD)
+               if (recordType == PREPARE_RECORD)
                {
-                  preparedTransactionDataSize = bb.getInt();
-               }
+                  preparedTransactionExtraDataSize = bb.getInt();
+               }               
+               //Comment required: I'm guessing this is because commits and prepares also include the record ids?
                variableSize += bb.getInt() * SIZE_INT * 2;
             }
 
@@ -785,38 +800,45 @@
 
             // VI - this is completing V,  We will validate the size at the end of the record,
             //      But we avoid buffer overflows by damaged data
-            if (pos + recordSize + variableSize  + preparedTransactionDataSize > fileSize)
+            if (pos + recordSize + variableSize  + preparedTransactionExtraDataSize > fileSize)
             {
                // Avoid a buffer overflow caused by damaged data... continue scanning for more records...
+               
+               //TODO - isn't this an error?
+               
                continue;
             }
             
             int oldPos = bb.position();
             
-            bb.position(pos + variableSize + recordSize  + preparedTransactionDataSize - SIZE_INT);
+            bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
             
             int checkSize = bb.getInt();
             
             // VII - The checkSize at the end has to match with the size informed at the beggining.
             //       This is like testing a hash for the record. (We could replace the checkSize by some sort of calculated hash)
-            if (checkSize != variableSize + recordSize  + preparedTransactionDataSize)
+            if (checkSize != variableSize + recordSize  + preparedTransactionExtraDataSize)
             {
                log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+               
                // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+               
                hasData = true;
+               
                bb.position(pos + SIZE_BYTE);
+               
                continue;
             }
             
             bb.position(oldPos);
+                        
+            // At this point everything is checked. So we relax and just load the data now.
             
-            
-            // At this point everything is already check. So relax and just load the data now.
-            
-            switch(recordType)
+            switch (recordType)
             {
                case ADD_RECORD:
                {                          
+                  
                   loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
                   
                   posFilesMap.put(recordID, new PosFiles(file));
@@ -828,7 +850,9 @@
                case UPDATE_RECORD:                 
                {
                   loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  
                   hasData = true;      
+                  
                   file.incPosCount();
                   
                   PosFiles posFiles = posFilesMap.get(recordID);
@@ -846,6 +870,7 @@
                case DELETE_RECORD:                 
                {
                   loadManager.deleteRecord(recordID);
+                  
                   hasData = true;
                   
                   PosFiles posFiles = posFilesMap.remove(recordID);
@@ -864,11 +889,12 @@
                   
                   if (tx == null)
                   {
-                     tx = new TransactionHolder(transactionID);                        
+                     tx = new TransactionHolder(transactionID); 
+                     
                      transactions.put(transactionID, tx);
                   }
                   
-                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));                     
+                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));                     
                   
                   JournalTransaction tnp = transactionInfos.get(transactionID);
                   
@@ -891,7 +917,8 @@
                   
                   if (tx == null)
                   {
-                     tx = new TransactionHolder(transactionID);                        
+                     tx = new TransactionHolder(transactionID);         
+                     
                      transactions.put(transactionID, tx);
                   }
                   
@@ -915,52 +942,50 @@
                case PREPARE_RECORD:
                {
                   TransactionHolder tx = transactions.get(transactionID);
-                  
-                  
+                                    
                   if (tx == null)
                   {
-                     tx = new TransactionHolder(transactionID);                        
+                     tx = new TransactionHolder(transactionID);  
+                     
                      transactions.put(transactionID, tx);
                   }
-                  
-                  
+                                    
                   // We need to read it even if transaction was not found, or the reading checks would fail
 
-                  byte xidData[] = new byte[preparedTransactionDataSize];
-                  bb.get(xidData);
+                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
+                  
+                  bb.get(extraData);
 
                   // Pair <FileID, NumberOfElements>
                   Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
-
-                  if (tx != null)
-                  {                     
-                     tx.prepared = true;
-                     tx.xidData = xidData;
-                     JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+                  
+                  tx.prepared = true;
+                  
+                  tx.extraData = extraData;
+                  
+                  JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+                                       
+                  if (journalTransaction == null)
+                  {
+                     journalTransaction = new JournalTransaction();
                      
-                     
-                     if (journalTransaction == null)
-                     {
-                        journalTransaction = new JournalTransaction();
-                        
-                        transactionInfos.put(transactionID, journalTransaction);
-                     }
-                     
-                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
-                     
-                     if (healthy)
-                     {
-                        journalTransaction.prepare(file);
-                     }
-                     else
-                     {
-                        log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
-                        tx.invalid = true;
-                     }
-                     
-                     hasData = true;
+                     transactionInfos.put(transactionID, journalTransaction);
                   }
                   
+                  boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
+                  
+                  if (healthy)
+                  {
+                     journalTransaction.prepare(file);
+                  }
+                  else
+                  {
+                     log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+                     tx.invalid = true;
+                  }
+                  
+                  hasData = true;                  
+                  
                   break;
                }
                case COMMIT_RECORD:
@@ -996,26 +1021,31 @@
                            }
                         }
                         
-                        for (Long deleteValue: tx.recordsToDelete)
+                        for (long deleteValue: tx.recordsToDelete)
                         {
                            loadManager.deleteRecord(deleteValue);
                         }
+                        
                         journalTransaction.commit(file);       
                      }
                      else
                      {
                         log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
+                        
                         journalTransaction.forget();
                      }
                      
                      hasData = true;         
                   }
+                  else
+                  {
+                     //TODO isn't this an error?
+                  }
                   
                   break;
                }
                case ROLLBACK_RECORD:
-               {
-                  
+               {                  
                   TransactionHolder tx = transactions.remove(transactionID);
                   
                   if (tx != null)
@@ -1032,6 +1062,10 @@
                      
                      hasData = true;         
                   }
+                  else
+                  {
+                     //TODO is this an error?
+                  }
                   
                   break;
                }
@@ -1046,7 +1080,7 @@
             
             // This is a sanity check about the loading code itself.
             // If this checkSize doesn't match, it means the reading method is not doing what it was supposed to do
-            if (checkSize != variableSize + recordSize  + preparedTransactionDataSize)
+            if (checkSize != variableSize + recordSize  + preparedTransactionExtraDataSize)
             {
                throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
             }
@@ -1113,6 +1147,7 @@
       else
       {
          currentFile = freeFiles.remove();
+         
          openFile(currentFile);
       }
       
@@ -1124,7 +1159,7 @@
          {
             log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
             
-            JournalTransaction transactionInfo = this.transactionInfos.get(transaction.transactionID);
+            JournalTransaction transactionInfo = transactionInfos.get(transaction.transactionID);
             
             if (transactionInfo == null)
             {
@@ -1139,7 +1174,7 @@
          }
          else
          {
-            PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.xidData);
+            PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
             
             info.records.addAll(transaction.recordInfos);
             
@@ -1163,8 +1198,7 @@
    
    // TestableJournal implementation --------------------------------------------------------------
    
-   
-   public void setAutoReclaim(boolean autoReclaim)
+   public void setAutoReclaim(final boolean autoReclaim)
    {
       this.autoReclaim = autoReclaim;
    }
@@ -1348,6 +1382,8 @@
 
    // Add/update methods only used on testcases (using a byte[]). Those methods are now part of the Test interface ------------------------
    
+   //FIXME - why have these methods?? Why not just use the normal public interface methods
+   //They should be removed
    
    public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
    {
@@ -1384,7 +1420,7 @@
          throw new IllegalStateException("Journal is not stopped");
       }
       
-      this.filesExecutor =  Executors.newSingleThreadExecutor();
+      filesExecutor =  Executors.newSingleThreadExecutor();
       
       state = STATE_STARTED;
    }
@@ -1402,9 +1438,10 @@
       }
       
       filesExecutor.shutdown();
-      while (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
+      
+      if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
       {
-         log.warn("Couldn't stop Journal after 60 seconds", new Exception ("Warning: Couldn't stop journal after 60 Seconds"));
+         log.warn("Couldn't stop journal executor after 60 seconds");
       }
       
       for (JournalFile file: openedFiles)
@@ -1469,6 +1506,9 @@
       
       // This line aways throws a compilation-warning, even thought is a completely legal operation.
       // We could remove the SuppressWarnings as soon as we find better expression on this line:
+      
+      //TODO sure - if something throws a warning doesn't mean it's illegal - if it were illegal it wouldn't compile!
+      
       Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
       
       for (int i = 0; i < numberOfFiles; i++)
@@ -1485,6 +1525,9 @@
     * <p> We record a summary about the records on the journal file on COMMIT and PREPARE. 
     *     When we load the records we build a new summary and we check the original summary to the current summary.
     *     This method is basically verifying if the entire transaction is being loaded </p> 
+    *     
+    *     Summary means what? More info please
+    *     
     * @param journalTransaction
     * @param orderedFiles
     * @param recordedSummary
@@ -1495,8 +1538,7 @@
                                           final Pair<Integer, Integer>[] recordedSummary)
    {
       boolean healthy = true;
-      
-      
+            
       // (I) First we get the summary of what we really have on the files now:
       
       // FileID, NumberOfElements
@@ -1527,6 +1569,7 @@
             if (found)
             {
                healthy = false;
+               
                break;
             }
          }
@@ -1574,7 +1617,8 @@
       ByteBuffer bb = newBuffer(size); 
       
       bb.put(recordType);    
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+     // bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(txID);
 
       bb.putInt(tx.getElementsSummary().size());
@@ -1586,7 +1630,7 @@
       }
       
       bb.putInt(size);           
-      bb.rewind();
+      bb.rewind(); //Is this necessary?
       
       return bb;
    }
@@ -1603,7 +1647,8 @@
       ByteBuffer bb = newBuffer(size);
 
       bb.put(recordType);
-      bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putInt(-1); // skip ID part
       bb.putLong(txID);
       bb.putInt(xidSize);
       bb.putInt(tx.getElementsSummary().size());
@@ -1616,7 +1661,7 @@
       }
 
       bb.putInt(size);
-      bb.rewind();
+      bb.rewind();  //Is this necessary?
 
       return bb;
    }
@@ -1720,6 +1765,7 @@
       }
       
       Collections.sort(orderedFiles, new JournalFileComparator());
+      
       return orderedFiles;
    }
    
@@ -1729,17 +1775,24 @@
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {      
       int size = bb.limit();
+      
       checkFile(size);
+      
       bb.position(SIZE_BYTE);
+      
       if (currentFile == null)
       {
-         throw new Exception ("Current file = null");
+         throw new IllegalStateException("Current file = null");
       }
+      
       bb.putInt(currentFile.getOrderingID());
+      
       bb.rewind();
+      
       if (callback != null)
       {
          currentFile.getFile().write(bb, callback);
+         
          if (sync)
          {
             callback.waitCompletion();
@@ -1749,7 +1802,9 @@
       {
          currentFile.getFile().write(bb, sync);       
       }
+      
       currentFile.extendOffset(size);
+      
       return currentFile;
    }
    
@@ -1766,7 +1821,7 @@
       
       String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
       
-      if (trace) trace("Creating file " + fileName);
+      if (trace) { trace("Creating file " + fileName); }
       
       SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO);
       
@@ -1797,8 +1852,11 @@
    private void openFile(final JournalFile file) throws Exception
    {
       file.getFile().open();
+      
       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+      
       file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
+      
       if (this.reuseBufferSize > 0)
       {
          file.getFile().setBufferCallback(buffersControl.callback);
@@ -1925,7 +1983,7 @@
       
    private void closeFile(final JournalFile file)
    {
-      this.filesExecutor.execute(new Runnable() { public void run()
+      filesExecutor.execute(new Runnable() { public void run()
       {
          try
          {
@@ -1949,6 +2007,7 @@
          tx = new JournalTransaction();
          
          JournalTransaction trans = transactionInfos.putIfAbsent(txID, tx);
+         
          if (trans != null)
          {
             tx = trans;
@@ -1967,7 +2026,9 @@
          if (callback == null)
          {
             callback = new TransactionCallback();
+            
             TransactionCallback callbackCheck = transactionCallbacks.putIfAbsent(transactionId, callback);
+            
             if (callbackCheck != null)
             {
                callback = callbackCheck;
@@ -1980,6 +2041,7 @@
          }
          
          callback.countUp();
+         
          return callback;
       }
       else
@@ -1988,17 +2050,13 @@
       }
    }
    
-   public ByteBuffer newBuffer(int size)
+   public ByteBuffer newBuffer(final int size)
    {
-      return this.buffersControl.newBuffer(size);
+      return buffersControl.newBuffer(size);
    }
 
-   // ------------------------------------------------------------------------------------
-   
-   
    // Inner classes ---------------------------------------------------------------------------
-   
-   
+      
    // Just encapsulates the VariableLatch waiting for transaction completions
    // Used if the SequentialFile supports Callbacks
    private static class TransactionCallback implements IOCallback
@@ -2032,7 +2090,9 @@
       public void onError(final int errorCode, final String errorMessage)
       {
          this.errorMessage = errorMessage;
+         
          this.errorCode = errorCode;
+         
          countLatch.down();
       }
       
@@ -2077,7 +2137,6 @@
          }
       }
    }
-
    
    /** Class that will control buffer-reuse */
    class ReuseBuffersController
@@ -2091,14 +2150,16 @@
       
       final BufferCallback callback = new LocalBufferCallback();
 
-      public  ByteBuffer newBuffer(int size)
+      public ByteBuffer newBuffer(final int size)
       {
          // if a new buffer wasn't requested in 10 seconds, we clear the queue
          // This is being done this way as we don't need another Timeout Thread just to cleanup this
          if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
          {
             log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+            
             bufferReuseLastTime = System.currentTimeMillis();
+            
             reuseBuffers.clear();
          }
          
@@ -2109,22 +2170,24 @@
          }
          else
          {
-
             // We need to allocate buffers following the rules of the storage being used (AIO/NIO)
             int alignedSize = fileFactory.calculateBlockSize(size);
 
             // Try getting a buffer from the queue...
             ByteBuffer buffer = this.reuseBuffers.poll();
+            
             if (buffer == null)
             {
                // if empty create a new one.
                buffer = fileFactory.newBuffer(reuseBufferSize);
+               
                buffer.limit(alignedSize);
             }
             else
             {
                // set the limit of the buffer to the size being required 
                buffer.limit(alignedSize);
+               
                fileFactory.clearBuffer(buffer);
             }
             
@@ -2136,23 +2199,20 @@
       
       private class LocalBufferCallback implements BufferCallback
       {
-
          public void bufferDone(ByteBuffer buffer)
          {
             bufferReuseLastTime = System.currentTimeMillis();
+            
             // If a buffer has any other than the configured size, the buffer will be just sent to GC
             // This could happen if 
             if (buffer.capacity() == reuseBufferSize)
             {
                reuseBuffers.offer(buffer);
             }
-         }
-         
-      }
+         }         
+      }      
+   }
       
-   }
-   
-   
    private class JournalTransaction
    {
       private List<Pair<JournalFile, Long>> pos;
@@ -2161,13 +2221,13 @@
       
       private Set<JournalFile> transactionPos;
       
-      // Number of elements participating on the transaction
+      // Map of file id to number of elements participating on the transaction in that file
       // Used to verify completion on reload
-      private final Map<Integer, AtomicInteger> numberOfElements = new HashMap<Integer, AtomicInteger>();
+      private final Map<Integer, AtomicInteger> numberOfElementsPerFile = new HashMap<Integer, AtomicInteger>();
       
       public Map<Integer, AtomicInteger> getElementsSummary()
       {
-         return numberOfElements;
+         return numberOfElementsPerFile;
       }
 
       public void addPositive(final JournalFile file, final long id)
@@ -2290,12 +2350,12 @@
       
       private AtomicInteger getCounter(final JournalFile file)
       {
-         AtomicInteger value = numberOfElements.get(file.getOrderingID());
+         AtomicInteger value = numberOfElementsPerFile.get(file.getOrderingID());
          
          if (value == null)
          {
             value = new AtomicInteger();
-            numberOfElements.put(file.getOrderingID(), value);            
+            numberOfElementsPerFile.put(file.getOrderingID(), value);            
          }
          
          return value;
@@ -2303,9 +2363,10 @@
       
    }
    
+   
+   //FIXME: This class should be removed it is only used by methods called by tests - move it to the test not the core code !!
    private static class ByteArrayEncoding implements EncodingSupport
    {
-
       final byte[] data;
       
       ByteArrayEncoding(final byte[] data)

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-10 09:29:43 UTC (rev 4925)
@@ -22,14 +22,13 @@
 
 package org.jboss.messaging.core.journal.impl;
 
-import org.jboss.messaging.core.journal.RecordInfo;
-
-import javax.transaction.xa.Xid;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.jboss.messaging.core.journal.RecordInfo;
+
 /**
  * 
  * A TransactionHolder
@@ -56,6 +55,6 @@
    
    public boolean invalid;
 
-   public byte[] xidData;
+   public byte[] extraData;
    
 }




More information about the jboss-cvs-commits mailing list