[hornetq-commits] JBoss hornetq SVN: r11102 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/protocol/core/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 3 15:53:18 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-03 15:53:17 -0400 (Wed, 03 Aug 2011)
New Revision: 11102

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java
Log:
tweaks on code (fixing tests)

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -21,8 +21,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -117,7 +117,7 @@
 
    private final ExecutorFactory orderedExecutorFactory;
 
-   private final ExecutorService threadPool;
+   private final Executor threadPool;
 
    private final ScheduledExecutorService scheduledThreadPool;
 
@@ -168,7 +168,7 @@
                                    final double retryIntervalMultiplier,
                                    final long maxRetryInterval,
                                    final int reconnectAttempts,
-                                   final ExecutorService threadPool,
+                                   final Executor threadPool,
                                    final ScheduledExecutorService scheduledThreadPool,
                                    final List<Interceptor> interceptors)
    {
@@ -1402,7 +1402,7 @@
             
             if (log.isTraceEnabled())
             {
-               log.trace("XXX Disconnect being called on client:" + msg, new Exception ("trace"));
+               log.trace("ZZZ Disconnect being called on client:" + msg + " server locator = " + serverLocator, new Exception ("trace"));
             }
 
             closeExecutor.execute(new Runnable()
@@ -1414,7 +1414,7 @@
                   SimpleString nodeID = msg.getNodeID();
                   if (log.isTraceEnabled())
                   {
-                     log.trace("XXX YYY notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
+                     log.trace("ZZZ notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
                   }
                   if (nodeID != null)
                   {
@@ -1443,11 +1443,11 @@
             {
                if (isDebug)
                {
-                  log.debug("Node " + topMessage.getNodeID() +
+                  log.debug("ZZZ Node " + topMessage.getNodeID() +
                             " going up, connector = " +
                             topMessage.getPair() +
                             ", isLast=" +
-                            topMessage.isLast());
+                            topMessage.isLast() + " csf created at\nserverLocator=" + serverLocator, ClientSessionFactoryImpl.this.e);
                }
                serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
             }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -84,6 +84,9 @@
    private boolean receivedTopology;
 
    private boolean compressLargeMessage;
+   
+   // if the system should shutdown the pool when shutting down
+   private transient boolean shutdownPool;
 
    private ExecutorService threadPool;
 
@@ -249,6 +252,11 @@
 
    private void setThreadPools()
    {
+	  if (threadPool != null)
+	  {
+		  return;
+	  }
+	  else
       if (useGlobalPools)
       {
          threadPool = getGlobalThreadPool();
@@ -257,6 +265,8 @@
       }
       else
       {
+         this.shutdownPool = true;
+         
          ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
                                                           true,
                                                           getThisClassLoader());
@@ -359,12 +369,19 @@
 
    private ServerLocatorImpl(final Topology topology,
                              final boolean useHA,
+                             final ExecutorService threadPool, 
+                             final ScheduledExecutorService scheduledExecutor, 
                              final DiscoveryGroupConfiguration discoveryGroupConfiguration,
                              final TransportConfiguration[] transportConfigs)
    {
       e.fillInStackTrace();
       
+      this.scheduledThreadPool = scheduledExecutor;
+      
+      this.threadPool = threadPool;
+      
       this.topology = topology;
+      
       this.ha = useHA;
 
       this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -442,7 +459,7 @@
     */
    public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
    {
-      this(new Topology(null), useHA, groupConfiguration, null);
+      this(new Topology(null), useHA, null, null, groupConfiguration, null);
       topology.setOwner(this);
    }
 
@@ -453,7 +470,7 @@
     */
    public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
    {
-      this(new Topology(null), useHA, null, transportConfigs);
+      this(new Topology(null), useHA, null, null, null, transportConfigs);
       topology.setOwner(this);
    }
 
@@ -463,9 +480,10 @@
     * @param discoveryAddress
     * @param discoveryPort
     */
-   public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+   public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final DiscoveryGroupConfiguration groupConfiguration)
    {
-      this(topology, useHA, groupConfiguration, null);
+      this(topology, useHA, threadPool, scheduledExecutor, groupConfiguration, null);
+      
    }
 
    /**
@@ -473,9 +491,9 @@
     *
     * @param transportConfigs
     */
-   public ServerLocatorImpl(final Topology topology, final boolean useHA, final TransportConfiguration... transportConfigs)
+   public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final TransportConfiguration... transportConfigs)
    {
-      this(topology, useHA, null, transportConfigs);
+      this(topology, useHA, threadPool, scheduledExecutor, null, transportConfigs);
    }
 
    private TransportConfiguration selectConnector()
@@ -1163,7 +1181,7 @@
 
       factories.clear();
 
-      if (!useGlobalPools)
+      if (shutdownPool)
       {
          if (threadPool != null)
          {
@@ -1264,7 +1282,7 @@
 
       TopologyMember actMember = topology.getMember(nodeID);
 
-      if (actMember.getConnector().a != null && actMember.getConnector().b != null)
+      if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
       {
          for (ClientSessionFactory factory : factories)
          {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -55,7 +55,7 @@
    public Topology(final Object owner)
    {
       this.owner = owner;
-      Topology.log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+      Topology.log.debug("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
                          new Exception("trace")); // Delete this line
    }
 
@@ -115,7 +115,7 @@
             replaced = true;
             if (Topology.log.isDebugEnabled())
             {
-               Topology.log.debug("ZZZ " + this +
+               Topology.log.debug("Add " + this +
                                   " MEMBER WAS NULL, Add member nodeId=" +
                                   nodeId +
                                   " member = " +
@@ -158,7 +158,7 @@
 
          if (Topology.log.isDebugEnabled())
          {
-            Topology.log.debug("ZZZ " + this +
+            Topology.log.debug(this +
                                " Add member nodeId=" +
                                nodeId +
                                " member = " +
@@ -178,10 +178,17 @@
          {
             if (Topology.log.isTraceEnabled())
             {
-               Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node up = " + nodeId);
+               Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
             }
 
-            listener.nodeUP(nodeId, member.getConnector(), last);
+            try
+            {
+               listener.nodeUP(nodeId, member.getConnector(), last);
+            }
+            catch (Throwable e)
+            {
+               log.warn (e.getMessage(), e);
+            }
          }
       }
 
@@ -209,7 +216,19 @@
       {
          member = topology.remove(nodeId);
       }
+      
 
+      if (Topology.log.isDebugEnabled())
+      {
+         Topology.log.debug("ZZZ removeMember " + this +
+                            " removing nodeID=" +
+                            nodeId +
+                            ", result=" +
+                            member +
+                            ", size = " +
+                            topology.size(), new Exception("trace"));
+      }
+
       if (member != null)
       {
          ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -218,23 +237,11 @@
          {
             if (Topology.log.isTraceEnabled())
             {
-               Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node down = " + nodeId);
+               Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
             }
             listener.nodeDown(nodeId);
          }
       }
-
-      if (Topology.log.isDebugEnabled())
-      {
-         Topology.log.debug("ZZZ " + this +
-                            " removing nodeID=" +
-                            nodeId +
-                            ", result=" +
-                            member +
-                            ", size = " +
-                            topology.size(), new Exception("trace"));
-      }
-
       return member != null;
    }
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -118,6 +118,7 @@
                {
                   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
+                      Logger.getLogger(this.getClass()).info("ZZZ send nodeDown on " + this);
                       channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
                   

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.management.MBeanServer;
@@ -152,6 +153,8 @@
 
    ScheduledExecutorService getScheduledPool();
    
+   ExecutorService getThreadPool();
+   
    ExecutorFactory getExecutorFactory();
 
    void setGroupingHandler(GroupingHandler groupingHandler);

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -71,13 +71,13 @@
 public class ClusterConnectionImpl implements ClusterConnection
 {
    private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
    private final ExecutorFactory executorFactory;
-   
+
    private final Topology clusterManagerTopology;
-   
+
    private final Executor executor;
 
    private final HornetQServer server;
@@ -91,15 +91,15 @@
    private final SimpleString address;
 
    private final long clientFailureCheckPeriod;
-   
+
    private final long connectionTTL;
-   
+
    private final long retryInterval;
-   
+
    private final double retryIntervalMultiplier;
-   
+
    private final long maxRetryInterval;
-   
+
    private final int reconnectAttempts;
 
    private final boolean useDuplicateDetection;
@@ -125,15 +125,15 @@
    private final ClusterConnector clusterConnector;
 
    private ServerLocatorInternal serverLocator;
-   
+
    private final TransportConfiguration connector;
 
    private final boolean allowDirectConnectionsOnly;
 
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
-   
+
    private final ClusterManagerImpl manager;
-   
+
    public ClusterConnectionImpl(final ClusterManagerImpl manager,
                                 final Topology clusterManagerTopology,
                                 final TransportConfiguration[] tcConfigs,
@@ -176,15 +176,15 @@
       this.address = address;
 
       this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-      
+
       this.connectionTTL = connectionTTL;
-      
+
       this.retryInterval = retryInterval;
-      
+
       this.retryIntervalMultiplier = retryIntervalMultiplier;
-      
+
       this.maxRetryInterval = maxRetryInterval;
-      
+
       this.reconnectAttempts = reconnectAttempts;
 
       this.useDuplicateDetection = useDuplicateDetection;
@@ -192,7 +192,7 @@
       this.routeWhenNoConsumers = routeWhenNoConsumers;
 
       this.executorFactory = executorFactory;
-      
+
       this.executor = executorFactory.getExecutor();
 
       this.server = server;
@@ -212,9 +212,9 @@
       this.clusterPassword = clusterPassword;
 
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-      
+
       this.manager = manager;
-      
+
       this.clusterManagerTopology = clusterManagerTopology;
 
       clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -223,7 +223,7 @@
       {
          // a cluster connection will connect to other nodes only if they are directly connected
          // through a static list of connectors or broadcasting using UDP.
-         if(allowDirectConnectionsOnly)
+         if (allowDirectConnectionsOnly)
          {
             allowableConnections.addAll(Arrays.asList(tcConfigs));
          }
@@ -237,12 +237,12 @@
                                 final TransportConfiguration connector,
                                 final SimpleString name,
                                 final SimpleString address,
-								        final long clientFailureCheckPeriod,
-								        final long connectionTTL,
-								        final long retryInterval,
-								        final double retryIntervalMultiplier,
-								        final long maxRetryInterval,
-								        final int reconnectAttempts,
+                                final long clientFailureCheckPeriod,
+                                final long connectionTTL,
+                                final long retryInterval,
+                                final double retryIntervalMultiplier,
+                                final long maxRetryInterval,
+                                final int reconnectAttempts,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
@@ -273,15 +273,15 @@
       this.address = address;
 
       this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-      
+
       this.connectionTTL = connectionTTL;
-      
+
       this.retryInterval = retryInterval;
-      
+
       this.retryIntervalMultiplier = retryIntervalMultiplier;
-      
+
       this.maxRetryInterval = maxRetryInterval;
-      
+
       this.reconnectAttempts = reconnectAttempts;
 
       this.useDuplicateDetection = useDuplicateDetection;
@@ -289,7 +289,7 @@
       this.routeWhenNoConsumers = routeWhenNoConsumers;
 
       this.executorFactory = executorFactory;
-      
+
       this.executor = executorFactory.getExecutor();
 
       this.server = server;
@@ -311,9 +311,9 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
 
       clusterConnector = new DiscoveryClusterConnector(dg);
-      
+
       this.manager = manager;
-      
+
       this.clusterManagerTopology = clusterManagerTopology;
    }
 
@@ -325,13 +325,12 @@
       }
 
       started = true;
-      
-      if(!backup)
+
+      if (!backup)
       {
          activate();
       }
 
-
    }
 
    public void stop() throws Exception
@@ -340,14 +339,23 @@
       {
          return;
       }
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + "::stopping ClusterConnection");
+      }
 
       if (serverLocator != null)
       {
          serverLocator.removeClusterTopologyListener(this);
       }
-      
-      log.debug("Cluster connection being stopped for node" + nodeUUID + ", server = " + this.server + " serverLocator = " + serverLocator );
 
+      log.debug("Cluster connection being stopped for node" + nodeUUID +
+                ", server = " +
+                this.server +
+                " serverLocator = " +
+                serverLocator);
+
       synchronized (this)
       {
          for (MessageFlowRecord record : records.values())
@@ -370,14 +378,19 @@
                                                          props);
             managementService.sendNotification(notification);
          }
-         
-         executor.execute(new Runnable(){
+
+         executor.execute(new Runnable()
+         {
             public void run()
             {
-               if(serverLocator != null)
+               synchronized (ClusterConnectionImpl.this)
                {
-                  serverLocator.close();
-                  serverLocator = null;
+                  if (serverLocator != null)
+                  {
+                     serverLocator.removeClusterTopologyListener(ClusterConnectionImpl.this);
+                     serverLocator.close();
+                     serverLocator = null;
+                  }
                }
 
             }
@@ -401,7 +414,7 @@
    {
       return nodeUUID.toString();
    }
-   
+
    public HornetQServer getServer()
    {
       return server;
@@ -429,7 +442,7 @@
       {
          return;
       }
-      
+
       if (log.isDebugEnabled())
       {
          log.debug("Activating cluster connection nodeID=" + nodeUUID + " for server=" + this.server);
@@ -439,7 +452,6 @@
 
       serverLocator = clusterConnector.createServerLocator();
 
-
       if (serverLocator != null)
       {
          serverLocator.setNodeID(nodeUUID.toString());
@@ -451,17 +463,17 @@
          serverLocator.setClusterTransportConfiguration(connector);
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
-         
+
          serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
          serverLocator.setConnectionTTL(connectionTTL);
 
          if (serverLocator.getConfirmationWindowSize() < 0)
          {
-        	// We can't have confirmationSize = -1 on the cluster Bridge
-        	// Otherwise we won't have confirmation working
+            // We can't have confirmationSize = -1 on the cluster Bridge
+            // Otherwise we won't have confirmation working
             serverLocator.setConfirmationWindowSize(0);
          }
-         
+
          if (!useDuplicateDetection)
          {
             log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
@@ -470,7 +482,7 @@
          serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
 
-         if(retryInterval > 0)
+         if (retryInterval > 0)
          {
             this.serverLocator.setRetryInterval(retryInterval);
          }
@@ -491,7 +503,7 @@
          managementService.sendNotification(notification);
       }
    }
-   
+
    public TransportConfiguration getConnector()
    {
       return connector;
@@ -509,9 +521,9 @@
       {
          return;
       }
-      
-      //Remove the flow record for that node
-      
+
+      // Remove the flow record for that node
+
       MessageFlowRecord record = records.remove(nodeID);
 
       if (record != null)
@@ -528,62 +540,64 @@
          {
             log.error("Failed to close flow record", e);
          }
-         
+
          server.getClusterManager().notifyNodeDown(nodeID);
       }
    }
 
-
    public void nodeUP(final String nodeID,
-                                   final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                   final boolean last)
+                      final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                      final boolean last)
    {
-      if (log.isDebugEnabled())
-      {
-         String ClusterTestBase = "receiving nodeUP for nodeID=";
-         log.debug(this + ClusterTestBase + nodeID + 
-                   " connectionPair=" + connectorPair);
-      }
-      // discard notifications about ourselves unless its from our backup
 
-      if (nodeID.equals(nodeUUID.toString()))
+      // we propagate the node notifications to all cluster topology listeners
+      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+
+      synchronized (this)
       {
-         if(connectorPair.b != null)
+         if (serverLocator == null)
          {
-            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+            log.debug("ClusterConnection nodeID=" + nodeID + " has already been stopped, ignoring call");
+            return;
          }
-         return;
-      }
 
-      // we propagate the node notifications to all cluster topology listeners
-      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+         if (log.isDebugEnabled())
+         {
+            String ClusterTestBase = "receiving nodeUP for nodeID=";
+            log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
+         }
+         // discard notifications about ourselves unless its from our backup
 
-      // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
-      if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
-      {
-         return;
-      }
+         if (nodeID.equals(nodeUUID.toString()))
+         {
+            if (connectorPair.b != null)
+            {
+               server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+            }
+            return;
+         }
 
-      // FIXME required to prevent cluster connections w/o discovery group 
-      // and empty static connectors to create bridges... ulgy!
-      if (serverLocator == null)
-      {
-          log.warn("ServerLocator==null FixME!!!!!");
-          return;
-      }
-      /*we dont create bridges to backups*/
-      if(connectorPair.a == null)
-      {
-         if (isTrace)
+         // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+         if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
          {
-            log.trace(this + " ignoring call with nodeID="  + nodeID + 
-                      ", connectorPair=" + connectorPair + ", last=" + last);
+            return;
          }
-         return;
-      }
 
-      synchronized (records)
-      {
+         /*we dont create bridges to backups*/
+         if (connectorPair.a == null)
+         {
+            if (isTrace)
+            {
+               log.trace(this + " ignoring call with nodeID=" +
+                         nodeID +
+                         ", connectorPair=" +
+                         connectorPair +
+                         ", last=" +
+                         last);
+            }
+            return;
+         }
+
          try
          {
             MessageFlowRecord record = records.get(nodeID);
@@ -620,32 +634,42 @@
             {
                if (isTrace)
                {
-                  log.trace ("XXX " + this + " ignored nodeUp record for " + connectorPair + " on nodeID=" + nodeID + " as the record already existed");
+                  log.trace("XXX " + this +
+                            " ignored nodeUp record for " +
+                            connectorPair +
+                            " on nodeID=" +
+                            nodeID +
+                            " as the record already existed");
                }
             }
          }
          catch (Exception e)
          {
-            log.error("Failed to update topology", e);
+            log.error(this + "::Failed to update topology", e);
          }
       }
    }
 
-   private void createNewRecord(final String targetNodeID,
-                                final TransportConfiguration connector,
-                                final SimpleString queueName,
-                                final Queue queue,
-                                final boolean start) throws Exception
+   private synchronized void createNewRecord(final String targetNodeID,
+                                             final TransportConfiguration connector,
+                                             final SimpleString queueName,
+                                             final Queue queue,
+                                             final boolean start) throws Exception
    {
+      if (serverLocator != null)
+      {
+         return;
+      }
+
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
 
       Bridge bridge = createClusteredBridge(record);
-      
+
       if (log.isDebugEnabled())
       {
          log.debug("XXX creating record between " + this.connector + " and " + connector + bridge);
       }
-      
+
       record.setBridge(bridge);
 
       records.put(targetNodeID, record);
@@ -663,9 +687,12 @@
     */
    protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
    {
-      
-      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology, false, record.getConnector()); 
-      
+
+      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology,
+                                                                        false,
+                                                                        server.getThreadPool(), server.getScheduledPool(), 
+                                                                        record.getConnector());
+
       targetLocator.setReconnectAttempts(0);
 
       targetLocator.setInitialConnectAttempts(0);
@@ -677,20 +704,20 @@
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
       targetLocator.setClusterConnection(true);
-      
+
       targetLocator.setRetryInterval(retryInterval);
       targetLocator.setMaxRetryInterval(maxRetryInterval);
       targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
-      
+
       targetLocator.setNodeID(serverLocator.getNodeID());
 
       targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
-      
-      if(retryInterval > 0)
+
+      if (retryInterval > 0)
       {
          targetLocator.setRetryInterval(retryInterval);
       }
-      
+
       manager.addClusterLocator(targetLocator);
 
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
@@ -719,9 +746,8 @@
                                                                    record,
                                                                    record.getConnector());
 
+      targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
 
-      targetLocator.setIdentity("(Cluster-connection-bridge::"  + bridge.toString() + "::" + this.toString() + ")");
-
       return bridge;
    }
 
@@ -732,12 +758,15 @@
       private Bridge bridge;
 
       private final String targetNodeID;
+
       private final TransportConfiguration connector;
+
       private final SimpleString queueName;
+
       private final Queue queue;
 
       private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
-      
+
       private volatile boolean isClosed = false;
 
       private volatile boolean firstReset = false;
@@ -752,8 +781,6 @@
          this.connector = connector;
          this.queueName = queueName;
       }
-      
-      
 
       /* (non-Javadoc)
        * @see java.lang.Object#toString()
@@ -771,12 +798,10 @@
                 ", isClosed=" +
                 isClosed +
                 ", firstReset=" +
-                firstReset + 
+                firstReset +
                 "]";
       }
 
-
-
       public String getAddress()
       {
          return address.toString();
@@ -825,14 +850,14 @@
          {
             log.trace("Stopping bridge " + bridge);
          }
-         
+
          isClosed = true;
          clearBindings();
-         
+
          bridge.stop();
       }
 
-       public boolean isClosed()
+      public boolean isClosed()
       {
          return isClosed;
       }
@@ -842,7 +867,7 @@
          clearBindings();
       }
 
-       public void setBridge(final Bridge bridge)
+      public void setBridge(final Bridge bridge)
       {
          this.bridge = bridge;
       }
@@ -856,7 +881,7 @@
       {
          if (isTrace)
          {
-            log.trace("Flow record on " + clusterConnector + " Receiving message "  + message);
+            log.trace("Flow record on " + clusterConnector + " Receiving message " + message);
          }
          try
          {
@@ -1049,12 +1074,12 @@
 
             return;
          }
-         
+
          if (isTrace)
          {
             log.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
          }
-         
+
          bindings.put(clusterName, binding);
 
          try
@@ -1083,7 +1108,7 @@
          }
 
          SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
-         
+
          System.out.println("Removing clusterName=" + clusterName + " on " + ClusterConnectionImpl.this);
 
          removeBinding(clusterName);
@@ -1126,10 +1151,12 @@
          SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
 
          RemoteQueueBinding binding = bindings.get(clusterName);
-         
+
          if (binding == null)
          {
-            throw new IllegalStateException("Cannot find binding for " + clusterName + " on " + ClusterConnectionImpl.this);
+            throw new IllegalStateException("Cannot find binding for " + clusterName +
+                                            " on " +
+                                            ClusterConnectionImpl.this);
          }
 
          binding.addConsumer(filterString);
@@ -1223,7 +1250,7 @@
    {
       return records;
    }
-   
+
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
@@ -1245,18 +1272,16 @@
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
 
-
       out.println(this);
       out.println("***************************************");
       out.println(name + " connected to");
       for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
       {
-         out.println("\t Bridge = "  + messageFlow.getValue().getBridge());
+         out.println("\t Bridge = " + messageFlow.getValue().getBridge());
          out.println("\t Flow Record = " + messageFlow.getValue());
       }
       out.println("***************************************");
-      
-      
+
       return str.toString();
    }
 
@@ -1276,13 +1301,13 @@
 
       public ServerLocatorInternal createServerLocator()
       {
-         if(tcConfigs != null && tcConfigs.length > 0)
+         if (tcConfigs != null && tcConfigs.length > 0)
          {
             if (log.isDebugEnabled())
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+            return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), tcConfigs);
          }
          else
          {
@@ -1298,8 +1323,7 @@
       {
          return "StaticClusterConnector [tcConfigs=" + Arrays.toString(tcConfigs) + "]";
       }
-      
-      
+
    }
 
    private class DiscoveryClusterConnector implements ClusterConnector
@@ -1313,7 +1337,7 @@
 
       public ServerLocatorInternal createServerLocator()
       {
-         return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+         return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), dg);
       }
    }
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -864,6 +864,11 @@
       return scheduledPool;
    }
    
+   public ExecutorService getThreadPool()
+   {
+      return threadPool;
+   }
+   
    public Configuration getConfiguration()
    {
       return configuration;
@@ -1349,7 +1354,7 @@
    {
       // Create the pools - we have two pools - one for non scheduled - and another for scheduled
 
-      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-threads" + System.identityHashCode(this),
+      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(),
                                                         false,
                                                         getThisClassLoader());
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java	2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java	2011-08-03 19:53:17 UTC (rev 11102)
@@ -63,7 +63,7 @@
       // when sandboxed, the code does not have the RuntimePermission modifyThreadGroup
       if (System.getSecurityManager() == null)
       {
-         t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (group:" + group.getName() + ")");
+         t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")");
       }
       else
       {



More information about the hornetq-commits mailing list