[hornetq-commits] JBoss hornetq SVN: r11499 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 10 09:34:44 EDT 2011


Author: borges
Date: 2011-10-10 09:34:43 -0400 (Mon, 10 Oct 2011)
New Revision: 11499

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720: stop sync'ing if the storageManager stops; stop the ReplicationEndpoint before the storageManager

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-10 13:34:06 UTC (rev 11498)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-10 13:34:43 UTC (rev 11499)
@@ -428,6 +428,8 @@
    {
       for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
       {
+         if (!started)
+            return;
          PagingStore store = manager.getPageStore(entry.getKey());
          store.sendPages(replicator, entry.getValue());
       }
@@ -460,6 +462,8 @@
          SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
          if (!seqFile.exists())
             continue;
+         if (!started)
+            return;
          replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
       }
    }
@@ -498,6 +502,8 @@
    {
       for (JournalFile jf : journalFiles)
       {
+         if (!started)
+            return;
          replicator.syncJournalFile(jf, type);
          jf.setCanReclaim(true);
       }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-10 13:34:06 UTC (rev 11498)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-10 13:34:43 UTC (rev 11499)
@@ -823,6 +823,12 @@
             pagingManager.stop();
          }
 
+         if (replicationEndpoint != null)
+         {
+            replicationEndpoint.stop();
+            replicationEndpoint = null;
+         }
+
          if (storageManager != null)
          {
             storageManager.stop();
@@ -834,12 +840,6 @@
             replicationManager = null;
          }
 
-         if (replicationEndpoint != null)
-         {
-            replicationEndpoint.stop();
-            replicationEndpoint = null;
-         }
-
          if (securityManager != null)
          {
             securityManager.stop();
@@ -1669,7 +1669,10 @@
 
          Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
 
-         PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvider().createSubscription(queueBindingInfo.getId(), filter, true);
+         PageSubscription subscription =
+                  pagingManager.getPageStore(queueBindingInfo.getAddress())
+                               .getCursorProvider()
+                               .createSubscription(queueBindingInfo.getId(), filter, true);
 
          Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
                                                 queueBindingInfo.getAddress(),

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2011-10-10 13:34:06 UTC (rev 11498)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2011-10-10 13:34:43 UTC (rev 11499)
@@ -292,4 +292,11 @@
          }
       }
    }
+
+   @Override
+   public String toString()
+   {
+      return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped +
+               "):" + super.toString();
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-10-10 13:34:06 UTC (rev 11498)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-10-10 13:34:43 UTC (rev 11499)
@@ -480,6 +480,7 @@
       return locatorWithoutHA;
    }
 
+   // XXX unused
    protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
    {
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));



More information about the hornetq-commits mailing list