[hornetq-commits] JBoss hornetq SVN: r11060 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 27 21:32:47 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-27 21:32:47 -0400 (Wed, 27 Jul 2011)
New Revision: 11060

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 01:32:47 UTC (rev 11060)
@@ -46,6 +46,7 @@
 import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -65,10 +66,10 @@
    private boolean finalizeCheck = true;
 
    private boolean clusterConnection;
-   
+
    private String identity;
 
-   private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+   private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
 
    private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
 
@@ -159,7 +160,7 @@
    private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    private static ExecutorService globalThreadPool;
-   
+
    private Executor startExecutor;
 
    private static ScheduledExecutorService globalScheduledThreadPool;
@@ -476,7 +477,7 @@
    public void start(Executor executor) throws Exception
    {
       initialise();
-      
+
       this.startExecutor = executor;
 
       executor.execute(new Runnable()
@@ -650,7 +651,7 @@
          {
             long toWait = 30000;
             long start = System.currentTimeMillis();
-            while (!receivedTopology && toWait > 0)
+            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
             {
                // Now wait for the topology
 
@@ -674,12 +675,14 @@
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                           "Timed out waiting to receive cluster topology");
             }
+
          }
 
          addFactory(factory);
 
          return factory;
       }
+
    }
 
    public boolean isHA()
@@ -1037,7 +1040,7 @@
          throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
       }
    }
-   
+
    public void setIdentity(String identity)
    {
       this.identity = identity;
@@ -1107,7 +1110,7 @@
 
       if (log.isDebugEnabled())
       {
-         log.debug("YYY " + this + " is calling close", new Exception ("trace"));
+         log.debug("YYY " + this + " is calling close", new Exception("trace"));
       }
 
       closing = true;
@@ -1188,7 +1191,7 @@
          }
          return;
       }
-      
+
       if (log.isDebugEnabled())
       {
          log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
@@ -1229,7 +1232,11 @@
       {
          if (log.isDebugEnabled())
          {
-            log.debug(this + "::Ignoring notifyNodeUp for " + nodeID + " connectorPair=" + connectorPair + ", since ha=false and clusterConnection=false");
+            log.debug(this + "::Ignoring notifyNodeUp for " +
+                      nodeID +
+                      " connectorPair=" +
+                      connectorPair +
+                      ", since ha=false and clusterConnection=false");
          }
          return;
       }
@@ -1277,9 +1284,11 @@
    @Override
    public String toString()
    {
-      if (clusterConnection)
+      if (identity != null)
       {
-         return "ServerLocatorImpl (clusterConnection identity="  + identity + ") [initialConnectors=" + Arrays.toString(initialConnectors) +
+         return "ServerLocatorImpl (identity=" + identity +
+                ") [initialConnectors=" +
+                Arrays.toString(initialConnectors) +
                 ", discoveryGroupConfiguration=" +
                 discoveryGroupConfiguration +
                 "]";
@@ -1444,10 +1453,14 @@
                            }
                         }
                      });
-                     
+
                      if (log.isDebugEnabled())
                      {
-                        log.debug("XXX Returning " + csf + " after " + retryNumber + " retries on StaticConnector " + ServerLocatorImpl.this);
+                        log.debug("XXX Returning " + csf +
+                                  " after " +
+                                  retryNumber +
+                                  " retries on StaticConnector " +
+                                  ServerLocatorImpl.this);
                      }
 
                      return csf;

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-07-28 01:32:47 UTC (rev 11060)
@@ -625,7 +625,7 @@
       BridgeImpl.log.debug("Connecting  " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
 
       retryCount++;
-
+      
       try
       {
          if (csf == null || csf.isClosed())
@@ -712,12 +712,15 @@
 
             // We are not going to count this one as a retry
             retryCount--;
-            scheduleRetryConnectFixedTimeout(100);
+            scheduleRetryConnectFixedTimeout(this.retryInterval);
             return;
          }
          else
          {
-            BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. Retrying", e);
+            if (log.isDebugEnabled())
+            {
+               log.debug("Bridge " + this + " is unable to connect to destination. Retrying", e);
+            }
          }
       }
       catch (Exception e)

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 01:32:47 UTC (rev 11060)
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -31,9 +32,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -498,9 +497,9 @@
          {
             log.error("Failed to close flow record", e);
          }
+         
+         server.getClusterManager().notifyNodeDown(nodeID);
       }
-      
-      server.getClusterManager().notifyNodeDown(nodeID);
    }
 
 
@@ -538,6 +537,7 @@
       // and empty static connectors to create bridges... ulgy!
       if (serverLocator == null)
       {
+          log.warn("ServerLocator==null FixME!!!!!");
           return;
       }
       /*we dont create bridges to backups*/
@@ -633,7 +633,7 @@
    protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
    {
       
-      ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+      final ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
       
       targetLocator.setReconnectAttempts(0);
 
@@ -656,8 +656,6 @@
 
       targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
       
-      targetLocator.addClusterTopologyListener(this);
-
       if(retryInterval > 0)
       {
          targetLocator.setRetryInterval(retryInterval);

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 01:32:47 UTC (rev 11060)
@@ -265,7 +265,7 @@
          return;
       }
       
-      log.info("XXX " + this + "::removing nodeID=" + nodeID);
+      log.debug("XXX " + this + "::removing nodeID=" + nodeID, new Exception ("trace"));
 
       boolean removed = topology.removeMember(nodeID);
 



More information about the hornetq-commits mailing list