[hornetq-commits] JBoss hornetq SVN: r11063 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 27 23:47:42 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-27 23:47:39 -0400 (Wed, 27 Jul 2011)
New Revision: 11063

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
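fixes: ServerLocatorImpl now waits for the cluster topology in 1-second slices against a fixed 30-second deadline and only reports a timeout if the topology is still missing and the locator is not closed/closing; ClusterConnectionImpl registers each bridge's target locator with ClusterManagerImpl (new addClusterLocator) so the manager closes it together with its other cluster locators, and includes the bridge in the locator identity.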

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
@@ -648,28 +648,22 @@
 
          if (ha || clusterConnection)
          {
-            long toWait = 30000;
-            long start = System.currentTimeMillis();
-            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
+            long timeout = System.currentTimeMillis() + 30000;
+            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && timeout > System.currentTimeMillis())
             {
                // Now wait for the topology
-
+               
                try
                {
-                  wait(toWait);
+                  wait(1000);
                }
                catch (InterruptedException ignore)
                {
                }
 
-               long now = System.currentTimeMillis();
-
-               toWait -= now - start;
-
-               start = now;
             }
 
-            if (toWait <= 0)
+            if (System.currentTimeMillis() > timeout && ! receivedTopology && !closed && !closing)
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                           "Timed out waiting to receive cluster topology");

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -125,7 +124,10 @@
 
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
    
-   public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
+   private final ClusterManagerImpl manager;
+   
+   public ClusterConnectionImpl(final ClusterManagerImpl manager,
+                                final TransportConfiguration[] tcConfigs,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
                                 final SimpleString address,
@@ -199,6 +201,8 @@
       this.clusterPassword = clusterPassword;
 
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+      
+      this.manager = manager;
 
       clusterConnector = new StaticClusterConnector(tcConfigs);
 
@@ -214,7 +218,8 @@
 
    }
 
-   public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+   public ClusterConnectionImpl(final ClusterManagerImpl manager,
+                                DiscoveryGroupConfiguration dg,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
                                 final SimpleString address,
@@ -290,6 +295,8 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
 
       clusterConnector = new DiscoveryClusterConnector(dg);
+      
+      this.manager = manager;
    }
 
    public synchronized void start() throws Exception
@@ -646,7 +653,6 @@
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
       targetLocator.setClusterConnection(true);
-      targetLocator.setIdentity("(Cluster-connection-bridge::" + this.toString() + ")");
       
       targetLocator.setRetryInterval(retryInterval);
       targetLocator.setMaxRetryInterval(maxRetryInterval);
@@ -660,6 +666,8 @@
       {
          targetLocator.setRetryInterval(retryInterval);
       }
+      
+      manager.addClusterLocator(targetLocator);
 
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
                                                                    targetLocator,
@@ -687,7 +695,10 @@
                                                                    record,
                                                                    record.getConnector());
 
-       return bridge;
+
+      targetLocator.setIdentity("(Cluster-connection-bridge::"  + bridge.toString() + "::" + this.toString() + ")");
+
+      return bridge;
    }
 
    // Inner classes -----------------------------------------------------------------------------------

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
@@ -19,7 +19,6 @@
 import java.io.StringWriter;
 import java.lang.reflect.Array;
 import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,6 +36,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
@@ -106,7 +106,7 @@
 
    private volatile ServerLocatorInternal backupServerLocator;
 
-   private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+   private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
 
    private final Executor executor;
 
@@ -245,9 +245,11 @@
          backupServerLocator = null;
       }
 
-      for (ServerLocatorInternal clusterLocator : clusterLocators)
+      for (ServerLocator clusterLocator : clusterLocators)
       {
+         log.info("WWW Closing clusterLocator " + clusterLocator);
          clusterLocator.close();
+         log.info("WWW Closed clusterLocator " + clusterLocator);
       }
       clusterLocators.clear();
       started = false;
@@ -483,6 +485,11 @@
          log.warn("no cluster connections defined, unable to announce backup");
       }
    }
+   
+   void addClusterLocator(final ServerLocatorInternal serverLocator)
+   {
+      this.clusterLocators.add(serverLocator);
+   }
 
    private synchronized void announceNode()
    {
@@ -721,7 +728,9 @@
          log.debug("Bridge " + config.getName() + 
                    " is configured to not use duplicate detecion, it will send messages synchronously");
       }
+      
       clusterLocators.add(serverLocator);
+      
       Bridge bridge = new BridgeImpl(serverLocator,
                                      config.getReconnectAttempts(),
                                      config.getRetryInterval(),
@@ -819,7 +828,8 @@
             log.debug("XXX " + this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
          }
 
-         clusterConnection = new ClusterConnectionImpl(dg,
+         clusterConnection = new ClusterConnectionImpl(this,
+                                                       dg,
                                                        connector,
                                                        new SimpleString(config.getName()),
                                                        new SimpleString(config.getAddress()),
@@ -854,7 +864,8 @@
             log.debug("XXX " + this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
          }
 
-         clusterConnection = new ClusterConnectionImpl(tcConfigs,
+         clusterConnection = new ClusterConnectionImpl(this,
+                                                       tcConfigs,
                                                        connector,
                                                        new SimpleString(config.getName()),
                                                        new SimpleString(config.getAddress()),



More information about the hornetq-commits mailing list