Author: ataylor
Date: 2010-12-10 09:56:36 -0500 (Fri, 10 Dec 2010)
New Revision: 10031
Modified:
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Log:
Moved creation of the server locator out of ClusterManagerImpl and into ClusterConnectionImpl.
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-09 22:24:10
UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-10 14:56:36
UTC (rev 10031)
@@ -139,6 +139,8 @@
private volatile boolean closed;
+ private volatile boolean closing;
+
private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -408,9 +410,27 @@
}
}
- public void start() throws Exception
+ public void start(Executor executor) throws Exception
{
initialise();
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if(!closing)
+ {
+ log.warn("did not connect the cluster connection to other
nodes", e);
+ }
+ }
+ }
+ });
}
public ClientSessionFactory connect() throws Exception
@@ -1000,6 +1020,8 @@
return;
}
+ closing = true;
+
if (discoveryGroup != null)
{
try
@@ -1057,6 +1079,7 @@
}
}
}
+ readOnly = false;
closed = true;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-09
22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-10
14:56:36 UTC (rev 10031)
@@ -19,6 +19,8 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import java.util.concurrent.Executor;
+
/**
* A ServerLocatorInternal
*
@@ -28,7 +30,7 @@
*/
public interface ServerLocatorInternal extends ServerLocator
{
- void start() throws Exception;
+ void start(Executor executor) throws Exception;
void factoryClosed(final ClientSessionFactory factory);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-09
22:24:10 UTC (rev 10030)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-10
14:56:36 UTC (rev 10031)
@@ -16,17 +16,16 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -35,6 +34,7 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -75,6 +75,8 @@
private final SimpleString address;
+ private final long retryInterval;
+
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
@@ -95,7 +97,9 @@
private final String clusterPassword;
- private final ServerLocatorInternal serverLocator;
+ private final ClusterConnector clusterConnector;
+
+ private ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
@@ -103,7 +107,7 @@
private final Set<TransportConfiguration> allowableConnections = new
HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
+ public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -130,39 +134,89 @@
}
this.nodeUUID = nodeUUID;
-
- this.serverLocator = serverLocator;
+ this.connector = connector;
+
+ this.name = name;
+
+ this.address = address;
+
+ this.retryInterval = retryInterval;
+
+ this.useDuplicateDetection = useDuplicateDetection;
+
+ this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.executorFactory = executorFactory;
+
+ this.server = server;
+
+ this.postOffice = postOffice;
+
+ this.managementService = managementService;
+
+ this.scheduledExecutor = scheduledExecutor;
+
+ this.maxHops = maxHops;
+
+ this.backup = backup;
+
+ this.clusterUser = clusterUser;
+
+ this.clusterPassword = clusterPassword;
+
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- if (this.serverLocator != null)
+
+ clusterConnector = new StaticClusterConnector(tcConfigs);
+
+ if (tcConfigs != null && tcConfigs.length > 0)
{
- this.serverLocator.setClusterConnection(true);
- this.serverLocator.setClusterTransportConfiguration(connector);
- this.serverLocator.setBackup(server.getConfiguration().isBackup());
- this.serverLocator.setInitialConnectAttempts(-1);
- if(retryInterval > 0)
- {
- this.serverLocator.setRetryInterval(retryInterval);
- }
-
// a cluster connection will connect to other nodes only if they are directly
connected
// through a static list of connectors or broadcasting using UDP.
- TransportConfiguration[] transportConfigurations =
serverLocator.getStaticTransportConfigurations();
- if(this.allowDirectConnectionsOnly)
+ if(allowDirectConnectionsOnly)
{
- for (TransportConfiguration transportConfiguration :
transportConfigurations)
- {
- allowableConnections.add(transportConfiguration);
- }
+ allowableConnections.addAll(Arrays.asList(tcConfigs));
}
}
+ }
+
+ public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+ final TransportConfiguration connector,
+ final SimpleString name,
+ final SimpleString address,
+ final long retryInterval,
+ final boolean useDuplicateDetection,
+ final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
+ final ExecutorFactory executorFactory,
+ final HornetQServer server,
+ final PostOffice postOffice,
+ final ManagementService managementService,
+ final ScheduledExecutorService scheduledExecutor,
+ final int maxHops,
+ final UUID nodeUUID,
+ final boolean backup,
+ final String clusterUser,
+ final String clusterPassword,
+ final boolean allowDirectConnectionsOnly) throws
Exception
+ {
+
+ if (nodeUUID == null)
+ {
+ throw new IllegalArgumentException("node id is null");
+ }
+
+ this.nodeUUID = nodeUUID;
+
this.connector = connector;
this.name = name;
this.address = address;
+ this.retryInterval = retryInterval;
+
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -184,6 +238,10 @@
this.clusterUser = clusterUser;
this.clusterPassword = clusterPassword;
+
+ this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+
+ clusterConnector = new DiscoveryClusterConnector(dg);
}
public synchronized void start() throws Exception
@@ -238,6 +296,12 @@
managementService.sendNotification(notification);
}
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
started = false;
}
}
@@ -279,29 +343,28 @@
backup = false;
+ serverLocator = clusterConnector.createServerLocator();
+
if (serverLocator != null)
{
+ serverLocator.setNodeID(nodeUUID.toString());
+
+ serverLocator.setReconnectAttempts(-1);
+
+ serverLocator.setClusterConnection(true);
+ serverLocator.setClusterTransportConfiguration(connector);
+ serverLocator.setBackup(server.getConfiguration().isBackup());
+ serverLocator.setInitialConnectAttempts(-1);
+
+ if(retryInterval > 0)
+ {
+ this.serverLocator.setRetryInterval(retryInterval);
+ }
+
serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
- {
- public void run()
- {
- try
- {
- serverLocator.connect();
- }
- catch (Exception e)
- {
- if(started)
- {
- log.warn("did not connect the cluster connection to other
nodes", e);
- }
- }
- }
- });
+
+ serverLocator.start(server.getExecutorFactory().getExecutor());
}
if (managementService != null)
@@ -909,4 +972,46 @@
return out;
}
+
+ interface ClusterConnector
+ {
+ ServerLocatorInternal createServerLocator();
+ }
+
+ private class StaticClusterConnector implements ClusterConnector
+ {
+ private final TransportConfiguration[] tcConfigs;
+
+ public StaticClusterConnector(TransportConfiguration[] tcConfigs)
+ {
+ this.tcConfigs = tcConfigs;
+ }
+
+ public ServerLocatorInternal createServerLocator()
+ {
+ if(tcConfigs != null && tcConfigs.length > 0)
+ {
+ return (ServerLocatorInternal)
HornetQClient.createServerLocatorWithHA(tcConfigs);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ private class DiscoveryClusterConnector implements ClusterConnector
+ {
+ private final DiscoveryGroupConfiguration dg;
+
+ public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
+ {
+ this.dg = dg;
+ }
+
+ public ServerLocatorInternal createServerLocator()
+ {
+ return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-09
22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-10
14:56:36 UTC (rev 10031)
@@ -732,18 +732,16 @@
return;
}
- ServerLocatorInternal serverLocator;
- if (config.getStaticConnectors() != null &&
config.getStaticConnectors().size() > 0)
+ if(clusterConnections.containsKey(config.getName()))
{
- TransportConfiguration[] tcConfigs =
connectorNameListToArray(config.getStaticConnectors());
+ log.warn("Clustwer Configuration '" + config.getConnectorName() +
"' already exists. The cluster connection will not be deployed.");
+ return;
+ }
- serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
- serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
- clusterLocators.add(serverLocator);
- }
- else if (config.getDiscoveryGroupName() != null)
+ ClusterConnectionImpl clusterConnection;
+
+ if (config.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration dg =
configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
@@ -754,38 +752,51 @@
"'. The cluster connection will not be
deployed.");
}
- serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg);
- serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
- clusterLocators.add(serverLocator);
+ clusterConnection = new ClusterConnectionImpl(dg,
+ connector,
+ new
SimpleString(config.getName()),
+ new
SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+
config.isForwardWhenNoConsumers(),
+
config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+
server.getConfiguration().getClusterUser(),
+
server.getConfiguration().getClusterPassword(),
+
config.isAllowDirectConnectionsOnly());
}
else
{
- // no connector or discovery group are defined. The cluster connection will only
be a target and will
- // no connect to other nodes in the cluster
- serverLocator = null;
+ TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null?
connectorNameListToArray(config.getStaticConnectors()):null;
+
+ clusterConnection = new ClusterConnectionImpl(tcConfigs,
+ connector,
+ new
SimpleString(config.getName()),
+ new
SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+
config.isForwardWhenNoConsumers(),
+
config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+
server.getConfiguration().getClusterUser(),
+
server.getConfiguration().getClusterPassword(),
+
config.isAllowDirectConnectionsOnly());
}
- ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
- connector,
- new
SimpleString(config.getName()),
- new
SimpleString(config.getAddress()),
-
config.getRetryInterval(),
-
config.isDuplicateDetection(),
-
config.isForwardWhenNoConsumers(),
-
config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
-
config.getMaxHops(),
- nodeUUID,
- backup,
-
server.getConfiguration().getClusterUser(),
-
server.getConfiguration().getClusterPassword(),
-
config.isAllowDirectConnectionsOnly());
-
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-12-09
22:24:10 UTC (rev 10030)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-12-10
14:56:36 UTC (rev 10031)
@@ -774,11 +774,6 @@
public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
{
- // server #0 is connected to server #1
- setupClusterConnection("cluster1", 0, 1, "queues", true, 1,
isNetty(), true);
- // server #1 is connected to nobody
- setupClusterConnection("clusterX", 1, -1, "queues", false, 1,
isNetty(), true);
-
startServers(1, 0);
setupSessionFactory(0, isNetty(), true);