[hornetq-commits] JBoss hornetq SVN: r9430 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 20 10:41:47 EDT 2010


Author: jmesnil
Date: 2010-07-20 10:41:46 -0400 (Tue, 20 Jul 2010)
New Revision: 9430

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring

* fix receiving list of initial connectors when using discovery group
* add start() method to ServerLocatorInternal to create and start the discovery group used by the server locator (prior to creating any session factory)
* add setNodeID() method to ServerLocatorInternal so that the server locator created by the server's cluster manager uses the server's nodeID when it creates its discovery group

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-20 14:41:46 UTC (rev 9430)
@@ -18,6 +18,7 @@
 import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -161,6 +162,8 @@
 
    private String groupID;
 
+   private String nodeID;
+
    private static synchronized ExecutorService getGlobalThreadPool()
    {
       if (globalThreadPool == null)
@@ -283,7 +286,7 @@
                lbAddress = null;
             }
 
-            discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+            discoveryGroup = new DiscoveryGroupImpl(nodeID,
                                                     discoveryAddress,
                                                     lbAddress,
                                                     groupAddress,
@@ -312,6 +315,8 @@
 
       this.initialConnectors = transportConfigs;
 
+      this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+      
       discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
 
       clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
@@ -412,6 +417,11 @@
       }
    }
 
+   public void start() throws Exception
+   {
+      initialise();
+   }
+   
    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
    {
       if (closed)
@@ -514,7 +524,7 @@
                {
                   attempts++;
 
-                  if (attempts == topologyArray.length)
+                  if (topologyArray != null && attempts == topologyArray.length)
                   {
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
                                                 "Cannot connect to server(s). Tried with all available servers.");
@@ -944,6 +954,11 @@
       }
    }
 
+   public void setNodeID(String nodeID)
+   {
+      this.nodeID = nodeID;
+   }
+
    @Override
    protected void finalize() throws Throwable
    {
@@ -1100,6 +1115,8 @@
       {
          this.initialConnectors[count++] = entry.getConnector();
       }
+      
+      System.out.println(">>>>>>>> Initial connectors = " + Arrays.asList(initialConnectors));
    }
 
    public synchronized void factoryClosed(final ClientSessionFactory factory)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-20 14:41:46 UTC (rev 9430)
@@ -27,7 +27,12 @@
  */
 public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
 {
+   void start() throws Exception;
+   
    void factoryClosed(final ClientSessionFactory factory);
 
    TransportConfiguration getBackup( TransportConfiguration live);
+
+   void setNodeID(String nodeID);
+
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-07-20 14:41:46 UTC (rev 9430)
@@ -266,7 +266,7 @@
             log.warn("There are more than one servers on the network broadcasting the same node id. " + "You will see this message exactly once (per node) if a node is restarted, in which case it can be safely "
                      + "ignored. But if it is logged continuously it means you really do have more than one node on the same network "
                      + "active concurrently with the same node id. This could occur if you have a backup node active at the same time as "
-                     + "its live node.");
+                     + "its live node. nodeID=" + originatingNodeID);
             uniqueIDMap.put(originatingNodeID, uniqueID);
          }
       }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-20 14:41:46 UTC (rev 9430)
@@ -27,9 +27,9 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -96,11 +96,11 @@
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topology;
 
-   private final ServerLocator serverLocator;
+   private final ServerLocatorInternal serverLocator;
    
    private final TransportConfiguration connector;
 
-   public ClusterConnectionImpl(final ServerLocator serverLocator,
+   public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
                                 final SimpleString address,
@@ -165,7 +165,8 @@
       }
 
       serverLocator.registerTopologyListener(this);
-
+      serverLocator.start();
+      
       started = true;
 
       if (managementService != null)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-20 14:41:46 UTC (rev 9430)
@@ -31,6 +31,7 @@
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -574,13 +575,13 @@
          return;
       }
 
-      ServerLocator serverLocator;
+      ServerLocatorInternal serverLocator;
 
       if (config.getStaticConnectors() != null)
       {
          TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
 
-         serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
+         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
       }
       else
       {
@@ -593,9 +594,10 @@
                                         "'. The cluster connection will not be deployed.");
          }
 
-         serverLocator = HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
+         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
       }
 
+      serverLocator.setNodeID(nodeUUID.toString());
       ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
                                                                       connector,
                                                                       new SimpleString(config.getName()),



More information about the hornetq-commits mailing list