[hornetq-commits] JBoss hornetq SVN: r12145 - trunk/hornetq-core/src/main/java/org/hornetq/core/replication.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 20 03:57:27 EST 2012


Author: gaohoward
Date: 2012-02-20 03:57:26 -0500 (Mon, 20 Feb 2012)
New Revision: 12145

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
Log:
https://community.jboss.org/thread/195519



Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2012-02-17 22:47:37 UTC (rev 12144)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2012-02-20 08:57:26 UTC (rev 12145)
@@ -119,6 +119,9 @@
    private boolean started;
 
    private QuorumManager quorumManager;
+   
+   //https://community.jboss.org/thread/195519
+   private Object stopLock = new Object();
 
    // Constructors --------------------------------------------------
    public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
@@ -156,67 +159,76 @@
 
       try
       {
-         if (type == PacketImpl.REPLICATION_APPEND)
+         synchronized (stopLock)
          {
-            handleAppendAddRecord((ReplicationAddMessage)packet);
+            if (!started)
+            {
+               return;
+            }
+            
+            if (type == PacketImpl.REPLICATION_APPEND)
+            {
+               handleAppendAddRecord((ReplicationAddMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_APPEND_TX)
+            {
+               handleAppendAddTXRecord((ReplicationAddTXMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_DELETE)
+            {
+               handleAppendDelete((ReplicationDeleteMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_DELETE_TX)
+            {
+               handleAppendDeleteTX((ReplicationDeleteTXMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_PREPARE)
+            {
+               handlePrepare((ReplicationPrepareMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+            {
+               handleCommitRollback((ReplicationCommitMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
+            {
+               handlePageWrite((ReplicationPageWriteMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
+            {
+               handlePageEvent((ReplicationPageEventMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+            {
+               handleLargeMessageBegin((ReplicationLargeMessageBeingMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+            {
+               handleLargeMessageWrite((ReplicationLargeMessageWriteMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+            {
+               handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
+            {
+               handleCompareDataMessage((ReplicationCompareDataMessage) packet);
+               response = new NullResponseMessage();
+            }
+            else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
+            {
+               handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
+            }
+            else if (type == PacketImpl.REPLICATION_SYNC_FILE)
+            {
+               handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
+            }
+            else
+            {
+               log.warn("Packet " + packet
+                     + " can't be processed by the ReplicationEndpoint");
+            }
          }
-         else if (type == PacketImpl.REPLICATION_APPEND_TX)
-         {
-            handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_DELETE)
-         {
-            handleAppendDelete((ReplicationDeleteMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_DELETE_TX)
-         {
-            handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_PREPARE)
-         {
-            handlePrepare((ReplicationPrepareMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
-         {
-            handleCommitRollback((ReplicationCommitMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
-         {
-            handlePageWrite((ReplicationPageWriteMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
-         {
-            handlePageEvent((ReplicationPageEventMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
-         {
-            handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
-         {
-            handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
-         {
-            handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
-         {
-            handleCompareDataMessage((ReplicationCompareDataMessage)packet);
-            response = new NullResponseMessage();
-         }
-         else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
-         {
-            handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
-         }
-         else if (type == PacketImpl.REPLICATION_SYNC_FILE)
-         {
-            handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
-         }
-         else
-         {
-            log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
-         }
       }
       catch (HornetQException e)
       {
@@ -280,64 +292,68 @@
 
    public synchronized void stop() throws Exception
    {
-      if (!started)
+      synchronized (stopLock)
       {
-          return;
-      }
+         if (!started)
+         {
+            return;
+         }
 
-      // Channel may be null if there isn't a connection to a live server
-      if (channel != null)
-      {
-         channel.close();
-      }
+         // Channel may be null if there isn't a connection to a live server
+         if (channel != null)
+         {
+            channel.close();
+         }
 
-      for (ConcurrentMap<Integer, Page> map : pageIndex.values())
-      {
-         for (Page page : map.values())
+         for (ConcurrentMap<Integer, Page> map : pageIndex.values())
          {
-            try
+            for (Page page : map.values())
             {
-               page.close();
+               try
+               {
+                  page.close();
+               }
+               catch (Exception e)
+               {
+                  log.warn("Error while closing the page on backup", e);
+               }
             }
-            catch (Exception e)
-            {
-               log.warn("Error while closing the page on backup", e);
-            }
          }
-      }
 
-      pageIndex.clear();
+         pageIndex.clear();
 
-      for (ReplicatedLargeMessage largeMessage : largeMessages.values())
-      {
-         largeMessage.releaseResources();
-      }
-      largeMessages.clear();
+         for (ReplicatedLargeMessage largeMessage : largeMessages.values())
+         {
+            largeMessage.releaseResources();
+         }
+         largeMessages.clear();
 
-      for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync.entrySet())
-      {
-         for (JournalSyncFile filesReserved : entry.getValue().values())
+         for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync
+               .entrySet())
          {
-            filesReserved.close();
+            for (JournalSyncFile filesReserved : entry.getValue().values())
+            {
+               filesReserved.close();
+            }
          }
-      }
 
-      filesReservedForSync.clear();
-      if (journals != null)
-      {
-         for (Journal j : journals)
+         filesReservedForSync.clear();
+         if (journals != null)
          {
-            if (j instanceof FileWrapperJournal)
-               j.stop();
+            for (Journal j : journals)
+            {
+               if (j instanceof FileWrapperJournal)
+                  j.stop();
+            }
          }
-      }
 
-      pageManager.stop();
+         pageManager.stop();
 
-      // Storage needs to be the last to stop
-      storage.stop();
+         // Storage needs to be the last to stop
+         storage.stop();
 
-      started = false;
+         started = false;
+      }
    }
 
 



More information about the hornetq-commits mailing list