[jboss-cvs] JBoss Messaging SVN: r7486 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Jun 28 00:26:56 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-28 00:26:55 -0400 (Sun, 28 Jun 2009)
New Revision: 7486

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Log:
concurrent deletes (no transactions)

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-28 03:52:47 UTC (rev 7485)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-28 04:26:55 UTC (rev 7486)
@@ -406,7 +406,10 @@
 
          if (record == null)
          {
-            throw new IllegalStateException("Cannot find add info " + id);
+            if (!(compactor != null && compactor.lookupRecord(id)))
+            {
+               throw new IllegalStateException("Cannot find add info " + id);
+            }
          }
 
          int size = SIZE_DELETE_RECORD;
@@ -425,7 +428,15 @@
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
 
-            record.delete(usedFile);
+            if (record == null)
+            {
+               compactor.addPendingDelete(id, usedFile);
+            }
+            else
+            {
+               record.delete(usedFile);
+            }
+
          }
          finally
          {
@@ -908,6 +919,12 @@
                JournalRecord updateRecord = this.records.get(pendingRecord.a);
                updateRecord.addUpdateFile(pendingRecord.b);
             }
+            
+            for (Pair<Long, JournalFile> pendingRecord: compactor.pendingDeletes)
+            {
+               JournalRecord deleteRecord = this.records.remove(pendingRecord.a);
+               deleteRecord.delete(pendingRecord.b);
+            }
 
             // Restore relationshipMap
             // Deal with transactions commits that happend during the compacting
@@ -1031,6 +1048,8 @@
 
       final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
 
+      final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
+
       public Compactor(Map<Long, JournalRecord> recordsSnapshot,
                        Map<Long, JournalTransaction> pendingTransactions,
                        int firstFileID)
@@ -1044,6 +1063,15 @@
        * @param id
        * @param usedFile
        */
+      public void addPendingDelete(long id, JournalFile usedFile)
+      {
+         pendingDeletes.add(new Pair<Long,JournalFile>(id, usedFile));        
+      }
+
+      /**
+       * @param id
+       * @param usedFile
+       */
       public void addPendingUpdate(long id, JournalFile usedFile)
       {
          pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-28 03:52:47 UTC (rev 7485)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-28 04:26:55 UTC (rev 7486)
@@ -66,21 +66,28 @@
    {
    }
 
+   public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
+   {
+      InternalCompactTest(false, true, true);
+   }
+
+
    public void testCompactwithConcurrentDeletes() throws Exception
    {
+      InternalCompactTest(false, false, true);
    }
 
    public void testCompactwithConcurrentUpdates() throws Exception
    {
-      InternalCompactTest(false, true);
+      InternalCompactTest(false, true, false);
    }
 
    public void testCompactWithConcurrentAppend() throws Exception
    {
-      InternalCompactTest(true, false);
+      InternalCompactTest(true, false, false);
    }
 
-   private void InternalCompactTest(boolean performAppend, boolean performUpdate) throws Exception
+   private void InternalCompactTest(final boolean performAppend, final boolean performUpdate, final boolean performDelete) throws Exception
    {
       setup(50, 60 * 1024, true);
 
@@ -90,6 +97,7 @@
       final CountDownLatch latchWait = new CountDownLatch(1);
       journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
       {
+         @Override
          public void onCompactDone()
          {
             latchDone.countDown();
@@ -151,6 +159,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -189,6 +198,14 @@
             update(liveID);
          }
       }
+      
+      if (performDelete)
+      {
+         for (long liveID : liveIDs)
+         {
+            delete(liveID);
+         }
+      }
 
       /** Some independent adds and updates */
       for (int i = 0; i < 1000; i++)
@@ -209,7 +226,6 @@
 
       t.join();
 
-      delete(0);
       add(idGenerator.generateID());
 
       journal.compact();
@@ -252,7 +268,7 @@
             long id = idGenerator.generateID();
             ids.add(id);
             add(id);
-            if (i > 0 && (i % 100 == 0))
+            if (i > 0 && i % 100 == 0)
             {
                journal.forceMoveNextFile();
             }




More information about the jboss-cvs-commits mailing list