[jboss-cvs] JBoss Messaging SVN: r4691 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 17 16:47:54 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-17 16:47:53 -0400 (Thu, 17 Jul 2008)
New Revision: 4691

Added:
   trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
Log:
More Journal work

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-17 20:47:53 UTC (rev 4691)
@@ -70,8 +70,11 @@
 	
 	// Load
 	
-	long load(List<RecordInfo> committedRecords,
-			    List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+   long load(List<RecordInfo> committedRecords,
+         List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+
+   long load(LoadManager reloadManager) throws Exception;
+
 	
 	// Start and stop reclaimer
 	

Added: trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java	2008-07-17 20:47:53 UTC (rev 4691)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal;
+
+/**
+ * Callback handed to Journal.load(LoadManager); the journal reports what it reads from its files through these methods.
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ */
+public interface LoadManager
+{
+   void addRecord(RecordInfo info); // reports an added (non-update) record
+   
+   void deleteRecord(long id); // reports that the record with this id was deleted
+   
+   void updateRecord(RecordInfo info); // reports an update record
+   
+   void addPreparedTransaction(PreparedTransactionInfo preparedTransaction); // reports a transaction left in the prepared state; JournalImpl calls this after all files are read
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-17 20:47:53 UTC (rev 4691)
@@ -49,6 +49,7 @@
 
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.LoadManager;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -276,8 +277,8 @@
       
       bb.putByte(ADD_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(id);
       bb.putInt(recordLength);
-      bb.putLong(id);
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);        
@@ -302,8 +303,8 @@
       
       bb.put(ADD_RECORD);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(id);
       bb.putInt(record.length);     
-      bb.putLong(id);
       bb.put(recordType);
       bb.put(record);		
       bb.putInt(size);			
@@ -334,8 +335,8 @@
       
       bb.put(UPDATE_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(id);      
       bb.putInt(record.length);     
-      bb.putLong(id);      
       bb.put(recordType);
       bb.put(record);      
       bb.putInt(size);     
@@ -366,8 +367,8 @@
       
       bb.putByte(UPDATE_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(id);      
       bb.putInt(record.getEncodeSize());
-      bb.putLong(id);      
       bb.putByte(recordType);
       record.encode(bb);
       bb.putInt(size);     
@@ -427,10 +428,10 @@
       
       bb.putByte(ADD_RECORD_TX);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);
       bb.putInt(recordLength);
-      bb.putLong(txID);
       bb.putByte(recordType);
-      bb.putLong(id);
       record.encode(bb);
       bb.putInt(size);     
       bb.rewind();
@@ -455,10 +456,10 @@
       
       bb.put(ADD_RECORD_TX);
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);
       bb.putInt(record.length);
-      bb.putLong(txID);
       bb.put(recordType);
-      bb.putLong(id);
       bb.put(record);
       bb.putInt(size);
       bb.rewind();
@@ -483,10 +484,10 @@
       
       bb.put(UPDATE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);      
       bb.putInt(record.length);     
-      bb.putLong(txID);
       bb.put(recordType);
-      bb.putLong(id);      
       bb.put(record);
       bb.putInt(size);     
       bb.rewind();
@@ -512,10 +513,10 @@
       
       bb.putByte(UPDATE_RECORD_TX);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+      bb.putLong(txID);
+      bb.putLong(id);      
       bb.putInt(record.getEncodeSize());
-      bb.putLong(txID);
       bb.putByte(recordType);
-      bb.putLong(id);      
       record.encode(bb);
       bb.putInt(size);     
       bb.rewind();
@@ -649,61 +650,64 @@
    
    public synchronized long load(final List<RecordInfo> committedRecords,
          final List<PreparedTransactionInfo> preparedTransactions) throws Exception
-         {
-      if (state != STATE_STARTED)
-      {
-         throw new IllegalStateException("Journal must be in started state");
-      }
+   {
       
-      addShutdownHook();
+      final Set<Long> recordsToDelete = new HashSet<Long>();
+      final List<RecordInfo> records = new ArrayList<RecordInfo>();
+
       
-      Set<Long> recordsToDelete = new HashSet<Long>();
-      
-      Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
-      
-      List<RecordInfo> records = new ArrayList<RecordInfo>();
-      
-      List<String> fileNames = fileFactory.listFiles(fileExtension);
-      
-      List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-      
-      for (String fileName: fileNames)
+      long maxID =  load (new LoadManager()
       {
-         SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
-         
-         file.open();
-         
-         ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-         
-         file.read(bb);
-         
-         int orderingID = bb.getInt();
-         
-         if (nextOrderingId.get() < orderingID)
+
+         public void addPreparedTransaction(
+               PreparedTransactionInfo preparedTransaction)
          {
-            nextOrderingId.set(orderingID);
+            preparedTransactions.add(preparedTransaction);
          }
+
+         public void addRecord(RecordInfo info)
+         {
+            records.add(info);
+         }
+
+         public void updateRecord(RecordInfo info)
+         {
+            records.add(info);
+         }
+
+         public void deleteRecord(long id)
+         {
+            recordsToDelete.add(id);
+         }
          
-         orderedFiles.add(new JournalFileImpl(file, orderingID));
-         
-         file.close();
-      }
+      });
       
-      //Now order them by ordering id - we can't use the file name for ordering since we can re-use dataFiles
       
-      class JournalFileComparator implements Comparator<JournalFile>
+      for (RecordInfo record: records)
       {
-         public int compare(JournalFile f1, JournalFile f2)
+         if (!recordsToDelete.contains(record.id))
          {
-            int id1 = f1.getOrderingID();
-            int id2 = f2.getOrderingID();
-            
-            return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+            committedRecords.add(record);
          }
       }
       
-      Collections.sort(orderedFiles, new JournalFileComparator());
+      return maxID;
+   }
+   
+   public synchronized long load (LoadManager loadManager) throws Exception
+   {
       
+      if (state != STATE_STARTED)
+      {
+         throw new IllegalStateException("Journal must be in started state");
+      }
+      
+      addShutdownHook();
+      
+      Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+      
+      List<JournalFile> orderedFiles = orderFiles();
+      
       int lastDataPos = -1;
       
       long maxTransactionID = -1;
@@ -753,12 +757,45 @@
 
             int readFileId = bb.getInt();
             
+            if (readFileId != file.getOrderingID())
+            {
+               bb.position(pos + 1);
+               //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
+               continue;
+            }
+            
+            long transactionID = 0;
+            
+            if (isTransaction(recordType))
+            {
+               if (bb.position() + SIZE_LONG > fileSize)
+               {
+                  continue;
+               }
+               transactionID = bb.getLong();
+               maxTransactionID = Math.max(maxTransactionID, transactionID); 
+               
+            }
+            
+            long recordID = 0;
+            if (!isCompleteTransaction(recordType))
+            {
+               if (bb.position() + SIZE_LONG > fileSize)
+               {
+                  continue;
+               }
+               recordID = bb.getLong();
+               maxMessageID = Math.max(maxMessageID, recordID);
+               
+            }
+            
+            
             // The variable record portion used on Updates and Appends
             int variableSize = 0;
-            // The record size (without the variable portion)
-            int recordSize = 0;
+            byte userRecordType = 0;
+            byte record[] = null;
             
-            if (recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX)
+            if (isContainsBody(recordType))
             {
                if (bb.position() + SIZE_INT > fileSize)
                {
@@ -766,43 +803,20 @@
                }
                
                variableSize = bb.getInt();
-            }
-            
-            switch(recordType)
-            {
-               case ADD_RECORD:
-                  recordSize = SIZE_ADD_RECORD;
-                  break;
-               case UPDATE_RECORD:
-                  recordSize = SIZE_UPDATE_RECORD;
-                  break;
-               case ADD_RECORD_TX:
-                  recordSize = SIZE_ADD_RECORD_TX;
-                  break;
-               case UPDATE_RECORD_TX:
-                  recordSize = SIZE_UPDATE_RECORD_TX;
-                  break;
-               case DELETE_RECORD:
-                  recordSize = SIZE_DELETE_RECORD;
-                  break;
-               case DELETE_RECORD_TX:
-                  recordSize = SIZE_DELETE_RECORD_TX;
-                  break;
-               case PREPARE_RECORD:
-                  recordSize = SIZE_PREPARE_RECORD;
-                  break;
-               case COMMIT_RECORD:
-                  recordSize = SIZE_COMMIT_RECORD;
-                  break;
-               case ROLLBACK_RECORD:
-                  recordSize = SIZE_ROLLBACK_RECORD;
-                  break;
-               default:
-                  // Sanity check, this was previously tested, nothing different should be on this switch
-                  throw new IllegalStateException("Record other than expected");
                
+               if (bb.position() + variableSize > fileSize)
+               {
+                  continue;
+               }
+               
+               userRecordType = bb.get();
+               
+               record = new byte[variableSize];
+               bb.get(record);
             }
             
+            int recordSize = getRecordSize(recordType);
+            
             if (pos + recordSize + variableSize > fileSize)
             {
                continue;
@@ -821,53 +835,27 @@
                continue;
             }
             
-            if (readFileId != file.getOrderingID())
-            {
-               //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
-               continue;
-            }
-            
             bb.position(oldPos);
             
-            
-            
             switch(recordType)
             {
                case ADD_RECORD:
                {                          
-                  long id = bb.getLong();  
+                  loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
                   
-                  maxMessageID = Math.max(maxMessageID, id);
+                  posFilesMap.put(recordID, new PosFiles(file));
                   
-                  byte userRecordType = bb.get();
-                  
-                  byte[] record = new byte[variableSize];                 
-                  
-                  bb.get(record);
+                  hasData = true;                  
 
-                  records.add(new RecordInfo(id, userRecordType, record, false));
-                  hasData = true;                  
-                  
-                  posFilesMap.put(id, new PosFiles(file));
-                  
                   break;
                }                             
                case UPDATE_RECORD:                 
                {
-                  long id = bb.getLong();    
-                  
-                  maxMessageID = Math.max(maxMessageID, id);
-                  
-                  byte userRecordType = bb.get();
-                  
-                  byte[] record = new byte[variableSize];                 
-                  bb.get(record);                  
-
-                  records.add(new RecordInfo(id, userRecordType, record, true));                    
+                  loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
                   hasData = true;      
                   file.incPosCount();
                   
-                  PosFiles posFiles = posFilesMap.get(id);
+                  PosFiles posFiles = posFilesMap.get(recordID);
                   
                   if (posFiles != null)
                   {
@@ -881,14 +869,10 @@
                }              
                case DELETE_RECORD:                 
                {
-                  long id = bb.getLong(); 
-                  
-                  maxMessageID = Math.max(maxMessageID, id);
-                  
-                  recordsToDelete.add(id);                     
+                  loadManager.deleteRecord(recordID);
                   hasData = true;
                   
-                  PosFiles posFiles = posFilesMap.remove(id);
+                  PosFiles posFiles = posFilesMap.remove(recordID);
                   
                   if (posFiles != null)
                   {
@@ -898,108 +882,55 @@
                   break;
                }              
                case ADD_RECORD_TX:
+               case UPDATE_RECORD_TX:
                {              
-                  long txID = bb.getLong();                    
-                  maxTransactionID = Math.max(maxTransactionID, txID); 
+                  TransactionHolder tx = transactions.get(transactionID);
                   
-                  byte userRecordType = bb.get();
-                  
-                  long id = bb.getLong();          
-                  maxMessageID = Math.max(maxMessageID, id);
-                  
-                  byte[] record = new byte[variableSize];                 
-                  bb.get(record);                  
-                  
-                  TransactionHolder tx = transactions.get(txID);
-                  
                   if (tx == null)
                   {
-                     tx = new TransactionHolder(txID);                        
-                     transactions.put(txID, tx);
+                     tx = new TransactionHolder(transactionID);                        
+                     transactions.put(transactionID, tx);
                   }
                   
-                  tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));                     
+                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));                     
                   
-                  JournalTransaction tnp = transactionInfos.get(txID);
+                  JournalTransaction tnp = transactionInfos.get(transactionID);
                   
                   if (tnp == null)
                   {
                      tnp = new JournalTransaction();
                      
-                     transactionInfos.put(txID, tnp);
+                     transactionInfos.put(transactionID, tnp);
                   }
                   
-                  tnp.addPositive(file, id);
+                  tnp.addPositive(file, recordID);
                   
                   hasData = true;                                          
                   
                   break;
                }     
-               case UPDATE_RECORD_TX:
-               {              
-                  long txID = bb.getLong();  
-                  maxTransactionID = Math.max(maxTransactionID, txID);
-                  
-                  byte userRecordType = bb.get();
-                  
-                  long id = bb.getLong();
-                  maxMessageID = Math.max(maxMessageID, id);
-                  
-                  byte[] record = new byte[variableSize];                 
-                  bb.get(record);                  
-                  
-                  TransactionHolder tx = transactions.get(txID);
-                  
-                  if (tx == null)
-                  {
-                     tx = new TransactionHolder(txID);                        
-                     transactions.put(txID, tx);
-                  }
-                  
-                  tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
-                  
-                  JournalTransaction tnp = transactionInfos.get(txID);
-                  
-                  if (tnp == null)
-                  {
-                     tnp = new JournalTransaction();
-                     
-                     transactionInfos.put(txID, tnp);
-                  }
-                  
-                  tnp.addPositive(file, id);
-                  
-                  hasData = true;                     
-                  
-                  break;
-               }  
                case DELETE_RECORD_TX:
                {              
-                  long txID = bb.getLong();  
-                  maxTransactionID = Math.max(maxTransactionID, txID);                 
-                  long id = bb.getLong(); 
-                  maxMessageID = Math.max(maxMessageID, id);
+                  TransactionHolder tx = transactions.get(transactionID);
                   
-                  TransactionHolder tx = transactions.get(txID);
-                  
                   if (tx == null)
                   {
-                     tx = new TransactionHolder(txID);                        
-                     transactions.put(txID, tx);
+                     tx = new TransactionHolder(transactionID);                        
+                     transactions.put(transactionID, tx);
                   }
                   
-                  tx.recordsToDelete.add(id);                     
+                  tx.recordsToDelete.add(recordID);                     
                   
-                  JournalTransaction tnp = transactionInfos.get(txID);
+                  JournalTransaction tnp = transactionInfos.get(transactionID);
                   
                   if (tnp == null)
                   {
                      tnp = new JournalTransaction();
                      
-                     transactionInfos.put(txID, tnp);
+                     transactionInfos.put(transactionID, tnp);
                   }
                   
-                  tnp.addNegative(file, id);
+                  tnp.addNegative(file, recordID);
                   
                   hasData = true;                     
                   
@@ -1007,25 +938,22 @@
                }  
                case PREPARE_RECORD:
                {
-                  long txID = bb.getLong();
                   int numberOfElements = bb.getInt();
                   
-                  maxTransactionID = Math.max(maxTransactionID, txID);                 
-
-                  TransactionHolder tx = transactions.get(txID);
+                  TransactionHolder tx = transactions.get(transactionID);
                   
                   if (tx == null)
                   {
-                     throw new IllegalStateException("Cannot find tx with id " + txID);
+                     throw new IllegalStateException("Cannot find tx with id " + transactionID);
                   }
                   
                   tx.prepared = true;
                   
-                  JournalTransaction journalTransaction = transactionInfos.get(txID);
+                  JournalTransaction journalTransaction = transactionInfos.get(transactionID);
                   
                   if (journalTransaction == null)
                   {
-                     throw new IllegalStateException("Cannot find tx " + txID);
+                     throw new IllegalStateException("Cannot find tx " + transactionID);
                   }
 
                   if (numberOfElements == journalTransaction.getNumberOfElements())
@@ -1044,31 +972,43 @@
                }
                case COMMIT_RECORD:
                {
-                  long txID = bb.getLong();
                   int numberOfElements = bb.getInt();
                   
-                  maxTransactionID = Math.max(maxTransactionID, txID);
-                  TransactionHolder tx = transactions.remove(txID);
+                  TransactionHolder tx = transactions.remove(transactionID);
                   
                   if (tx != null)
                   {
                      
-                     JournalTransaction tnp = transactionInfos.remove(txID);
+                     JournalTransaction tnp = transactionInfos.remove(transactionID);
                      
                      if (tnp == null)
                      {
-                        throw new IllegalStateException("Cannot find tx " + txID);
+                        throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
                      
                      if (numberOfElements == tnp.getNumberOfElements())
                      {
-                        records.addAll(tx.recordInfos);                    
-                        recordsToDelete.addAll(tx.recordsToDelete);  
+                        for (RecordInfo txRecord: tx.recordInfos)
+                        {
+                           if (txRecord.isUpdate)
+                           {
+                              loadManager.updateRecord(txRecord);
+                           }
+                           else
+                           {
+                              loadManager.addRecord(txRecord);
+                           }
+                        }
+                        
+                        for (Long deleteValue: tx.recordsToDelete)
+                        {
+                           loadManager.deleteRecord(deleteValue);
+                        }
                         tnp.commit(file);       
                      }
                      else
                      {
-                        log.warn("Transaction " + txID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
+                        log.warn("Transaction " + transactionID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
                         tnp.rollback(file);
                      }
                      
@@ -1079,20 +1019,17 @@
                }
                case ROLLBACK_RECORD:
                {
-                  long txID = bb.getLong();
                   /* int numberOfElements = */ bb.getInt(); // Not being currently used
                   
-                  maxTransactionID = Math.max(maxTransactionID, txID);                 
+                  TransactionHolder tx = transactions.remove(transactionID);
                   
-                  TransactionHolder tx = transactions.remove(txID);
-                  
                   if (tx != null)
                   {                       
-                     JournalTransaction tnp = transactionInfos.remove(txID);
+                     JournalTransaction tnp = transactionInfos.remove(transactionID);
                      
                      if (tnp == null)
                      {
-                        throw new IllegalStateException("Cannot find tx " + txID);
+                        throw new IllegalStateException("Cannot find tx " + transactionID);
                      }
                      
                      tnp.rollback(file);  
@@ -1180,14 +1117,6 @@
       
       pushOpenedFile();
       
-      for (RecordInfo record: records)
-      {
-         if (!recordsToDelete.contains(record.id))
-         {
-            committedRecords.add(record);
-         }
-      }
-      
       for (TransactionHolder transaction: transactions.values())
       {
          if (!transaction.prepared || transaction.invalid)
@@ -1215,15 +1144,15 @@
             
             info.recordsToDelete.addAll(transaction.recordsToDelete);
             
-            preparedTransactions.add(info);
+            loadManager.addPreparedTransaction(info);
          }
       }
       
       state = STATE_LOADED;
       
       return maxMessageID;
-         }
-   
+   }
+
    public int getAlignment() throws Exception
    {
       return this.fileFactory.getAlignment();
@@ -1537,6 +1466,108 @@
    
    // Private -----------------------------------------------------------------------------
    
+   private boolean isTransaction(final byte recordType) // true for the record types that carry a transaction id: the *_TX types plus prepare/commit/rollback
+   {
+      return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX || 
+             recordType == DELETE_RECORD_TX || isCompleteTransaction(recordType);
+   }
+   
+   private boolean isCompleteTransaction(final byte recordType) // true for commit, prepare and rollback records; load() reads no record id for these types
+   {
+      return recordType == COMMIT_RECORD || recordType == PREPARE_RECORD || recordType == ROLLBACK_RECORD;  
+   }
+   
+   private boolean isContainsBody(final byte recordType) // true for types in [ADD_RECORD, UPDATE_RECORD_TX]; NOTE(review): depends on the numeric order of the type constants, which are declared elsewhere - confirm
+   {
+      return recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX;
+   }
+   
+   private int getRecordSize(byte recordType)
+   {
+      // Maps a journal record type to its SIZE_* constant: the record size without the variable portion
+      int recordSize = 0;
+      switch(recordType)
+      {
+         case ADD_RECORD:
+            recordSize = SIZE_ADD_RECORD;
+            break;
+         case UPDATE_RECORD:
+            recordSize = SIZE_UPDATE_RECORD;
+            break;
+         case ADD_RECORD_TX:
+            recordSize = SIZE_ADD_RECORD_TX;
+            break;
+         case UPDATE_RECORD_TX:
+            recordSize = SIZE_UPDATE_RECORD_TX;
+            break;
+         case DELETE_RECORD:
+            recordSize = SIZE_DELETE_RECORD;
+            break;
+         case DELETE_RECORD_TX:
+            recordSize = SIZE_DELETE_RECORD_TX;
+            break;
+         case PREPARE_RECORD:
+            recordSize = SIZE_PREPARE_RECORD;
+            break;
+         case COMMIT_RECORD:
+            recordSize = SIZE_COMMIT_RECORD;
+            break;
+         case ROLLBACK_RECORD:
+            recordSize = SIZE_ROLLBACK_RECORD;
+            break;
+         default:
+            // Sanity check: the record type is expected to have been validated before this call, so any other value is a bug
+            throw new IllegalStateException("Record other than expected");
+         
+      }
+      return recordSize;
+   }
+
+   private List<JournalFile> orderFiles() throws Exception // returns the files listed by fileFactory for fileExtension, sorted by ordering id
+   {
+      List<String> fileNames = fileFactory.listFiles(fileExtension);
+      
+      List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+      // Each file is opened only long enough to read its ordering id, then closed again
+      for (String fileName: fileNames)
+      {
+         SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+         
+         file.open();
+         // A SIZE_INT-sized buffer is read right after open() and decoded as this file's ordering id
+         ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+         
+         file.read(bb);
+         
+         int orderingID = bb.getInt();
+         // Keep nextOrderingId at least as large as the highest ordering id seen so far
+         if (nextOrderingId.get() < orderingID)
+         {
+            nextOrderingId.set(orderingID);
+         }
+         // NOTE(review): there is no try/finally, so close() below is not reached if open() or read() throws - confirm whether that matters
+         orderedFiles.add(new JournalFileImpl(file, orderingID));
+         
+         file.close();
+      }
+      
+      //Now order them by ordering id - we can't use the file name for ordering since we can re-use dataFiles
+      
+      class JournalFileComparator implements Comparator<JournalFile>
+      {
+         public int compare(JournalFile f1, JournalFile f2)
+         {
+            int id1 = f1.getOrderingID();
+            int id2 = f2.getOrderingID();
+            
+            return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+         }
+      }
+      // Ascending sort by ordering id
+      Collections.sort(orderedFiles, new JournalFileComparator());
+      return orderedFiles;
+   }
+   
    private void clearShutdownHook()
    {
       if (shutdownHook != null)

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-17 20:47:53 UTC (rev 4691)
@@ -272,6 +272,7 @@
 		messageIDSequence.set(maxMessageID + 1);
 		
 		//TODO - recover prepared transactions
+		//TODO - Use load(LoadManager) instead of load(lists)
       
 		for (RecordInfo record: records)
 		{

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-07-17 20:47:53 UTC (rev 4691)
@@ -60,8 +60,8 @@
    {
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
                                                /*FileID*/1, 
+                                               /* ID */14l, 
                                                /*RecordLength*/1, 
-                                               /* ID */14l, 
                                                /*RecordType*/(byte)33, 
                                                /* body */(byte)10, 
                                                JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -79,8 +79,8 @@
 
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
             /*FileID*/1, 
+            /* ID */14l, 
             /*RecordLength*/1, 
-            /* ID */14l, 
             /*RecordType*/(byte)33, 
             /* body */(byte)10, 
             JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -98,8 +98,8 @@
    {
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
                                                /*FileID*/1, 
+                                               /* ID */14l, 
                                                /*RecordLength*/1, 
-                                               /* ID */14l, 
                                                /*RecordType*/(byte)33, 
                                                /* body */(byte)10, 
                                                JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -123,8 +123,8 @@
    {
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
             /*FileID*/1, 
+            /* ID */15l, 
             /*RecordLength*/1, 
-            /* ID */15l, 
             /*RecordType*/(byte)33, 
             /* body */(byte)10, 
             JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -166,10 +166,10 @@
       EasyMock.expect(
             file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
             /* FileID */1,
+            /* TXID */3l,
+            /* ID */14l,
             /* RecordLength */1,
-            /* TXID */3l,
             /* RecordType */(byte) 33,
-            /* ID */14l,
             /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
                   EasyMock.eq(false))).andReturn(
             JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -177,10 +177,10 @@
       EasyMock.expect(
             file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
             /* FileID */1,
+            /* TXID */3l,
+            /* ID */15l,
             /* RecordLength */1,
-            /* TXID */3l,
             /* RecordType */(byte) 33,
-            /* ID */15l,
             /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
                   EasyMock.eq(false))).andReturn(
             JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -215,10 +215,10 @@
       EasyMock.expect(
             file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
             /* FileID */1,
+            /* TXID */3l,
+            /* ID */14l,
             /* RecordLength */1,
-            /* TXID */3l,
             /* RecordType */(byte) 33,
-            /* ID */14l,
             /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
                   EasyMock.eq(false))).andReturn(
             JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -242,24 +242,24 @@
    {
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
             /* FileID */1, 
+            /* ID */15l, 
             /* RecordLength */1, 
-            /* ID */15l, 
             /* RecordType */(byte)33, 
             /* body */(byte)10, 
             JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
 
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD, 
             /* FileID */1, 
+            /* ID */15l, 
             /* RecordLength */1, 
-            /* ID */15l, 
             /* RecordType */(byte)34, 
             /* body */(byte)11, 
             JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
       
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD, 
             /* FileID */1, 
+            /* ID */15l, 
             /* RecordLength */1, 
-            /* ID */15l, 
             /* RecordType */(byte)35, 
             /* body */(byte)12, 
             JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
@@ -281,27 +281,27 @@
    {
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD, 
             /* FileID */1, 
+            /* ID */15l, 
             /* RecordLength */1, 
-            /* ID */15l, 
             /* RecordType */(byte)33, 
             /* body */(byte)10, 
             JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
 
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX, 
             /* FileID */1, 
+            /* TransactionID */33l,
+            /* ID */15l, 
             /* RecordLength */1,
-            /* TransactionID */33l,
             /* RecordType */ (byte)34,
-            /* ID */15l, 
             /* body */(byte)11, 
             JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
       
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX, 
             /* FileID */1, 
+            /* TransactionID */33l,
+            /* ID */15l, 
             /* RecordLength */1,
-            /* TransactionID */33l,
             /* RecordType */ (byte)35,
-            /* ID */15l, 
             /* body */(byte)12, 
             JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
       




More information about the jboss-cvs-commits mailing list