[hornetq-commits] JBoss hornetq SVN: r11476 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 5 12:49:20 EDT 2011


Author: borges
Date: 2011-10-05 12:49:18 -0400 (Wed, 05 Oct 2011)
New Revision: 11476

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
   branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
HORNETQ-720 Add a compactLock to JournalImpl

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -365,39 +365,34 @@
       final Journal localMessageJournal = messageJournal;
       final Journal localBindingsJournal = bindingsJournal;
 
-      final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
-      final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
       Map<String, Long> largeMessageFilesToSync;
       Map<SimpleString, Collection<Integer>> pageFilesToSync;
+      storageManagerLock.writeLock().lock();
       try
       {
-         storageManagerLock.writeLock().lock();
+         replicator = replicationManager;
+         localMessageJournal.synchronizationLock();
+         localBindingsJournal.synchronizationLock();
          try
          {
-            replicator = replicationManager;
-
-            localMessageJournal.writeLock();
-            localBindingsJournal.writeLock();
+            pagingManager.lockAll();
             try
             {
-               pagingManager.lockAll();
-               try
-               {
-                  messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-                  bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-                  pageFilesToSync = getPageInformationForSync(pagingManager);
-                  largeMessageFilesToSync = getLargeMessageInformation();
-               }
-               finally
-               {
-                  pagingManager.unlockAll();
-               }
+               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+               pageFilesToSync = getPageInformationForSync(pagingManager);
+               largeMessageFilesToSync = getLargeMessageInformation();
             }
             finally
             {
-               localMessageJournal.writeUnlock();
-               localBindingsJournal.writeUnlock();
+               pagingManager.unlockAll();
             }
+         }
+            finally
+            {
+               localMessageJournal.synchronizationUnlock();
+               localBindingsJournal.synchronizationUnlock();
+            }
             bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
             messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
          }
@@ -421,14 +416,9 @@
          {
             storageManagerLock.writeLock().unlock();
          }
-      }
-      finally
-      {
-         localMessageJournal.setAutoReclaim(messageJournalAutoReclaim);
-         localBindingsJournal.setAutoReclaim(bindingsJournalAutoReclaim);
-      }
    }
 
+
    /**
     * @param pageFilesToSync
     * @throws Exception

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -587,30 +587,18 @@
    }
 
    @Override
-   public boolean getAutoReclaim()
+   public void synchronizationLock()
    {
       throw new UnsupportedOperationException();
    }
 
    @Override
-   public void writeLock()
+   public void synchronizationUnlock()
    {
       throw new UnsupportedOperationException();
    }
 
    @Override
-   public void writeUnlock()
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void setAutoReclaim(boolean autoReclaim)
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
    public void forceMoveNextFile()
    {
       throw new UnsupportedOperationException();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -386,7 +386,7 @@
       for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
          JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
-         journal.writeLock();
+         journal.synchronizationLock();
          try
          {
             if (journal.getDataFiles().length != 0)
@@ -401,7 +401,7 @@
          }
          finally
          {
-            journal.writeUnlock();
+            journal.synchronizationUnlock();
          }
       }
       synchronized (largeMessagesOnSync)

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -156,28 +156,18 @@
    Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception;
 
    /**
-    * @return whether automatic reclaiming of Journal files is enabled
+    * Write lock the Journal and write lock the compacting process. Necessary only during
+    * replication for backup synchronization.
     */
-   boolean getAutoReclaim();
+   void synchronizationLock();
 
    /**
-    * Write lock the Journal. Necessary only during replication for backup synchronization.
+    * Unlock the Journal and the compacting process.
+    * @see Journal#synchronizationLock()
     */
-   void writeLock();
+   void synchronizationUnlock();
 
    /**
-    * Write-unlock the Journal.
-    * @see Journal#writeLock()
-    */
-   void writeUnlock();
-
-   /**
-    * Sets whether the journal should auto-reclaim its internal files.
-    * @param autoReclaim
-    */
-   void setAutoReclaim(boolean autoReclaim);
-
-   /**
     * Force the usage of a new {@link JournalFile}.
     * @throws Exception
     */

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -52,4 +52,15 @@
    /** This method is called automatically when a new file is opened.
     * @return true if it needs to re-check due to cleanup or other factors  */
    boolean checkReclaimStatus() throws Exception;
+
+   /**
+    * @return whether automatic reclaiming of Journal files is enabled
+    */
+   boolean getAutoReclaim();
+
+   /**
+    * Sets whether the journal should auto-reclaim its internal files.
+    * @param autoReclaim
+    */
+   void setAutoReclaim(boolean autoReclaim);
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -277,30 +277,18 @@
    }
 
    @Override
-   public boolean getAutoReclaim()
+   public void synchronizationLock()
    {
       throw new UnsupportedOperationException();
    }
 
    @Override
-   public void writeLock()
+   public void synchronizationUnlock()
    {
       throw new UnsupportedOperationException();
    }
 
    @Override
-   public void writeUnlock()
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void setAutoReclaim(boolean autoReclaim)
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
    public void forceMoveNextFile()
    {
       throw new UnsupportedOperationException();

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -206,6 +206,7 @@
     * However we need to lock it while taking and updating snapshots
     */
    private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+   private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
 
    private volatile JournalState state = JournalState.STOPPED;
 
@@ -1427,6 +1428,9 @@
          throw new IllegalStateException("There is pending compacting operation");
       }
 
+      compactorLock.writeLock().lock();
+      try
+      {
       ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
 
       boolean previousReclaimValue = autoReclaim;
@@ -1455,7 +1459,7 @@
                return;
             }
 
-            onCompactLock();
+            onCompactLockingTheJournal();
 
             setAutoReclaim(false);
 
@@ -1534,7 +1538,7 @@
             // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
             compactor = null;
 
-            onCompactLock();
+            onCompactLockingTheJournal();
 
             newDatafiles = localCompactor.getNewDataFiles();
 
@@ -1626,8 +1630,12 @@
             compactor = null;
          }
          setAutoReclaim(previousReclaimValue);
+         }
       }
-
+      finally
+      {
+         compactorLock.writeLock().unlock();
+      }
    }
 
    /**
@@ -2145,11 +2153,18 @@
    // TestableJournal implementation
    // --------------------------------------------------------------
 
+   @Override
    public synchronized void setAutoReclaim(final boolean autoReclaim)
    {
       this.autoReclaim = autoReclaim;
    }
 
+   @Override
+   public boolean getAutoReclaim()
+   {
+      return autoReclaim;
+   }
+
    public String debug() throws Exception
    {
       reclaimer.scan(getDataFiles());
@@ -2493,7 +2508,7 @@
 
    /** This is an interception point for testcases, when the compacted files are written, to be called
     *  as soon as the compactor gets a writeLock */
-   protected void onCompactLock() throws Exception
+   protected void onCompactLockingTheJournal() throws Exception
    {
    }
 
@@ -2973,14 +2988,22 @@
       }
    }
 
-   public void writeLock()
+   public void synchronizationLock()
    {
+      compactorLock.writeLock().lock();
       journalLock.writeLock().lock();
    }
 
-   public void writeUnlock()
+   public void synchronizationUnlock()
    {
-      journalLock.writeLock().unlock();
+      try
+      {
+         compactorLock.writeLock().unlock();
+      }
+      finally
+      {
+         journalLock.writeLock().unlock();
+      }
    }
 
    /**
@@ -2991,7 +3014,7 @@
    @Override
    public synchronized Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
    {
-      writeLock();
+      synchronizationLock();
       try
       {
          Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
@@ -3007,15 +3030,10 @@
       }
       finally
       {
-         writeUnlock();
+         synchronizationUnlock();
       }
    }
 
-   public boolean getAutoReclaim()
-   {
-      return autoReclaim;
-   }
-
    @Override
    public SequentialFileFactory getFileFactory()
    {

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -861,30 +861,18 @@
       }
 
       @Override
-      public boolean getAutoReclaim()
+      public void synchronizationLock()
       {
-         return false;
-      }
 
-      @Override
-      public void writeLock()
-      {
-
       }
 
       @Override
-      public void writeUnlock()
+      public void synchronizationUnlock()
       {
 
       }
 
       @Override
-      public void setAutoReclaim(boolean autoReclaim)
-      {
-
-      }
-
-      @Override
       public void forceMoveNextFile() throws Exception
       {
 

Modified: branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -117,7 +117,7 @@
                                 "hq",
                                 maxAIO)
       {
-         protected void onCompactLock() throws Exception
+         protected void onCompactLockingTheJournal() throws Exception
          {
          }
 

Modified: branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2011-10-05 16:46:27 UTC (rev 11475)
+++ branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2011-10-05 16:49:18 UTC (rev 11476)
@@ -132,7 +132,7 @@
                                 "hq",
                                 maxAIO)
       {
-         protected void onCompactLock() throws Exception
+         protected void onCompactLockingTheJournal() throws Exception
          {
          }
 



More information about the hornetq-commits mailing list