[jboss-cvs] JBoss Messaging SVN: r7485 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jun 27 23:52:47 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-27 23:52:47 -0400 (Sat, 27 Jun 2009)
New Revision: 7485

Added:
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Removed:
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
concurrent updates

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-28 03:00:49 UTC (rev 7484)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-28 03:52:47 UTC (rev 7485)
@@ -30,6 +30,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -343,7 +344,10 @@
 
          if (posFiles == null)
          {
-            throw new IllegalStateException("Cannot find add info " + id);
+            if (!(compactor != null && compactor.lookupRecord(id)))
+            {
+               throw new IllegalStateException("Cannot find add info " + id);
+            }
          }
 
          int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
@@ -359,7 +363,14 @@
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
 
-            posFiles.addUpdateFile(usedFile);
+            if (posFiles == null)
+            {
+               compactor.addPendingUpdate(id, usedFile);
+            }
+            else
+            {
+               posFiles.addUpdateFile(usedFile);
+            }
          }
          finally
          {
@@ -798,7 +809,6 @@
       return maxID;
    }
 
-   
    // Note: This method can't be called from the executor, as it will invoke other methods depending on it.
    public void compact() throws Exception
    {
@@ -838,7 +848,7 @@
             records = new ConcurrentHashMap<Long, JournalRecord>();
 
             dataFilesToProcess.addAll(dataFiles);
-            
+
             dataFiles.clear();
 
             this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
@@ -848,7 +858,7 @@
          {
             compactingLock.writeLock().unlock();
          }
-         
+
          // Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
          JournalFile previousFile = null;
          for (final JournalFile file : dataFilesToProcess)
@@ -875,29 +885,34 @@
          onCompactDone();
 
          SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
-         
-         
+
          List<JournalFile> newDatafiles = null;
 
          compactingLock.writeLock().lock();
          try
          {
-            for ( Map.Entry<Long, JournalRecord> newRecordEntry: compactor.newRecords.entrySet())
+            newDatafiles = compactor.newDataFiles;
+
+            for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
             {
                records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
             }
-            
-            for (JournalFile data: compactor.newDataFiles)
+
+            for (int i = newDatafiles.size() - 1; i >= 0; i--)
             {
-               dataFiles.addFirst(data);
+               dataFiles.addFirst(newDatafiles.get(i));
             }
-            
+
+            for (Pair<Long, JournalFile> pendingRecord : compactor.pendingUpdates)
+            {
+               JournalRecord updateRecord = this.records.get(pendingRecord.a);
+               updateRecord.addUpdateFile(pendingRecord.b);
+            }
+
             // Restore relationshipMap
             // Deal with transactions commits that happend during the compacting
             // Deal with updates and deletes that happened during the compacting
 
-            newDatafiles = compactor.newDataFiles;
-            
             this.compactor = null;
 
          }
@@ -1014,6 +1029,8 @@
 
       final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
 
+      final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+
       public Compactor(Map<Long, JournalRecord> recordsSnapshot,
                        Map<Long, JournalTransaction> pendingTransactions,
                        int firstFileID)
@@ -1023,6 +1040,20 @@
          this.pendingTransactions = pendingTransactions;
       }
 
+      /**
+       * @param id the id of the record being updated
+       * @param usedFile the journal file the update record was appended to
+       */
+      public void addPendingUpdate(long id, JournalFile usedFile)
+      {
+         pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+      }
+
+      public boolean lookupRecord(long id)
+      {
+         return recordsSnapshot.get(id) != null;
+      }
+
       private void checkSize(int size) throws Exception
       {
          if (channelWrapper == null)

Copied: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java (from rev 7484, branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java)
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	                        (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-28 03:52:47 UTC (rev 7485)
@@ -0,0 +1,358 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ * 
+ * Integration tests for journal compacting, including appends and updates issued while a compact is in progress.
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class JournalCompactTest extends JournalImplTestBase
+{
+   private static final Logger log = Logger.getLogger(JournalCompactTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   private static final int NUMBER_OF_RECORDS = 1000;
+
+   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+   // General tests
+   // =============
+
+   public void testCompactwithPendingXACommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingCommit() throws Exception
+   {
+   }
+
+   public void testCompactwithConcurrentDeletes() throws Exception
+   {
+   }
+
+   public void testCompactwithConcurrentUpdates() throws Exception
+   {
+      InternalCompactTest(false, true);
+   }
+
+   public void testCompactWithConcurrentAppend() throws Exception
+   {
+      InternalCompactTest(true, false);
+   }
+
+   private void InternalCompactTest(boolean performAppend, boolean performUpdate) throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      ArrayList<Long> liveIDs = new ArrayList<Long>();
+
+      final CountDownLatch latchDone = new CountDownLatch(1);
+      final CountDownLatch latchWait = new CountDownLatch(1);
+      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+         public void onCompactDone()
+         {
+            latchDone.countDown();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               latchWait.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+      startJournal();
+      load();
+
+      long transactionID = 0;
+
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+         else
+         {
+            liveIDs.add((long)i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      t.start();
+
+      latchDone.await();
+
+      int nextID = NUMBER_OF_RECORDS;
+
+      if (performAppend)
+      {
+         for (int i = 0; i < 100; i++)
+         {
+            add(nextID++);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+      }
+
+      if (performUpdate)
+      {
+         for (Long liveID : liveIDs)
+         {
+            update(liveID);
+         }
+      }
+
+      /** Some independent adds and deletes */
+      for (int i = 0; i < 1000; i++)
+      {
+         long id = idGenerator.generateID();
+         add(id);
+         delete(id);
+
+         if (i % 100 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      latchWait.countDown();
+
+      t.join();
+
+      delete(0);
+      add(idGenerator.generateID());
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+   {
+   }
+
+   public void testCompactWithPendingTransactionAndDelete() throws Exception
+   {
+   }
+
+   public void testCompactingWithPendingTransaction() throws Exception
+   {
+
+   }
+
+   public void testSimpleCompacting() throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      int NUMBER_OF_RECORDS = 1000;
+
+      // add and remove some data to force reclaiming
+      {
+         ArrayList<Long> ids = new ArrayList<Long>();
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+         {
+            long id = idGenerator.generateID();
+            ids.add(id);
+            add(id);
+            if (i > 0 && (i % 100 == 0))
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         for (Long id : ids)
+         {
+            delete(id);
+         }
+
+         journal.forceMoveNextFile();
+
+         journal.checkAndReclaimFiles();
+      }
+
+      long transactionID = 0;
+
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      System.out.println("Before compact ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
+
+      journal.compact();
+
+      add(idGenerator.generateID());
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   protected int getAlignment()
+   {
+      return 1;
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      File file = new File(journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+   }
+
+   protected SequentialFileFactory createFactory()
+   {
+      return new NIOSequentialFileFactory(journalDir);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return createFactory();
+   }
+
+}


Property changes on: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Deleted: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java	2009-06-28 03:00:49 UTC (rev 7484)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java	2009-06-28 03:52:47 UTC (rev 7485)
@@ -1,326 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.journal.impl;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
-
-/**
- * 
- * A JournalImplTestBase
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class JournalCompactTest extends JournalImplTestBase
-{
-   private static final Logger log = Logger.getLogger(JournalCompactTest.class);
-
-   protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
-   private static final int NUMBER_OF_RECORDS = 1000;
-
-   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
-
-   // General tests
-   // =============
-
-   public void testCompactwithPendingXACommit() throws Exception
-   {
-   }
-
-   public void testCompactwithPendingXAPrepareAndCommit() throws Exception
-   {
-   }
-
-   public void testCompactwithPendingCommit() throws Exception
-   {
-   }
-
-   public void testCompactwithConcurrentDeletes() throws Exception
-   {
-   }
-
-   public void testCompactWithConcurrentAppend() throws Exception
-   {
-      setup(50, 60 * 1024, true);
-
-      final CountDownLatch latchDone = new CountDownLatch(1);
-      final CountDownLatch latchWait = new CountDownLatch(1);
-      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-         public void onCompactDone()
-         {
-            latchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               latchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-      startJournal();
-      load();
-
-      long transactionID = 0;
-
-      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
-      {
-         add(i);
-         if (i % 10 == 0 && i > 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         update(i);
-      }
-
-      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-      {
-
-         addTx(transactionID, i);
-         updateTx(transactionID, i);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         commit(transactionID++);
-         update(i);
-      }
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-         if (!(i % 10 == 0))
-         {
-            delete(i);
-         }
-      }
-
-      journal.forceMoveNextFile();
-
-      Thread t = new Thread()
-      {
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
-      t.start();
-
-      latchDone.await();
-
-      int nextID = NUMBER_OF_RECORDS;
-
-      for (int i = 0; i < 100; i++)
-      {
-         add(nextID++);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-      }
-
-      latchWait.countDown();
-
-      t.join();
-      
-      
-      for (int i = 0 ; i < 1000; i++)
-      {
-         long id = idGenerator.generateID();
-         add(id);
-         delete(id);
-         
-         if (i % 100 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-      }
-
-      journal.forceMoveNextFile();
-      
-      delete(0);
-      add(idGenerator.generateID());
-      
-      journal.compact();
-
-      stopJournal();
-      createJournal();
-      startJournal();
-      loadAndCheck();
-
-   }
-
-   public void testCompactwithConcurrentAppendAndUpdate() throws Exception
-   {
-   }
-
-   public void testCompactWithPendingTransactionAndDelete() throws Exception
-   {
-   }
-
-   public void testCompactingWithPendingTransaction() throws Exception
-   {
-
-   }
-
-   public void testSimpleCompacting() throws Exception
-   {
-      setup(50, 60 * 1024, true);
-
-      createJournal();
-      startJournal();
-      load();
-
-      int NUMBER_OF_RECORDS = 1000;
-
-      // add and remove some data to force reclaiming
-      {
-         ArrayList<Long> ids = new ArrayList<Long>();
-         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-         {
-            long id = idGenerator.generateID();
-            ids.add(id);
-            add(id);
-            if (i > 0 && (i % 100 == 0))
-            {
-               journal.forceMoveNextFile();
-            }
-         }
-
-         for (Long id : ids)
-         {
-            delete(id);
-         }
-
-         journal.forceMoveNextFile();
-
-         journal.checkAndReclaimFiles();
-      }
-
-      long transactionID = 0;
-
-      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
-      {
-         add(i);
-         if (i % 10 == 0 && i > 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         update(i);
-      }
-
-      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-      {
-
-         addTx(transactionID, i);
-         updateTx(transactionID, i);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         commit(transactionID++);
-         update(i);
-      }
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-         if (!(i % 10 == 0))
-         {
-            delete(i);
-         }
-      }
-
-      journal.forceMoveNextFile();
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      System.out.println("Before compact ****************************");
-      System.out.println(journal.debug());
-      System.out.println("*****************************************");
-
-      journal.compact();
-
-      stopJournal();
-      createJournal();
-      startJournal();
-      loadAndCheck();
-
-   }
-
-   protected int getAlignment()
-   {
-      return 1;
-   }
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      File file = new File(journalDir);
-
-      deleteDirectory(file);
-
-      file.mkdir();
-   }
-
-   protected SequentialFileFactory createFactory()
-   {
-      return new NIOSequentialFileFactory(journalDir);
-   }
-
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
-    */
-   @Override
-   protected SequentialFileFactory getFileFactory() throws Exception
-   {
-      return createFactory();
-   }
-
-}




More information about the jboss-cvs-commits mailing list