[jboss-cvs] JBoss Messaging SVN: r7498 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 29 19:47:11 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-29 19:47:11 -0400 (Mon, 29 Jun 2009)
New Revision: 7498

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Log:
tweaks

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-29 23:47:11 UTC (rev 7498)
@@ -28,6 +28,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -36,6 +37,7 @@
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
 import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.Pair;
 
@@ -69,45 +71,48 @@
 
    final Map<Long, JournalRecord> recordsSnapshot;
 
-   final Map<Long, JournalTransaction> pendingTransactions;
+   // Snapshot of transactions that were pending when the compactor started
+   final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
 
    final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
 
    final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
 
-   final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+   /** Commands that happened during compacting */
+   final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
 
-   final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
-
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
                            Map<Long, JournalRecord> recordsSnapshot,
-                           Map<Long, JournalTransaction> pendingTransactions,
                            int firstFileID)
    {
       this.fileFactory = fileFactory;
       this.journal = journal;
       this.recordsSnapshot = recordsSnapshot;
       this.nextOrderingID = firstFileID;
-      this.pendingTransactions = pendingTransactions;
    }
 
+   public void addPendingTransaction(long transactionID)
+   {
+      pendingTransactions.add(transactionID);
+   }
+
    /**
     * @param id
     * @param usedFile
     */
-   public void addPendingDelete(long id, JournalFile usedFile)
+   public void addCommandDelete(long id, JournalFile usedFile)
    {
-      pendingDeletes.add(new Pair<Long, JournalFile>(id, usedFile));
+      pendingCommands.add(new DeleteCompactCommand(id, usedFile));
    }
 
    /**
     * @param id
     * @param usedFile
     */
-   public void addPendingUpdate(long id, JournalFile usedFile)
+   public void addCommandUpdate(long id, JournalFile usedFile)
    {
-      pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+      pendingCommands.add(new UpdateCompactCommand(id, usedFile));
    }
 
    public boolean lookupRecord(long id)
@@ -130,6 +135,7 @@
       }
    }
 
+   /** Write pending output into file */
    public void flush() throws Exception
    {
       if (channelWrapper != null)
@@ -143,8 +149,28 @@
       channelWrapper = null;
    }
 
+   /**
+    * Replay pending commands that happened during compacting
+    */
+   public void replayPendingCommands()
+   {
+      for (CompactCommand command : pendingCommands)
+      {
+         try
+         {
+            command.execute();
+         }
+         catch (Exception e)
+         {
+            log.warn("Error replaying pending commands after compacting", e);
+         }
+      }
+      
+      pendingCommands.clear();
+   }
+
    // JournalReaderCallback implementation -------------------------------------------
-   
+
    public void addRecord(RecordInfo info) throws Exception
    {
       if (recordsSnapshot.get(info.id) != null)
@@ -166,7 +192,7 @@
 
    public void addRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-      if (pendingTransactions.get(transactionID) != null)
+      if (pendingTransactions.contains(transactionID))
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -193,9 +219,7 @@
 
    public void commitRecord(long transactionID, int numberOfRecords) throws Exception
    {
-      JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-
-      if (pendingTx != null)
+      if (pendingTransactions.contains(transactionID))
       {
          // Sanity check, this should never happen
          throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
@@ -215,7 +239,7 @@
 
    public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-      if (pendingTransactions.get(transactionID) != null)
+      if (pendingTransactions.contains(transactionID))
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -232,6 +256,7 @@
 
          newTransaction.addNegative(currentFile, info.id);
       }
+      // else.. nothing to be done
    }
 
    public void markAsDataFile(JournalFile file)
@@ -241,7 +266,7 @@
 
    public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
    {
-      if (pendingTransactions.get(transactionID) != null)
+      if (pendingTransactions.contains(transactionID))
       {
 
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -266,7 +291,7 @@
 
    public void rollbackRecord(long transactionID) throws Exception
    {
-      if (pendingTransactions.get(transactionID) != null)
+      if (pendingTransactions.contains(transactionID))
       {
          // Sanity check, this should never happen
          throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@@ -303,8 +328,7 @@
 
    public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-
-      if (pendingTransactions.get(transactionID) != null)
+      if (pendingTransactions.contains(transactionID))
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -324,7 +348,6 @@
       }
       else
       {
-
          updateRecord(info);
       }
    }
@@ -343,8 +366,7 @@
       }
       return newTransaction;
    }
-   
-   
+
    /**
     * @throws Exception
     */
@@ -366,6 +388,50 @@
       channelWrapper.writeInt(fileID);
    }
 
-   
+   static abstract class CompactCommand
+   {
+      abstract void execute() throws Exception;
+   }
 
+   class DeleteCompactCommand extends CompactCommand
+   {
+      long id;
+
+      JournalFile usedFile;
+      
+      
+      public DeleteCompactCommand(long id, JournalFile usedFile)
+      {
+         this.id = id;
+         this.usedFile = usedFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalRecord deleteRecord = journal.getRecords().remove(id);
+         deleteRecord.delete(usedFile);
+      }
+   }
+
+   class UpdateCompactCommand extends CompactCommand
+   {
+      long id;
+
+      JournalFile usedFile;
+      
+      
+      public UpdateCompactCommand(long id, JournalFile usedFile)
+      {
+         this.id = id;
+         this.usedFile = usedFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalRecord updateRecord = journal.getRecords().get(id);
+         updateRecord.addUpdateFile(usedFile);
+      }
+   }
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-29 23:47:11 UTC (rev 7498)
@@ -266,6 +266,12 @@
    {
       return records;
    }
+   
+   
+   public JournalCompactor getCompactor()
+   {
+      return this.compactor;
+   }
 
    // Journal implementation
    // ----------------------------------------------------------------
@@ -365,7 +371,7 @@
             // compacting is done
             if (posFiles == null)
             {
-               compactor.addPendingUpdate(id, usedFile);
+               compactor.addCommandUpdate(id, usedFile);
             }
             else
             {
@@ -432,7 +438,7 @@
             // compacting is done
             if (record == null)
             {
-               compactor.addPendingDelete(id, usedFile);
+               compactor.addCommandDelete(id, usedFile);
             }
             else
             {
@@ -857,21 +863,21 @@
             pendingTransactions = JournalImpl.this.pendingTransactions;
             pendingTransactions.putAll(this.pendingTransactions);
 
-            for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
-            {
-               System.out.println("TransactionID = " + entry.getKey());
-            }
+            this.records = new ConcurrentHashMap<Long, JournalRecord>();
 
-            JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
-
-            records = new ConcurrentHashMap<Long, JournalRecord>();
-
             dataFilesToProcess.addAll(dataFiles);
 
             dataFiles.clear();
 
-            this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
+            this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, dataFilesToProcess.get(0).getFileID());
 
+            for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+            {
+               System.out.println("TransactionID = " + entry.getKey());
+               entry.getValue().setCompacting();
+               compactor.addPendingTransaction(entry.getKey());
+            }
+
          }
          finally
          {
@@ -912,28 +918,22 @@
          {
             newDatafiles = compactor.newDataFiles;
 
+            // Restore newRecords created during compacting
             for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
             {
                records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
             }
 
+            // Restore compacted dataFiles
             for (int i = newDatafiles.size() - 1; i >= 0; i--)
             {
                dataFiles.addFirst(newDatafiles.get(i));
             }
 
-            for (Pair<Long, JournalFile> pendingRecord : compactor.pendingUpdates)
-            {
-               JournalRecord updateRecord = this.records.get(pendingRecord.a);
-               updateRecord.addUpdateFile(pendingRecord.b);
-            }
-
-            for (Pair<Long, JournalFile> pendingRecord : compactor.pendingDeletes)
-            {
-               JournalRecord deleteRecord = this.records.remove(pendingRecord.a);
-               deleteRecord.delete(pendingRecord.b);
-            }
-
+            // Replay pending commands
+            
+            compactor.replayPendingCommands();
+            
             // Restore relationshipMap
             // Deal with transactions commits that happend during the compacting
             // Deal with updates and deletes that happened during the compacting

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-29 23:47:11 UTC (rev 7498)
@@ -78,6 +78,39 @@
       internalgetCounter(file).incrementAndGet();
    }
 
+   public void setCompacting()
+   {
+      // Compacting is recreating all the previous files and everything,
+      // so we just clear the list of previous files, previous pos and previous adds.
+      // The transaction may be working at the top from now
+
+      if (pendingFiles != null)
+      {
+         pendingFiles.clear();
+      }
+
+      if (callbackList != null)
+      {
+         callbackList.clear();
+      }
+
+      if (pos != null)
+      {
+         pos.clear();
+      }
+
+      if (neg != null)
+      {
+         neg.clear();
+      }
+
+      counter.set(0);
+
+      lastFile = null;
+
+      currentCallback = null;
+   }
+
    /**
     * @param currentFile
     * @param bb
@@ -180,35 +213,50 @@
     * */
    public void commit(final JournalFile file)
    {
+      JournalCompactor compactor = journal.getCompactor();
       if (pos != null)
       {
-         for (Pair<JournalFile, Long> p : pos)
+         for (Pair<JournalFile, Long> trUpdate : pos)
          {
-            JournalImpl.JournalRecord posFiles = journal.getRecords().get(p.b);
+            JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
 
             if (posFiles == null)
             {
-               posFiles = new JournalImpl.JournalRecord(p.a);
+               posFiles = new JournalImpl.JournalRecord(trUpdate.a);
 
-               journal.getRecords().put(p.b, posFiles);
+               journal.getRecords().put(trUpdate.b, posFiles);
             }
+            else if (compactor != null && compactor.lookupRecord(trUpdate.b))
+            {
+               // This is a case where the transaction was opened after compacting was started,
+               // but the commit arrived while compacting was working
+               // We need to cache the counter update, so compacting will take the correct files when it is done
+               compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+            }
             else
             {
-               posFiles.addUpdateFile(p.a);
+               posFiles.addUpdateFile(trUpdate.a);
             }
          }
       }
 
       if (neg != null)
       {
-         for (Pair<JournalFile, Long> n : neg)
+         for (Pair<JournalFile, Long> trDelete : neg)
          {
-            JournalImpl.JournalRecord posFiles = journal.getRecords().remove(n.b);
+            JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
 
             if (posFiles != null)
             {
-               posFiles.delete(n.a);
+               posFiles.delete(trDelete.a);
             }
+            else if (compactor != null && compactor.lookupRecord(trDelete.b))
+            {
+               // This is a case where the transaction was opened after compacting was started,
+               // but the commit arrived while compacting was working
+               // We need to cache the counter update, so compacting will take the correct files when it is done
+               compactor.addCommandDelete(trDelete.b, trDelete.a);
+            }
          }
       }
 

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-29 23:47:11 UTC (rev 7498)
@@ -54,6 +54,10 @@
    // General tests
    // =============
 
+   public void testCrashRenamingFiles() throws Exception
+   {
+   }
+
    public void testCompactwithPendingXACommit() throws Exception
    {
    }
@@ -127,7 +131,6 @@
       if (regularAdd)
       {
 
-
          for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
          {
             add(i);
@@ -205,7 +208,7 @@
 
       if (performAppend)
       {
-         for (int i = 0; i < 100; i++)
+         for (int i = 0; i < 50; i++)
          {
             add(nextID++);
             if (i % 10 == 0)
@@ -213,21 +216,53 @@
                journal.forceMoveNextFile();
             }
          }
+
+         for (int i = 0; i < 50; i++)
+         {
+            // A totally new transaction (one created after the compact started) adds a new record while compacting is still working
+            addTx(transactionID, nextID++);
+            commit(transactionID++);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+         }
       }
 
       if (performUpdate)
       {
+         int count = 0;
          for (Long liveID : liveIDs)
          {
-            update(liveID);
+            if (count++ % 2 == 0)
+            {
+               update(liveID);
+            }
+            else
+            {
+               // A totally new transaction (one created after the compact started) updates a record that is being compacted
+               updateTx(transactionID, liveID);
+               commit(transactionID++);
+            }
          }
       }
 
       if (performDelete)
       {
+         int count = 0;
          for (long liveID : liveIDs)
          {
-            delete(liveID);
+            if (count++ % 2 == 0)
+            {
+               delete(liveID);
+            }
+            else
+            {
+               // A totally new transaction (one created after the compact started) deletes a record that is being compacted
+               deleteTx(transactionID, liveID);
+               commit(transactionID++);
+            }
+
          }
       }
 
@@ -267,7 +302,7 @@
 
       add(idGenerator.generateID());
 
-      //journal.compact();
+      // journal.compact();
 
       stopJournal();
       createJournal();




More information about the jboss-cvs-commits mailing list