[hornetq-commits] JBoss hornetq SVN: r11175 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 9 21:53:33 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-09 21:53:32 -0400 (Tue, 09 Aug 2011)
New Revision: 11175

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
fixing a deadlock found on the testsuite

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-10 00:47:42 UTC (rev 11174)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-10 01:53:32 UTC (rev 11175)
@@ -16,6 +16,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Interceptor;
@@ -36,6 +37,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -51,9 +53,9 @@
 public class CoreProtocolManager implements ProtocolManager
 {
    private static final Logger log = Logger.getLogger(CoreProtocolManager.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
-   
+
    private final HornetQServer server;
 
    private final List<Interceptor> interceptors;
@@ -74,7 +76,7 @@
                                                                    config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
                                                                                                                       .getExecutor()
                                                                                                              : null,
-                                                                                                             server.getNodeID());
+                                                                   server.getNodeID());
 
       Channel channel1 = rc.getChannel(1, -1);
 
@@ -113,29 +115,51 @@
             else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
             {
                SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-               
+
                final ClusterTopologyListener listener = new ClusterTopologyListener()
                {
-                  public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+                  Executor executor = server.getThreadPool();
+
+                  public void nodeUP(final String nodeID,
+                                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                                     final boolean last)
                   {
-                      channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                     // Using an executor because most of the notifications on the
+                     // Topology may come from a channel itself,
+                     // which could cause deadlocks
+                     executor.execute(new Runnable()
+                     {
+                        public void run()
+                        {
+                           channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                        }
+                     });
                   }
-                  
-                  public void nodeDown(String nodeID)
+
+                  public void nodeDown(final String nodeID)
                   {
-                      channel0.send(new ClusterTopologyChangeMessage(nodeID));
+                     // Using an executor because most of the notifications on the
+                     // Topology may come from a channel itself,
+                     // which could cause deadlocks
+                     executor.execute(new Runnable()
+                     {
+                        public void run()
+                        {
+                           channel0.send(new ClusterTopologyChangeMessage(nodeID));
+                        }
+                     });
                   }
-                  
+
                   public String toString()
                   {
                      return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
                   }
                };
-               
+
                final boolean isCC = msg.isClusterConnection();
-               
+
                server.getClusterManager().addClusterTopologyListener(listener, isCC);
-               
+
                rc.addCloseListener(new CloseListener()
                {
                   public void connectionClosed()
@@ -165,8 +189,6 @@
             }
          }
       });
-      
-      
 
       return entry;
    }



More information about the hornetq-commits mailing list