[jboss-cvs] JBoss Messaging SVN: r4722 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 24 08:39:56 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-24 08:39:56 -0400 (Thu, 24 Jul 2008)
New Revision: 4722

Added:
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
Log:
Journal work (dealing with transaction timeouts and Journal Reload)

Modified: trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java	2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java	2008-07-24 12:39:56 UTC (rev 4722)
@@ -36,4 +36,14 @@
    void updateRecord(RecordInfo info);
    
    void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+   
+   /** 
+    * Called when the journal load has to start over, so that the records delivered so far can be discarded.
+    * This may happen in the rare situation where a transaction commit timed out on AIO
+    * and a rollback was fired right afterwards, but the original commit still completed after its TransactionCallback had already been forgotten.
+    * 
+    * This is needed because libaio's forget method is not working, so we had to come up with this "hack".
+    * 
+    * */
+   void restart();
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-24 12:39:56 UTC (rev 4722)
@@ -575,7 +575,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }		
       
-      JournalTransaction tx = transactionInfos.remove(txID);
+      JournalTransaction tx = transactionInfos.get(txID);
       
       if (tx == null)
       {
@@ -584,6 +584,7 @@
       
       JournalFile usedFile = writeTransaction(COMMIT_RECORD, txID, tx);
       
+      transactionInfos.remove(txID);
       transactionCallbacks.remove(txID);
       
       tx.commit(usedFile);
@@ -597,7 +598,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
       
-      JournalTransaction tx = transactionInfos.remove(txID);
+      JournalTransaction tx = transactionInfos.get(txID);
       
       if (tx == null)
       {
@@ -616,6 +617,7 @@
       
       JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
       
+      transactionInfos.remove(txID);
       transactionCallbacks.remove(txID);
       
       tx.rollback(usedFile);
@@ -652,6 +654,12 @@
          {
             recordsToDelete.add(id);
          }
+
+         public void restart()
+         {
+            recordsToDelete.clear();
+            records.clear();            
+         }
          
       });
       
@@ -675,393 +683,440 @@
          throw new IllegalStateException("Journal must be in started state");
       }
       
-      Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+      boolean fileConsistent = true;
       
-      List<JournalFile> orderedFiles = orderFiles();
+      Map<Long, TransactionHolder> transactions = null;
       
       int lastDataPos = -1;
-      
+
+      long maxMessageID = -1;
+
       long maxTransactionID = -1;
       
-      long maxMessageID = -1;
-      
-      for (JournalFile file: orderedFiles)
-      {  
-         file.getFile().open();
+      HashSet<Long> commitsToForget = new HashSet<Long>();
+      HashSet<Long> performedCommits = new HashSet<Long>();
+
+      do
+      {
+
+         if (!fileConsistent)
+         {
+            loadManager.restart();
+         }
          
-         ByteBuffer bb = fileFactory.newBuffer(fileSize);
+         fileConsistent = true;
          
-         int bytesRead = file.getFile().read(bb);
+         performedCommits.clear();
          
-         if (bytesRead != fileSize)
-         {
-            //deal with this better
-            
-            throw new IllegalStateException("File is wrong size " + bytesRead +
-                  " expected " + fileSize + " : " + file.getFile().getFileName());
-         }
+         dataFiles.clear();
+         freeFiles.clear();
+         currentFile = null;
          
-         //First long is the ordering timestamp, we just jump its position
-         bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
+         transactions = new LinkedHashMap<Long, TransactionHolder>();
          
-         boolean hasData = false;
+         List<JournalFile> orderedFiles = orderFiles();
          
-         while (bb.hasRemaining())
-         {
-            final int pos = bb.position();
+         lastDataPos = -1;
+         
+         maxTransactionID = -1;
+         
+         maxMessageID = -1;
+         
+         for (JournalFile file: orderedFiles)
+         {  
+            file.getFile().open();
             
-            byte recordType = bb.get();
-            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
-            {
-               if (trace)
-               {
-                  log.trace("Invalid record type at " + bb.position() + " file:" + file);
-               }
-               continue;
-            }
-
-            if (bb.position() + SIZE_INT > fileSize)
-            {
-               continue;
-            }
-
-            int readFileId = bb.getInt();
+            ByteBuffer bb = fileFactory.newBuffer(fileSize);
             
-            if (readFileId != file.getOrderingID())
+            int bytesRead = file.getFile().read(bb);
+            
+            if (bytesRead != fileSize)
             {
-               // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
-               hasData = true;
-
-               bb.position(pos + 1);
-               //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
-               continue;
+               //deal with this better
+               
+               throw new IllegalStateException("File is wrong size " + bytesRead +
+                     " expected " + fileSize + " : " + file.getFile().getFileName());
             }
             
-            long transactionID = 0;
+            //First long is the ordering timestamp, we just jump its position
+            bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
             
-            if (isTransaction(recordType))
-            {
-               if (bb.position() + SIZE_LONG > fileSize)
-               {
-                  continue;
-               }
-               transactionID = bb.getLong();
-               maxTransactionID = Math.max(maxTransactionID, transactionID); 
-            }
+            boolean hasData = false;
             
-            long recordID = 0;
-            if (!isCompleteTransaction(recordType))
+            while (bb.hasRemaining())
             {
-               if (bb.position() + SIZE_LONG > fileSize)
+               final int pos = bb.position();
+               
+               byte recordType = bb.get();
+               if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
                {
+                  if (trace)
+                  {
+                     log.trace("Invalid record type at " + bb.position() + " file:" + file);
+                  }
                   continue;
                }
-               recordID = bb.getLong();
-               maxMessageID = Math.max(maxMessageID, recordID);
-            }
-            
-            // The variable record portion used on Updates and Appends
-            int variableSize = 0;
-            byte userRecordType = 0;
-            byte record[] = null;
-            
-            if (isContainsBody(recordType))
-            {
+   
                if (bb.position() + SIZE_INT > fileSize)
                {
                   continue;
                }
+   
+               int readFileId = bb.getInt();
                
-               variableSize = bb.getInt();
-               
-               if (bb.position() + variableSize > fileSize)
+               if (readFileId != file.getOrderingID())
                {
+                  // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+                  hasData = true;
+   
+                  bb.position(pos + 1);
+                  //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
                   continue;
                }
                
-               userRecordType = bb.get();
+               long transactionID = 0;
                
-               record = new byte[variableSize];
-               bb.get(record);
-            }
-            
-            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
-            {
-               variableSize = bb.getInt() * SIZE_INT * 2;
-            }
-            
-            int recordSize = getRecordSize(recordType);
-            
-            if (pos + recordSize + variableSize > fileSize)
-            {
-               continue;
-            }
-            
-            int oldPos = bb.position();
-            
-            bb.position(pos + variableSize + recordSize - SIZE_INT);
-            
-            int checkSize = bb.getInt();
-            
-            if (checkSize != variableSize + recordSize)
-            {
-               log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
-               // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
-               hasData = true;
-               bb.position(pos + SIZE_BYTE);
-               continue;
-            }
-            
-            bb.position(oldPos);
-            
-            switch(recordType)
-            {
-               case ADD_RECORD:
-               {                          
-                  loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
-                  
-                  posFilesMap.put(recordID, new PosFiles(file));
-                  
-                  hasData = true;                  
-
-                  break;
-               }                             
-               case UPDATE_RECORD:                 
+               if (isTransaction(recordType))
                {
-                  loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
-                  hasData = true;      
-                  file.incPosCount();
-                  
-                  PosFiles posFiles = posFilesMap.get(recordID);
-                  
-                  if (posFiles != null)
+                  if (bb.position() + SIZE_LONG > fileSize)
                   {
-                     //It's legal for this to be null. The file(s) with the  may have been deleted
-                     //just leaving some updates in this file
-                     
-                     posFiles.addUpdateFile(file);
+                     continue;
                   }
-                  
-                  break;
-               }              
-               case DELETE_RECORD:                 
+                  transactionID = bb.getLong();
+                  maxTransactionID = Math.max(maxTransactionID, transactionID); 
+               }
+               
+               long recordID = 0;
+               if (!isCompleteTransaction(recordType))
                {
-                  loadManager.deleteRecord(recordID);
-                  hasData = true;
-                  
-                  PosFiles posFiles = posFilesMap.remove(recordID);
-                  
-                  if (posFiles != null)
+                  if (bb.position() + SIZE_LONG > fileSize)
                   {
-                     posFiles.addDelete(file);
-                  }                    
-                  
-                  break;
-               }              
-               case ADD_RECORD_TX:
-               case UPDATE_RECORD_TX:
-               {              
-                  TransactionHolder tx = transactions.get(transactionID);
-                  
-                  if (tx == null)
-                  {
-                     tx = new TransactionHolder(transactionID);                        
-                     transactions.put(transactionID, tx);
+                     continue;
                   }
-                  
-                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));                     
-                  
-                  JournalTransaction tnp = transactionInfos.get(transactionID);
-                  
-                  if (tnp == null)
+                  recordID = bb.getLong();
+                  maxMessageID = Math.max(maxMessageID, recordID);
+               }
+               
+               // The variable record portion used on Updates and Appends
+               int variableSize = 0;
+               byte userRecordType = 0;
+               byte record[] = null;
+               
+               if (isContainsBody(recordType))
+               {
+                  if (bb.position() + SIZE_INT > fileSize)
                   {
-                     tnp = new JournalTransaction();
-                     
-                     transactionInfos.put(transactionID, tnp);
+                     continue;
                   }
                   
-                  tnp.addPositive(file, recordID);
+                  variableSize = bb.getInt();
                   
-                  hasData = true;                                          
-                  
-                  break;
-               }     
-               case DELETE_RECORD_TX:
-               {              
-                  TransactionHolder tx = transactions.get(transactionID);
-                  
-                  if (tx == null)
+                  if (bb.position() + variableSize > fileSize)
                   {
-                     tx = new TransactionHolder(transactionID);                        
-                     transactions.put(transactionID, tx);
+                     continue;
                   }
                   
-                  tx.recordsToDelete.add(recordID);                     
+                  userRecordType = bb.get();
                   
-                  JournalTransaction tnp = transactionInfos.get(transactionID);
-                  
-                  if (tnp == null)
+                  record = new byte[variableSize];
+                  bb.get(record);
+               }
+               
+               if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+               {
+                  variableSize = bb.getInt() * SIZE_INT * 2;
+               }
+               
+               int recordSize = getRecordSize(recordType);
+               
+               if (pos + recordSize + variableSize > fileSize)
+               {
+                  continue;
+               }
+               
+               int oldPos = bb.position();
+               
+               bb.position(pos + variableSize + recordSize - SIZE_INT);
+               
+               int checkSize = bb.getInt();
+               
+               if (checkSize != variableSize + recordSize)
+               {
+                  log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+                  // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+                  hasData = true;
+                  bb.position(pos + SIZE_BYTE);
+                  continue;
+               }
+               
+               bb.position(oldPos);
+               
+               switch(recordType)
+               {
+                  case ADD_RECORD:
+                  {                          
+                     loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+                     
+                     posFilesMap.put(recordID, new PosFiles(file));
+                     
+                     hasData = true;                  
+   
+                     break;
+                  }                             
+                  case UPDATE_RECORD:                 
                   {
-                     tnp = new JournalTransaction();
+                     loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                     hasData = true;      
+                     file.incPosCount();
                      
-                     transactionInfos.put(transactionID, tnp);
-                  }
-                  
-                  tnp.addNegative(file, recordID);
-                  
-                  hasData = true;                     
-                  
-                  break;
-               }  
-               case PREPARE_RECORD:
-               {
-                  TransactionHolder tx = transactions.get(transactionID);
-                  
-                  // We need to read it even if transaction was not found, or the reading checks would fail
-                  // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
-                  if (tx != null)
+                     PosFiles posFiles = posFilesMap.get(recordID);
+                     
+                     if (posFiles != null)
+                     {
+                        //It's legal for this to be null. The file(s) with the add record may have been deleted,
+                        //just leaving some updates in this file
+                        
+                        posFiles.addUpdateFile(file);
+                     }
+                     
+                     break;
+                  }              
+                  case DELETE_RECORD:                 
                   {
+                     loadManager.deleteRecord(recordID);
+                     hasData = true;
                      
-                     tx.prepared = true;
+                     PosFiles posFiles = posFilesMap.remove(recordID);
                      
-                     JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+                     if (posFiles != null)
+                     {
+                        posFiles.addDelete(file);
+                     }                    
                      
-                     if (journalTransaction == null)
+                     break;
+                  }              
+                  case ADD_RECORD_TX:
+                  case UPDATE_RECORD_TX:
+                  {              
+                     TransactionHolder tx = transactions.get(transactionID);
+                     
+                     if (tx == null)
                      {
-                        throw new IllegalStateException("Cannot find tx " + transactionID);
+                        tx = new TransactionHolder(transactionID);                        
+                        transactions.put(transactionID, tx);
                      }
                      
+                     tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));                     
                      
-                     boolean healthy = checkTransactionHealth(
-                           journalTransaction, orderedFiles, values);
+                     JournalTransaction tnp = transactionInfos.get(transactionID);
                      
-                     if (healthy)
+                     if (tnp == null)
                      {
-                        journalTransaction.prepare(file);
+                        tnp = new JournalTransaction();
+                        
+                        transactionInfos.put(transactionID, tnp);
                      }
-                     else
+                     
+                     tnp.addPositive(file, recordID);
+                     
+                     hasData = true;                                          
+                     
+                     break;
+                  }     
+                  case DELETE_RECORD_TX:
+                  {              
+                     TransactionHolder tx = transactions.get(transactionID);
+                     
+                     if (tx == null)
                      {
-                        log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
-                        journalTransaction.setInvalid(true);
-                        tx.invalid = true;
+                        tx = new TransactionHolder(transactionID);                        
+                        transactions.put(transactionID, tx);
                      }
                      
-                     hasData = true;
-                  }
-                  
-                  break;
-               }
-               case COMMIT_RECORD:
-               {
-                  TransactionHolder tx = transactions.remove(transactionID);
-                  
-                  // We need to read it even if transaction was not found, or the reading checks would fail
-                  // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
-                  if (tx != null)
-                  {
+                     tx.recordsToDelete.add(recordID);                     
                      
-                     JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
+                     JournalTransaction tnp = transactionInfos.get(transactionID);
                      
-                     if (journalTransaction == null)
+                     if (tnp == null)
                      {
-                        throw new IllegalStateException("Cannot find tx " + transactionID);
+                        tnp = new JournalTransaction();
+                        
+                        transactionInfos.put(transactionID, tnp);
                      }
-
-                     boolean healthy = checkTransactionHealth(
-                           journalTransaction, orderedFiles, values);
                      
+                     tnp.addNegative(file, recordID);
                      
-                     if (healthy)
+                     hasData = true;                     
+                     
+                     break;
+                  }  
+                  case PREPARE_RECORD:
+                  {
+                     TransactionHolder tx = transactions.get(transactionID);
+                     
+                     // We need to read it even if transaction was not found, or the reading checks would fail
+                     // Pair <OrderId, NumberOfElements>
+                     Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+   
+                     if (tx != null)
                      {
-                        for (RecordInfo txRecord: tx.recordInfos)
+                        
+                        tx.prepared = true;
+                        
+                        JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+                        
+                        if (journalTransaction == null)
                         {
-                           if (txRecord.isUpdate)
+                           throw new IllegalStateException("Cannot find tx " + transactionID);
+                        }
+                        
+                        
+                        boolean healthy = checkTransactionHealth(
+                              journalTransaction, orderedFiles, values);
+                        
+                        if (healthy)
+                        {
+                           journalTransaction.prepare(file);
+                        }
+                        else
+                        {
+                           log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+                           journalTransaction.setInvalid(true);
+                           tx.invalid = true;
+                        }
+                        
+                        hasData = true;
+                     }
+                     
+                     break;
+                  }
+                  case COMMIT_RECORD:
+                  {
+                     TransactionHolder tx = transactions.remove(transactionID);
+                     
+                     // We need to read it even if transaction was not found, or the reading checks would fail
+                     // Pair <OrderId, NumberOfElements>
+                     Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+   
+                     if (tx != null)
+                     {
+                        
+                        JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
+                        
+                        if (journalTransaction == null)
+                        {
+                           throw new IllegalStateException("Cannot find tx " + transactionID);
+                        }
+   
+                        boolean healthy = checkTransactionHealth(
+                              journalTransaction, orderedFiles, values);
+                        
+                        
+                        if (commitsToForget.contains(transactionID))
+                        {
+                           log.warn("Transaction being ignored because of a post rollback");
+                           journalTransaction.forget();
+                        }
+                        else
+                        if (healthy)
+                        {
+                           performedCommits.add(transactionID);
+
+                           for (RecordInfo txRecord: tx.recordInfos)
                            {
-                              loadManager.updateRecord(txRecord);
+                              if (txRecord.isUpdate)
+                              {
+                                 loadManager.updateRecord(txRecord);
+                              }
+                              else
+                              {
+                                 loadManager.addRecord(txRecord);
+                              }
                            }
-                           else
+                           
+                           for (Long deleteValue: tx.recordsToDelete)
                            {
-                              loadManager.addRecord(txRecord);
+                              loadManager.deleteRecord(deleteValue);
                            }
+                           journalTransaction.commit(file);  
                         }
-                        
-                        for (Long deleteValue: tx.recordsToDelete)
+                        else
                         {
-                           loadManager.deleteRecord(deleteValue);
+                           log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
+                           journalTransaction.forget();
                         }
-                        journalTransaction.commit(file);       
+                        
+                        hasData = true;         
                      }
-                     else
-                     {
-                        log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
-                        journalTransaction.forget();
-                     }
                      
-                     hasData = true;         
+                     break;
                   }
-                  
-                  break;
-               }
-               case ROLLBACK_RECORD:
-               {
-                  TransactionHolder tx = transactions.remove(transactionID);
-                  
-                  if (tx != null)
-                  {                       
-                     JournalTransaction tnp = transactionInfos.remove(transactionID);
+                  case ROLLBACK_RECORD:
+                  {
+                     TransactionHolder tx = transactions.remove(transactionID);
                      
-                     if (tnp == null)
+                     if (performedCommits.contains(transactionID) && !commitsToForget.contains(transactionID))
                      {
-                        throw new IllegalStateException("Cannot find tx " + transactionID);
+                        log.warn("Transaction " + transactionID + " was rolled back after its commit! Reload will need to be restarted with that transaction being ignored");
+                        commitsToForget.add(transactionID);
+                        fileConsistent = false;
                      }
                      
-                     tnp.rollback(file);  
                      
-                     hasData = true;         
+                     if (tx != null)
+                     {                       
+                        JournalTransaction tnp = transactionInfos.remove(transactionID);
+                        
+                        if (tnp == null)
+                        {
+                           throw new IllegalStateException("Cannot find tx " + transactionID);
+                        }
+                        
+                        tnp.rollback(file);  
+                        
+                        hasData = true;         
+                     }
+                     
+                     break;
                   }
-                  
-                  break;
+                  default:                
+                  {
+                     throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                           " is corrupt, invalid record type " + recordType);
+                  }
                }
-               default:                
+               
+               checkSize = bb.getInt();
+               
+               if (checkSize != variableSize + recordSize)
                {
-                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                        " is corrupt, invalid record type " + recordType);
+                  throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
                }
+               
+               bb.position(file.getFile().calculateBlockStart(bb.position()));
+               
+               if (recordType != FILL_CHARACTER)
+               {
+                  lastDataPos = bb.position();
+               }
             }
             
-            checkSize = bb.getInt();
+            file.getFile().close();          
             
-            if (checkSize != variableSize + recordSize)
-            {
-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+            if (hasData)
+            {        
+               dataFiles.add(file);
             }
-            
-            bb.position(file.getFile().calculateBlockStart(bb.position()));
-            
-            if (recordType != FILL_CHARACTER)
-            {
-               lastDataPos = bb.position();
-            }
+            else
+            {           
+               //Empty dataFiles with no data
+               freeFiles.add(file);
+            }                       
          }
-         
-         file.getFile().close();          
-         
-         if (hasData)
-         {        
-            dataFiles.add(file);
-         }
-         else
-         {           
-            //Empty dataFiles with no data
-            freeFiles.add(file);
-         }                       
-      }        
+         transactionIDSequence.set(maxTransactionID + 1);
+      } 
+      while (!fileConsistent);
       
-      transactionIDSequence.set(maxTransactionID + 1);
       
       //Create any more files we need
       

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java	2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java	2008-07-24 12:39:56 UTC (rev 4722)
@@ -227,6 +227,11 @@
          numberOfUpdates++;
          
       }
+
+      public void restart()
+      {
+         ex = new Exception ("Journal was restarted"); // a restart is recorded by storing an exception in ex; presumably the test later treats a non-null ex as a failure - verify against the code that reads ex
+      }
       
    }
    

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2008-07-24 12:39:56 UTC (rev 4722)
@@ -107,6 +107,10 @@
          public void updateRecord(RecordInfo info)
          {
          }
+
+         public void restart()
+         { // no-op: this load callback takes no action in restart()
+         }
       });
       
       

Added: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java	2008-07-24 12:39:56 UTC (rev 4722)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+public class JournalAsyncTimeoutsTest extends UnitTestCase
+{
+   // Tests JournalImpl commit and rollback calls made while the fake file factory is told to hold its callbacks.
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private FakeSequentialFileFactory factory; // created by setupJournal when null, then reused by later setupJournal calls
+   
+   JournalImpl journalImpl = null; // journal under test; replaced on every setupJournal call
+   
+   private ArrayList<RecordInfo> records = null; // cleared and passed to journalImpl.load(...) by setupJournal
+   
+   private ArrayList<PreparedTransactionInfo> transactions = null; // cleared and passed to journalImpl.load(...) by setupJournal
+   
+   // Static --------------------------------------------------------
+   
+   private static final Logger log = Logger
+         .getLogger(JournalAsyncTimeoutsTest.class);
+   
+   // Constructors --------------------------------------------------
+   
+   // Public --------------------------------------------------------
+   // NOTE: the body of testAsynchronousCommit is entirely commented out, so the method currently executes no statements.
+   public void testAsynchronousCommit() throws Exception
+   {
+//      final int JOURNAL_SIZE = 20000;
+//      
+//      setupJournal(JOURNAL_SIZE, 100, 5);
+//      
+//      assertEquals(2, factory.listFiles("tt").size());
+//      
+//      assertEquals(0, records.size());
+//      assertEquals(0, transactions.size());
+//      
+//      for (int i = 0; i < 10 ; i++)
+//      {
+//         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+//         journalImpl.forceMoveNextFile();
+//      }
+//      
+//      
+//      for (int i = 10; i < 20 ; i++)
+//      {
+//         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+//         journalImpl.forceMoveNextFile();
+//      }
+//      
+//      journalImpl.forceMoveNextFile();
+//      
+//      journalImpl.appendCommitRecord(1l);
+//      
+   }
+   
+   // Holds the factory callbacks and expects appendCommitRecord to throw (the test fails otherwise); then flushes
+   // the callbacks, rolls transaction 1 back and reloads, expecting no records and no data files.
+   public void testTransactionTimeoutOnCommit() throws Exception
+   {
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+      
+      assertEquals(5, factory.listFiles("tt").size());
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      // presumably makes the fake factory withhold completion callbacks so the commit below cannot finish in time - confirm in FakeSequentialFileFactory
+      factory.setHoldCallbacks(true);
+      
+      for (int i = 0; i < 20; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
+               new SimpleEncoding(1, (byte) 15));
+      }
+      
+      try
+      {
+         journalImpl.appendCommitRecord(1l);
+         fail ("Supposed to timeout");
+      }
+      catch (Exception e) // expected path: the commit is supposed to time out, so the exception is deliberately not handled further
+      {
+      }
+
+      factory.flushAllCallbacks();
+      
+      factory.setHoldCallbacks(false);
+      
+      journalImpl.appendRollbackRecord(1l);
+      // setupJournal stops the current journal, then starts and loads a new JournalImpl over the same factory
+      setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+      
+      assertEquals(0, records.size());
+      assertEquals(0, journalImpl.getDataFilesCount());
+   }
+   // Same steps as the commit test, but the call expected to throw while callbacks are held is appendRollbackRecord.
+   public void testTransactionTimeoutOnRollback() throws Exception
+   {
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+      
+      assertEquals(5, factory.listFiles("tt").size());
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+
+      factory.setHoldCallbacks(true);
+      
+      for (int i = 0; i < 20; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
+               new SimpleEncoding(1, (byte) 15));
+      }
+      
+      try
+      {
+         journalImpl.appendRollbackRecord(1l);
+         fail ("Supposed to timeout");
+      }
+      catch (Exception e) // expected path: the first rollback is supposed to time out
+      {
+      }
+
+      factory.flushAllCallbacks();
+      
+      factory.setHoldCallbacks(false);
+      
+      // it shouldn't fail
+      journalImpl.appendRollbackRecord(1l);
+      
+      setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+      
+      assertEquals(0, records.size());
+   }
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   // Resets the per-test state: fresh record and transaction lists, no factory and no journal.
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      records = new ArrayList<RecordInfo>();
+      
+      transactions = new ArrayList<PreparedTransactionInfo>();
+      
+      factory = null;
+      
+      journalImpl = null;
+      
+   }
+   // Stops the journal if a test created one; any Throwable thrown by stop() is deliberately ignored.
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      if (journalImpl != null)
+      {
+         try
+         {
+            journalImpl.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+   
+   // Private -------------------------------------------------------
+   private void setupJournal(final int journalSize, final int alignment,
+         final int numberOfMinimalFiles, final int timeout) throws Exception
+   {
+      if (factory == null)
+      {
+         factory = new FakeSequentialFileFactory(alignment, true);
+      }
+      // stop a journal left over from an earlier call before it is replaced below
+      if (journalImpl != null)
+      {
+         journalImpl.stop();
+      }
+      // NOTE(review): the two boolean flags, the two "tt" strings and the literal 1000 are positional - check the JournalImpl constructor for their meaning
+      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true,
+            true, factory, "tt", "tt", 1000, timeout);
+      
+      journalImpl.start();
+      // clear both lists first so that after load(...) they hold only what this load added
+      records.clear();
+      transactions.clear();
+      
+      journalImpl.load(records, transactions);
+   }
+   
+   // Inner classes -------------------------------------------------
+   
+}




More information about the jboss-cvs-commits mailing list