Author: clebert.suconic(a)jboss.com
Date: 2011-08-09 21:53:32 -0400 (Tue, 09 Aug 2011)
New Revision: 11175
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
fixing a deadlock found on the testsuite
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 00:47:42 UTC (rev 11174)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 01:53:32 UTC (rev 11175)
@@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Interceptor;
@@ -36,6 +37,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -51,9 +53,9 @@
public class CoreProtocolManager implements ProtocolManager
{
private static final Logger log = Logger.getLogger(CoreProtocolManager.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private final HornetQServer server;
private final List<Interceptor> interceptors;
@@ -74,7 +76,7 @@
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
.getExecutor()
: null,
- server.getNodeID());
+ server.getNodeID());
Channel channel1 = rc.getChannel(1, -1);
@@ -113,29 +115,51 @@
else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ Executor executor = server.getThreadPool();
+
+ public void nodeUP(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ // Using an executor as most of the notifications on the Topology
+ // may come from a channel itself
+ // What could cause deadlocks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ }
+ });
}
-
- public void nodeDown(String nodeID)
+
+ public void nodeDown(final String nodeID)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ // Using an executor as most of the notifications on the Topology
+ // may come from a channel itself
+ // What could cause deadlocks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
+ });
}
-
+
public String toString()
{
return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
}
};
-
+
final boolean isCC = msg.isClusterConnection();
-
+
server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
+
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
@@ -165,8 +189,6 @@
}
}
});
-
-
return entry;
}
Show replies by date