Author: gaohoward
Date: 2012-02-20 03:57:26 -0500 (Mon, 20 Feb 2012)
New Revision: 12145
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
Log:
https://community.jboss.org/thread/195519
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-17 22:47:37 UTC (rev 12144)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-20 08:57:26 UTC (rev 12145)
@@ -119,6 +119,9 @@
private boolean started;
private QuorumManager quorumManager;
+
+ //https://community.jboss.org/thread/195519
+ private Object stopLock = new Object();
// Constructors --------------------------------------------------
public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
@@ -156,67 +159,76 @@
try
{
- if (type == PacketImpl.REPLICATION_APPEND)
+ synchronized (stopLock)
{
- handleAppendAddRecord((ReplicationAddMessage)packet);
+ if (!started)
+ {
+ return;
+ }
+
+ if (type == PacketImpl.REPLICATION_APPEND)
+ {
+ handleAppendAddRecord((ReplicationAddMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord((ReplicationAddTXMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete((ReplicationDeleteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX((ReplicationDeleteTXMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare((ReplicationPrepareMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback((ReplicationCommitMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
+ {
+ handlePageWrite((ReplicationPageWriteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
+ {
+ handlePageEvent((ReplicationPageEventMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
+ {
+ handleCompareDataMessage((ReplicationCompareDataMessage) packet);
+ response = new NullResponseMessage();
+ }
+ else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
+ {
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
+ {
+ handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet
+ + " can't be processed by the ReplicationEndpoint");
+ }
}
- else if (type == PacketImpl.REPLICATION_APPEND_TX)
- {
- handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE)
- {
- handleAppendDelete((ReplicationDeleteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE_TX)
- {
- handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PREPARE)
- {
- handlePrepare((ReplicationPrepareMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
- {
- handleCommitRollback((ReplicationCommitMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
- {
- handlePageWrite((ReplicationPageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
- {
- handlePageEvent((ReplicationPageEventMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
- {
- handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
- {
- handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
- {
- handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
- {
- handleCompareDataMessage((ReplicationCompareDataMessage)packet);
- response = new NullResponseMessage();
- }
- else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
- {
- handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_SYNC_FILE)
- {
- handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
- }
- else
- {
- log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
- }
}
catch (HornetQException e)
{
@@ -280,64 +292,68 @@
public synchronized void stop() throws Exception
{
- if (!started)
+ synchronized (stopLock)
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- // Channel may be null if there isn't a connection to a live server
- if (channel != null)
- {
- channel.close();
- }
+ // Channel may be null if there isn't a connection to a live server
+ if (channel != null)
+ {
+ channel.close();
+ }
- for (ConcurrentMap<Integer, Page> map : pageIndex.values())
- {
- for (Page page : map.values())
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
{
- try
+ for (Page page : map.values())
{
- page.close();
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
}
- catch (Exception e)
- {
- log.warn("Error while closing the page on backup", e);
- }
}
- }
- pageIndex.clear();
+ pageIndex.clear();
- for (ReplicatedLargeMessage largeMessage : largeMessages.values())
- {
- largeMessage.releaseResources();
- }
- largeMessages.clear();
+ for (ReplicatedLargeMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessages.clear();
- for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync.entrySet())
- {
- for (JournalSyncFile filesReserved : entry.getValue().values())
+ for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync
+ .entrySet())
{
- filesReserved.close();
+ for (JournalSyncFile filesReserved : entry.getValue().values())
+ {
+ filesReserved.close();
+ }
}
- }
- filesReservedForSync.clear();
- if (journals != null)
- {
- for (Journal j : journals)
+ filesReservedForSync.clear();
+ if (journals != null)
{
- if (j instanceof FileWrapperJournal)
- j.stop();
+ for (Journal j : journals)
+ {
+ if (j instanceof FileWrapperJournal)
+ j.stop();
+ }
}
- }
- pageManager.stop();
+ pageManager.stop();
- // Storage needs to be the last to stop
- storage.stop();
+ // Storage needs to be the last to stop
+ storage.stop();
- started = false;
+ started = false;
+ }
}