[jboss-cvs] JBoss Messaging SVN: r7501 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 30 01:52:22 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-30 01:52:22 -0400 (Tue, 30 Jun 2009)
New Revision: 7501

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Log:
Concurrent transactions changes

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 05:52:22 UTC (rev 7501)
@@ -87,35 +87,79 @@
    {
       return newDataFiles;
    }
-   
+
    public Map<Long, JournalRecord> getNewRecords()
    {
       return newRecords;
    }
-   
+
+   public Map<Long, JournalTransaction> getNewTransactions()
+   {
+      return newTransactions;
+   }
+
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
-                           Set<Long> recordsSnapshot,
-                           int firstFileID)
+                           final Set<Long> recordsSnapshot,
+                           final int firstFileID)
    {
       this.fileFactory = fileFactory;
       this.journal = journal;
       this.recordsSnapshot.addAll(recordsSnapshot);
-      this.nextOrderingID = firstFileID;
+      nextOrderingID = firstFileID;
    }
 
    /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
-   public void addPendingTransaction(long transactionID, long ids[])
+   public void addPendingTransaction(final long transactionID, final long ids[])
    {
       pendingTransactions.put(transactionID, new PendingTransaction(transactionID, ids));
    }
 
    /**
     * @param id
+    * @param journalTransaction
+    */
+   public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile)
+   {
+      log.info("Adding commit command");
+      pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
+
+      // If a delete arrives for these records while the compactor is still working, the IDs must already be
+      // in the snapshot so the delete is queued for later instead of failing on a non-existent record.
+      long ids[] = liveTransaction.getPositiveArray();
+      if (ids != null)
+      {
+         for (long id : ids)
+         {
+            recordsSnapshot.add(id);
+         }
+      }
+
+      // There may be no PendingTransaction registered for this id; in that case there is nothing more to add.
+      // (Iterating the missing array unguarded used to throw a NullPointerException.)
+      PendingTransaction oldTransaction = pendingTransactions.get(liveTransaction.getId());
+      if (oldTransaction != null && oldTransaction.pendingIDs != null)
+      {
+         for (long id : oldTransaction.pendingIDs)
+         {
+            recordsSnapshot.add(id);
+         }
+      }
+   }
+
+   public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile)
+   {
+      log.info("Adding rollback command");
+      pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
+   }
+
+   /**
+    * @param id
     * @param usedFile
     */
-   public void addCommandDelete(long id, JournalFile usedFile)
+   public void addCommandDelete(final long id, final JournalFile usedFile)
    {
+      log.info("Adding delete command");
       pendingCommands.add(new DeleteCompactCommand(id, usedFile));
    }
 
@@ -123,17 +167,18 @@
     * @param id
     * @param usedFile
     */
-   public void addCommandUpdate(long id, JournalFile usedFile)
+   public void addCommandUpdate(final long id, final JournalFile usedFile)
    {
+      log.info("Adding update command");
       pendingCommands.add(new UpdateCompactCommand(id, usedFile));
    }
 
-   public boolean lookupRecord(long id)
+   public boolean lookupRecord(final long id)
    {
       return recordsSnapshot.contains(id);
    }
 
-   private void checkSize(int size) throws Exception
+   private void checkSize(final int size) throws Exception
    {
       if (channelWrapper == null)
       {
@@ -169,6 +214,7 @@
    {
       for (CompactCommand command : pendingCommands)
       {
+         log.info("Replaying " + command.getClass().getName());
          try
          {
             command.execute();
@@ -178,13 +224,13 @@
             log.warn("Error replaying pending commands after compacting", e);
          }
       }
-      
+
       pendingCommands.clear();
    }
 
    // JournalReaderCallback implementation -------------------------------------------
 
-   public void addRecord(RecordInfo info) throws Exception
+   public void onReadAddRecord(final RecordInfo info) throws Exception
    {
       if (recordsSnapshot.contains(info.id))
       {
@@ -203,7 +249,7 @@
       }
    }
 
-   public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+   public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -226,11 +272,11 @@
       else
       {
          // Will try it as a regular record, the method addRecord will validate if this is a live record or not
-         addRecord(info);
+         onReadAddRecord(info);
       }
    }
 
-   public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -240,7 +286,7 @@
       }
    }
 
-   public void deleteRecord(long recordID) throws Exception
+   public void onReadDeleteRecord(final long recordID) throws Exception
    {
       if (newRecords.get(recordID) != null)
       {
@@ -250,7 +296,7 @@
 
    }
 
-   public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -272,12 +318,12 @@
       // else.. nothing to be done
    }
 
-   public void markAsDataFile(JournalFile file)
+   public void markAsDataFile(final JournalFile file)
    {
       // nothing to be done here
    }
 
-   public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -302,7 +348,7 @@
       }
    }
 
-   public void rollbackRecord(long transactionID) throws Exception
+   public void onReadRollbackRecord(final long transactionID) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -312,7 +358,7 @@
       }
    }
 
-   public void updateRecord(RecordInfo info) throws Exception
+   public void onReadUpdateRecord(final RecordInfo info) throws Exception
    {
       if (recordsSnapshot.contains(info.id))
       {
@@ -339,7 +385,7 @@
       }
    }
 
-   public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
       if (pendingTransactions.get(transactionID) != null)
       {
@@ -361,7 +407,7 @@
       }
       else
       {
-         updateRecord(info);
+         onReadUpdateRecord(info);
       }
    }
 
@@ -369,7 +415,7 @@
     * @param transactionID
     * @return
     */
-   private JournalTransaction getNewJournalTransaction(long transactionID)
+   private JournalTransaction getNewJournalTransaction(final long transactionID)
    {
       JournalTransaction newTransaction = newTransactions.get(transactionID);
       if (newTransaction == null)
@@ -411,9 +457,8 @@
       long id;
 
       JournalFile usedFile;
-      
-      
-      public DeleteCompactCommand(long id, JournalFile usedFile)
+
+      public DeleteCompactCommand(final long id, final JournalFile usedFile)
       {
          this.id = id;
          this.usedFile = usedFile;
@@ -422,23 +467,24 @@
       @Override
       void execute() throws Exception
       {
+         System.out.println("Deleting command " + id);
          JournalRecord deleteRecord = journal.getRecords().remove(id);
          deleteRecord.delete(usedFile);
       }
    }
-   
+
    private class PendingTransaction
    {
       long transactionID;
+
       long pendingIDs[];
-      
-      
-      PendingTransaction(long transactionID, long ids[])
+
+      PendingTransaction(final long transactionID, final long ids[])
       {
          this.transactionID = transactionID;
-         this.pendingIDs = ids;
+         pendingIDs = ids;
       }
-      
+
    }
 
    private class UpdateCompactCommand extends CompactCommand
@@ -446,15 +492,13 @@
       long id;
 
       JournalFile usedFile;
-      
-      
-      public UpdateCompactCommand(long id, JournalFile usedFile)
+
+      public UpdateCompactCommand(final long id, final JournalFile usedFile)
       {
          this.id = id;
          this.usedFile = usedFile;
       }
 
-      
       @Override
       void execute() throws Exception
       {
@@ -463,13 +507,56 @@
       }
    }
 
-   /**
-    * @param id
-    * @param journalTransaction
-    */
-   public void addCommandCommit(long id, JournalTransaction journalTransaction)
+   private class CommitCompactCommand extends CompactCommand
    {
-      // TODO Auto-generated method stub
-      
+      private final JournalTransaction liveTransaction;
+
+      /** File containing the commit record */
+      private final JournalFile commitFile;
+
+      public CommitCompactCommand(final JournalTransaction liveTransaction, final JournalFile commitFile)
+      {
+         this.liveTransaction = liveTransaction;
+         this.commitFile = commitFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+         if (newTransaction != null)
+         {
+            liveTransaction.merge(newTransaction);
+            liveTransaction.commit(commitFile);
+         }
+         newTransactions.remove(liveTransaction.getId());
+      }
    }
+
+   private class RollbackCompactCommand extends CompactCommand
+   {
+      private final JournalTransaction liveTransaction;
+
+      /** File containing the rollback record */
+      private final JournalFile rollbackFile;
+
+      public RollbackCompactCommand(final JournalTransaction liveTransaction, final JournalFile rollbackFile)
+      {
+         this.liveTransaction = liveTransaction;
+         this.rollbackFile = rollbackFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+         if (newTransaction != null)
+         {
+            liveTransaction.merge(newTransaction);
+            liveTransaction.rollback(rollbackFile);
+         }
+         newTransactions.remove(liveTransaction.getId());
+      }
+   }
+
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 05:52:22 UTC (rev 7501)
@@ -266,7 +266,11 @@
       return records;
    }
    
-   
+   public JournalFile getCurrentFile()
+   {
+      return currentFile;
+   }
+
    public JournalCompactor getCompactor()
    {
       return this.compactor;
@@ -864,8 +868,8 @@
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
+               compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
                entry.getValue().setCompacting();
-               compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
             }
 
             // We will calculate the new records during compacting, what will take the position the records will take after compacting
@@ -905,13 +909,18 @@
 
          List<JournalFile> newDatafiles = null;
 
+         JournalCompactor localCompactor = compactor;
+         
          compactingLock.writeLock().lock();
          try
          {
-            newDatafiles = compactor.getNewDataFiles();
+            // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
+            this.compactor = null;
 
+            newDatafiles = localCompactor.getNewDataFiles();
+
             // Restore newRecords created during compacting
-            for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.getNewRecords().entrySet())
+            for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet())
             {
                records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
             }
@@ -919,17 +928,43 @@
             // Restore compacted dataFiles
             for (int i = newDatafiles.size() - 1; i >= 0; i--)
             {
-               dataFiles.addFirst(newDatafiles.get(i));
+               JournalFile fileToAdd = newDatafiles.get(i);
+               if (trace)
+               {
+                  trace("Adding file " + fileToAdd + " back as datafile");
+               }
+               dataFiles.addFirst(fileToAdd);
             }
+            
 
             // Replay pending commands (including updates, deletes and commits)
             
-            compactor.replayPendingCommands();
+            localCompactor.replayPendingCommands();
+
+            // Merge transactions back after compacting
             
-            // Deal with transactions commits that happened during the compacting
+            for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
+            {
+               if (trace) 
+               {
+                  trace("Merging pending transaction " +  newTransaction + " after compacting to the journal");
+               }
+               JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
+               if (liveTransaction == null)
+               {
+                  log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() + " back into JournalTransactions");
+               }
+               else
+               {
+                  liveTransaction.merge(newTransaction);
+               }
+            }
+            
+            for (Map.Entry<Long, JournalRecord> record : records.entrySet())
+            {
+               trace("We have " + record.getKey() + " on the list now");
+            }
 
-            this.compactor = null;
-
          }
          finally
          {
@@ -1107,7 +1142,7 @@
          int resultLastPost = readJournalFile(file, new JournalReaderCallback()
          {
 
-            public void addRecord(RecordInfo info) throws Exception
+            public void onReadAddRecord(RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1120,7 +1155,7 @@
                records.put(info.id, new JournalRecord(file));
             }
 
-            public void updateRecord(RecordInfo info) throws Exception
+            public void onReadUpdateRecord(RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1142,7 +1177,7 @@
                }
             }
 
-            public void deleteRecord(long recordID) throws Exception
+            public void onReadDeleteRecord(long recordID) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1160,12 +1195,12 @@
                }
             }
 
-            public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception
             {
-               addRecordTX(transactionID, info);
+               onReadAddRecordTX(transactionID, info);
             }
 
-            public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception
             {
 
                if (trace && LOAD_TRACE)
@@ -1198,7 +1233,7 @@
                tnp.addPositive(file, info.id);
             }
 
-            public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1231,7 +1266,7 @@
 
             }
 
-            public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+            public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1276,7 +1311,7 @@
                }
             }
 
-            public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+            public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1337,7 +1372,7 @@
 
             }
 
-            public void rollbackRecord(long transactionID) throws Exception
+            public void onReadRollbackRecord(long transactionID) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -2006,37 +2041,37 @@
             {
                case ADD_RECORD:
                {
-                  reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
                   break;
                }
 
                case UPDATE_RECORD:
                {
-                  reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
                   break;
                }
 
                case DELETE_RECORD:
                {
-                  reader.deleteRecord(recordID);
+                  reader.onReadDeleteRecord(recordID);
                   break;
                }
 
                case ADD_RECORD_TX:
                {
-                  reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
                   break;
                }
 
                case UPDATE_RECORD_TX:
                {
-                  reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
                   break;
                }
 
                case DELETE_RECORD_TX:
                {
-                  reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
                   break;
                }
 
@@ -2047,19 +2082,19 @@
 
                   wholeFileBuffer.get(extraData);
 
-                  reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+                  reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
 
                   break;
                }
                case COMMIT_RECORD:
                {
 
-                  reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
+                  reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
                   break;
                }
                case ROLLBACK_RECORD:
                {
-                  reader.rollbackRecord(transactionID);
+                  reader.onReadRollbackRecord(transactionID);
                   break;
                }
                default:

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java	2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java	2009-06-30 05:52:22 UTC (rev 7501)
@@ -34,56 +34,56 @@
  */
 public interface JournalReaderCallback
 {
-   void addRecord(RecordInfo info) throws Exception;
+   void onReadAddRecord(RecordInfo info) throws Exception;
 
    /**
     * @param recordInfo
     * @throws Exception 
     */
-   void updateRecord(RecordInfo recordInfo) throws Exception;
+   void onReadUpdateRecord(RecordInfo recordInfo) throws Exception;
 
    /**
     * @param recordID
     */
-   void deleteRecord(long recordID) throws Exception;
+   void onReadDeleteRecord(long recordID) throws Exception;
 
    /**
     * @param transactionID
     * @param recordInfo
     * @throws Exception 
     */
-   void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+   void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
 
    /**
     * @param transactionID
     * @param recordInfo
     * @throws Exception 
     */
-   void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+   void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
 
    /**
     * @param transactionID
     * @param recordInfo
     */
-   void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+   void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
 
    /**
     * @param transactionID
     * @param extraData
     * @param summaryData
     */
-   void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
+   void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
 
    /**
     * @param transactionID
     * @param summaryData
     */
-   void commitRecord(long transactionID, int numberOfRecords) throws Exception;
+   void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception;
 
    /**
     * @param transactionID
     */
-   void rollbackRecord(long transactionID) throws Exception;
+   void onReadRollbackRecord(long transactionID) throws Exception;
 
    public void markAsDataFile(JournalFile file);
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-30 05:52:22 UTC (rev 7501)
@@ -51,8 +51,8 @@
 
    private List<Pair<JournalFile, Long>> neg;
 
-   private long id;
-   
+   private final long id;
+
    // All the files this transaction is touching on.
    // We can't have those files being reclaimed if there is a pending transaction
    private Set<JournalFile> pendingFiles;
@@ -73,6 +73,14 @@
       this.journal = journal;
    }
 
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+
    public int getCounter(final JournalFile file)
    {
       return internalgetCounter(file).intValue();
@@ -288,7 +296,7 @@
 
       if (compacting)
       {
-         compactor.addCommandCommit(id, this);
+         compactor.addCommandCommit(this, file);
       }
       else
       {
@@ -299,21 +307,22 @@
             {
                JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
 
-               if (posFiles == null)
+               if (compactor != null && compactor.lookupRecord(trUpdate.b))
                {
-                  posFiles = new JournalImpl.JournalRecord(trUpdate.a);
-
-                  journal.getRecords().put(trUpdate.b, posFiles);
-               }
-               else if (compactor != null && compactor.lookupRecord(trUpdate.b))
-               {
                   // This is a case where the transaction was opened after compacting was started,
                   // but the commit arrived while compacting was working
                   // We need to cache the counter update, so compacting will take the correct files when it is done
                   compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
                }
                else
+               if (posFiles == null)
                {
+                  posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+
+                  journal.getRecords().put(trUpdate.b, posFiles);
+               }
+               else
+               {
                   posFiles.addUpdateFile(trUpdate.a);
                }
             }
@@ -325,6 +334,8 @@
             {
                JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
 
+               System.out.println("Deleting record " + trDelete.b + " posFiles = " + posFiles);
+               
                if (posFiles != null)
                {
                   posFiles.delete(trDelete.a);
@@ -375,20 +386,29 @@
     * */
    public void rollback(final JournalFile file)
    {
-      // Now add negs for the pos we added in each file in which there were
-      // transactional operations
-      // Note that we do this on rollback as we do on commit, since we need
-      // to ensure the file containing
-      // the rollback record doesn't get deleted before the files with the
-      // transactional operations are deleted
-      // Otherwise we may run into problems especially with XA where we are
-      // just left with a prepare when the tx
-      // has actually been rolled back
+      JournalCompactor compactor = journal.getCompactor();
 
-      for (JournalFile jf : pendingFiles)
+      if (compacting && compactor != null)
       {
-         file.incNegCount(jf);
+         compactor.addCommandRollback(this, file);
       }
+      else
+      {
+         // Now add negs for the pos we added in each file in which there were
+         // transactional operations
+         // Note that we do this on rollback as we do on commit, since we need
+         // to ensure the file containing
+         // the rollback record doesn't get deleted before the files with the
+         // transactional operations are deleted
+         // Otherwise we may run into problems especially with XA where we are
+         // just left with a prepare when the tx
+         // has actually been rolled back
+
+         for (JournalFile jf : pendingFiles)
+         {
+            file.incNegCount(jf);
+         }
+      }
    }
 
    /** 
@@ -414,6 +434,11 @@
 
    }
 
+   public String toString()
+   {
+      return "JournalTransaction(" + this.id + ")";
+   }
+
    private AtomicInteger internalgetCounter(final JournalFile file)
    {
       if (lastFile != file)

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-06-30 05:52:22 UTC (rev 7501)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
 import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
 
 /**
@@ -68,45 +69,52 @@
 
    public void testCompactwithPendingCommit() throws Exception
    {
-      InternalCompactTest(false, false, false, false, true);
+      InternalCompactTest(false, false, false, false, true, false, false);
    }
 
+   public void testCompactwithDelayedCommit() throws Exception
+   {
+      InternalCompactTest(false, false, false, false, true, false, true);
+   }
+
    public void testCompactwithPendingCommitFollowedByDelete() throws Exception
    {
+      InternalCompactTest(false, false, false, false, true, true, false);
    }
-   
-   
+
    public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
    {
-      InternalCompactTest(true, false, true, true, false);
+      InternalCompactTest(true, false, true, true, false, false, false);
    }
 
    public void testCompactwithConcurrentDeletes() throws Exception
    {
-      InternalCompactTest(true, false, false, true, false);
+      InternalCompactTest(true, false, false, true, false, false, false);
    }
 
    public void testCompactwithConcurrentUpdates() throws Exception
    {
-      InternalCompactTest(true, false, true, false, false);
+      InternalCompactTest(true, false, true, false, false, false, false);
    }
 
    public void testCompactWithConcurrentAppend() throws Exception
    {
-      InternalCompactTest(true, true, false, false, false);
+      InternalCompactTest(true, true, false, false, false, false, false);
    }
 
    private void InternalCompactTest(final boolean regularAdd,
                                     final boolean performAppend,
                                     final boolean performUpdate,
                                     final boolean performDelete,
-                                    final boolean pendingTransactions) throws Exception
+                                    final boolean pendingTransactions,
+                                    final boolean deleteTransactRecords,
+                                    final boolean delayCommit) throws Exception
    {
       setup(50, 60 * 1024, true);
 
       ArrayList<Long> liveIDs = new ArrayList<Long>();
 
-      ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
+      ArrayList<Pair<Long, Long>> transactedRecords = new ArrayList<Pair<Long, Long>>();
 
       final CountDownLatch latchDone = new CountDownLatch(1);
       final CountDownLatch latchWait = new CountDownLatch(1);
@@ -164,9 +172,10 @@
       {
          for (long i = 0; i < 100; i++)
          {
-            addTx(transactionID, idGenerator.generateID());
-            updateTx(transactionID, idGenerator.generateID());
-            listPendingTransactions.add(transactionID++);
+            long recordID = idGenerator.generateID();
+            addTx(transactionID, recordID);
+            updateTx(transactionID, recordID);
+            transactedRecords.add(new Pair<Long, Long>(transactionID++, recordID));
          }
       }
 
@@ -224,7 +233,8 @@
 
          for (int i = 0; i < 50; i++)
          {
-            // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
+            // A totally new transaction (created after the compact started) that adds a new record while compacting
+            // is still working
             addTx(transactionID, nextID++);
             commit(transactionID++);
             if (i % 10 == 0)
@@ -245,7 +255,8 @@
             }
             else
             {
-               // A Total new transaction (that was created after the compact started) to update a record that is being compacted
+               // A totally new transaction (created after the compact started) that updates a record that is being
+               // compacted
                updateTx(transactionID, liveID);
                commit(transactionID++);
             }
@@ -259,55 +270,84 @@
          {
             if (count++ % 2 == 0)
             {
+               System.out.println("Deleting no trans " + liveID );
                delete(liveID);
             }
             else
             {
-               // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
+               System.out.println("Deleting TX " + liveID );
+               // A totally new transaction (created after the compact started) that deletes a record that is being
+               // compacted
                deleteTx(transactionID, liveID);
                commit(transactionID++);
             }
 
+            System.out.println("Deletes are going into " + ((JournalImpl)journal).getCurrentFile());
          }
       }
 
-      if (pendingTransactions)
+      if (pendingTransactions && !delayCommit)
       {
-         for (long tx : listPendingTransactions)
+         for (Pair<Long, Long> tx : transactedRecords)
          {
-            if (tx % 2 == 0)
+            if (tx.a % 2 == 0)
             {
-               commit(tx);
+               commit(tx.a);
+
+               if (deleteTransactRecords)
+               {
+                  delete(tx.b);
+               }
             }
             else
             {
-               rollback(tx);
+               rollback(tx.a);
             }
          }
       }
 
       /** Some independent adds and updates */
-      for (int i = 0; i < 1000; i++)
-      {
-         long id = idGenerator.generateID();
-         add(id);
-         delete(id);
+//      for (int i = 0; i < 1000; i++)
+//      {
+//         long id = idGenerator.generateID();
+//         add(id);
+//         delete(id);
+//
+//         if (i % 100 == 0)
+//         {
+//            journal.forceMoveNextFile();
+//         }
+//      }
 
-         if (i % 100 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-      }
-
       journal.forceMoveNextFile();
 
       latchWait.countDown();
 
       t.join();
 
+      if (pendingTransactions && delayCommit)
+      {
+         for (Pair<Long, Long> tx : transactedRecords)
+         {
+            if (tx.a % 2 == 0)
+            {
+               commit(tx.a);
+
+               if (deleteTransactRecords)
+               {
+                  delete(tx.b);
+               }
+            }
+            else
+            {
+               rollback(tx.a);
+            }
+         }
+      }
+
       add(idGenerator.generateID());
 
-      // journal.compact();
+      journal.compact();
 
       stopJournal();
       createJournal();




More information about the jboss-cvs-commits mailing list