[hornetq-commits] JBoss hornetq SVN: r9430 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: cluster/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 20 10:41:47 EDT 2010
Author: jmesnil
Date: 2010-07-20 10:41:46 -0400 (Tue, 20 Jul 2010)
New Revision: 9430
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring
* fix receiving list of initial connectors when using discovery group
* add start() method to ServerLocatorInternal to create and start the discovery group used by the server locator (prior to create any session factory)
* add setNodeID() method to ServerLocatorInternal so that the serverlocator created by the server's cluster manager will use the server's nodeID when it creates its discovery group
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-20 14:41:46 UTC (rev 9430)
@@ -18,6 +18,7 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -161,6 +162,8 @@
private String groupID;
+ private String nodeID;
+
private static synchronized ExecutorService getGlobalThreadPool()
{
if (globalThreadPool == null)
@@ -283,7 +286,7 @@
lbAddress = null;
}
- discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryGroup = new DiscoveryGroupImpl(nodeID,
discoveryAddress,
lbAddress,
groupAddress,
@@ -312,6 +315,8 @@
this.initialConnectors = transportConfigs;
+ this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
@@ -412,6 +417,11 @@
}
}
+ public void start() throws Exception
+ {
+ initialise();
+ }
+
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
if (closed)
@@ -514,7 +524,7 @@
{
attempts++;
- if (attempts == topologyArray.length)
+ if (topologyArray != null && attempts == topologyArray.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
@@ -944,6 +954,11 @@
}
}
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+
@Override
protected void finalize() throws Throwable
{
@@ -1100,6 +1115,8 @@
{
this.initialConnectors[count++] = entry.getConnector();
}
+
+ System.out.println(">>>>>>>> Initial connectors = " + Arrays.asList(initialConnectors));
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-20 14:41:46 UTC (rev 9430)
@@ -27,7 +27,12 @@
*/
public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
{
+ void start() throws Exception;
+
void factoryClosed(final ClientSessionFactory factory);
TransportConfiguration getBackup( TransportConfiguration live);
+
+ void setNodeID(String nodeID);
+
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-20 14:41:46 UTC (rev 9430)
@@ -266,7 +266,7 @@
log.warn("There are more than one servers on the network broadcasting the same node id. " + "You will see this message exactly once (per node) if a node is restarted, in which case it can be safely "
+ "ignored. But if it is logged continuously it means you really do have more than one node on the same network "
+ "active concurrently with the same node id. This could occur if you have a backup node active at the same time as "
- + "its live node.");
+ + "its live node. nodeID=" + originatingNodeID);
uniqueIDMap.put(originatingNodeID, uniqueID);
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-20 14:41:46 UTC (rev 9430)
@@ -27,9 +27,9 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -96,11 +96,11 @@
private Pair<TransportConfiguration, TransportConfiguration>[] topology;
- private final ServerLocator serverLocator;
+ private final ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
- public ClusterConnectionImpl(final ServerLocator serverLocator,
+ public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -165,7 +165,8 @@
}
serverLocator.registerTopologyListener(this);
-
+ serverLocator.start();
+
started = true;
if (managementService != null)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-20 14:35:06 UTC (rev 9429)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-20 14:41:46 UTC (rev 9430)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -574,13 +575,13 @@
return;
}
- ServerLocator serverLocator;
+ ServerLocatorInternal serverLocator;
if (config.getStaticConnectors() != null)
{
TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
- serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
}
else
{
@@ -593,9 +594,10 @@
"'. The cluster connection will not be deployed.");
}
- serverLocator = HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
}
+ serverLocator.setNodeID(nodeUUID.toString());
ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
connector,
new SimpleString(config.getName()),
More information about the hornetq-commits
mailing list