[hornetq-commits] JBoss hornetq SVN: r10031 - in trunk: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Dec 10 09:56:36 EST 2010
Author: ataylor
Date: 2010-12-10 09:56:36 -0500 (Fri, 10 Dec 2010)
New Revision: 10031
Modified:
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Log:
moved creation of server locator into cluster connection
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-10 14:56:36 UTC (rev 10031)
@@ -139,6 +139,8 @@
private volatile boolean closed;
+ private volatile boolean closing;
+
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -408,9 +410,27 @@
}
}
- public void start() throws Exception
+ public void start(Executor executor) throws Exception
{
initialise();
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if(!closing)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
}
public ClientSessionFactory connect() throws Exception
@@ -1000,6 +1020,8 @@
return;
}
+ closing = true;
+
if (discoveryGroup != null)
{
try
@@ -1057,6 +1079,7 @@
}
}
}
+ readOnly = false;
closed = true;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-10 14:56:36 UTC (rev 10031)
@@ -19,6 +19,8 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import java.util.concurrent.Executor;
+
/**
* A ServerLocatorInternal
*
@@ -28,7 +30,7 @@
*/
public interface ServerLocatorInternal extends ServerLocator
{
- void start() throws Exception;
+ void start(Executor executor) throws Exception;
void factoryClosed(final ClientSessionFactory factory);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-10 14:56:36 UTC (rev 10031)
@@ -16,17 +16,16 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -35,6 +34,7 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -75,6 +75,8 @@
private final SimpleString address;
+ private final long retryInterval;
+
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
@@ -95,7 +97,9 @@
private final String clusterPassword;
- private final ServerLocatorInternal serverLocator;
+ private final ClusterConnector clusterConnector;
+
+ private ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
@@ -103,7 +107,7 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
+ public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -130,39 +134,89 @@
}
this.nodeUUID = nodeUUID;
-
- this.serverLocator = serverLocator;
+ this.connector = connector;
+
+ this.name = name;
+
+ this.address = address;
+
+ this.retryInterval = retryInterval;
+
+ this.useDuplicateDetection = useDuplicateDetection;
+
+ this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.executorFactory = executorFactory;
+
+ this.server = server;
+
+ this.postOffice = postOffice;
+
+ this.managementService = managementService;
+
+ this.scheduledExecutor = scheduledExecutor;
+
+ this.maxHops = maxHops;
+
+ this.backup = backup;
+
+ this.clusterUser = clusterUser;
+
+ this.clusterPassword = clusterPassword;
+
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- if (this.serverLocator != null)
+
+ clusterConnector = new StaticClusterConnector(tcConfigs);
+
+ if (tcConfigs != null && tcConfigs.length > 0)
{
- this.serverLocator.setClusterConnection(true);
- this.serverLocator.setClusterTransportConfiguration(connector);
- this.serverLocator.setBackup(server.getConfiguration().isBackup());
- this.serverLocator.setInitialConnectAttempts(-1);
- if(retryInterval > 0)
- {
- this.serverLocator.setRetryInterval(retryInterval);
- }
-
// a cluster connection will connect to other nodes only if they are directly connected
// through a static list of connectors or broadcasting using UDP.
- TransportConfiguration[] transportConfigurations = serverLocator.getStaticTransportConfigurations();
- if(this.allowDirectConnectionsOnly)
+ if(allowDirectConnectionsOnly)
{
- for (TransportConfiguration transportConfiguration : transportConfigurations)
- {
- allowableConnections.add(transportConfiguration);
- }
+ allowableConnections.addAll(Arrays.asList(tcConfigs));
}
}
+ }
+
+ public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+ final TransportConfiguration connector,
+ final SimpleString name,
+ final SimpleString address,
+ final long retryInterval,
+ final boolean useDuplicateDetection,
+ final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
+ final ExecutorFactory executorFactory,
+ final HornetQServer server,
+ final PostOffice postOffice,
+ final ManagementService managementService,
+ final ScheduledExecutorService scheduledExecutor,
+ final int maxHops,
+ final UUID nodeUUID,
+ final boolean backup,
+ final String clusterUser,
+ final String clusterPassword,
+ final boolean allowDirectConnectionsOnly) throws Exception
+ {
+
+ if (nodeUUID == null)
+ {
+ throw new IllegalArgumentException("node id is null");
+ }
+
+ this.nodeUUID = nodeUUID;
+
this.connector = connector;
this.name = name;
this.address = address;
+ this.retryInterval = retryInterval;
+
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -184,6 +238,10 @@
this.clusterUser = clusterUser;
this.clusterPassword = clusterPassword;
+
+ this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+
+ clusterConnector = new DiscoveryClusterConnector(dg);
}
public synchronized void start() throws Exception
@@ -238,6 +296,12 @@
managementService.sendNotification(notification);
}
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
started = false;
}
}
@@ -279,29 +343,28 @@
backup = false;
+ serverLocator = clusterConnector.createServerLocator();
+
if (serverLocator != null)
{
+ serverLocator.setNodeID(nodeUUID.toString());
+
+ serverLocator.setReconnectAttempts(-1);
+
+ serverLocator.setClusterConnection(true);
+ serverLocator.setClusterTransportConfiguration(connector);
+ serverLocator.setBackup(server.getConfiguration().isBackup());
+ serverLocator.setInitialConnectAttempts(-1);
+
+ if(retryInterval > 0)
+ {
+ this.serverLocator.setRetryInterval(retryInterval);
+ }
+
serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
- {
- public void run()
- {
- try
- {
- serverLocator.connect();
- }
- catch (Exception e)
- {
- if(started)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
+
+ serverLocator.start(server.getExecutorFactory().getExecutor());
}
if (managementService != null)
@@ -909,4 +972,46 @@
return out;
}
+
+ interface ClusterConnector
+ {
+ ServerLocatorInternal createServerLocator();
+ }
+
+ private class StaticClusterConnector implements ClusterConnector
+ {
+ private final TransportConfiguration[] tcConfigs;
+
+ public StaticClusterConnector(TransportConfiguration[] tcConfigs)
+ {
+ this.tcConfigs = tcConfigs;
+ }
+
+ public ServerLocatorInternal createServerLocator()
+ {
+ if(tcConfigs != null && tcConfigs.length > 0)
+ {
+ return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ private class DiscoveryClusterConnector implements ClusterConnector
+ {
+ private final DiscoveryGroupConfiguration dg;
+
+ public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
+ {
+ this.dg = dg;
+ }
+
+ public ServerLocatorInternal createServerLocator()
+ {
+ return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-10 14:56:36 UTC (rev 10031)
@@ -732,18 +732,16 @@
return;
}
- ServerLocatorInternal serverLocator;
- if (config.getStaticConnectors() != null && config.getStaticConnectors().size() > 0)
+ if(clusterConnections.containsKey(config.getName()))
{
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
+ log.warn("Clustwer Configuration '" + config.getConnectorName() + "' already exists. The cluster connection will not be deployed.");
+ return;
+ }
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
- serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
- clusterLocators.add(serverLocator);
- }
- else if (config.getDiscoveryGroupName() != null)
+ ClusterConnectionImpl clusterConnection;
+
+ if (config.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
@@ -754,38 +752,51 @@
"'. The cluster connection will not be deployed.");
}
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg);
- serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
- clusterLocators.add(serverLocator);
+ clusterConnection = new ClusterConnectionImpl(dg,
+ connector,
+ new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+ server.getConfiguration().getClusterUser(),
+ server.getConfiguration().getClusterPassword(),
+ config.isAllowDirectConnectionsOnly());
}
else
{
- // no connector or discovery group are defined. The cluster connection will only be a target and will
- // no connect to other nodes in the cluster
- serverLocator = null;
+ TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
+
+ clusterConnection = new ClusterConnectionImpl(tcConfigs,
+ connector,
+ new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+ server.getConfiguration().getClusterUser(),
+ server.getConfiguration().getClusterPassword(),
+ config.isAllowDirectConnectionsOnly());
}
- ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
-
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-12-10 14:56:36 UTC (rev 10031)
@@ -774,11 +774,6 @@
public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
{
- // server #0 is connected to server #1
- setupClusterConnection("cluster1", 0, 1, "queues", true, 1, isNetty(), true);
- // server #1 is connected to nobody
- setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty(), true);
-
startServers(1, 0);
setupSessionFactory(0, isNetty(), true);
More information about the hornetq-commits mailing list