Author: clebert.suconic(a)jboss.com
Date: 2011-09-27 16:42:05 -0400 (Tue, 27 Sep 2011)
New Revision: 11434
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Parameterizing the cluster-connection on the acceptors
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -34,12 +34,13 @@
{
Set<String> allowableAcceptorKeys = new HashSet<String>();
allowableAcceptorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
+
allowableAcceptorKeys.add(org.hornetq.core.remoting.impl.netty.TransportConstants.CLUSTER_CONNECTION);
- ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
-
+ ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
+
Set<String> allowableConnectorKeys = new HashSet<String>();
allowableConnectorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
- ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
+ ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -74,6 +74,8 @@
public static final String DIRECT_DELIVER = "direct-deliver";
+ public static final String CLUSTER_CONNECTION = "cluster-connection";
+
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -151,6 +153,7 @@
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
+ allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -39,6 +39,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
@@ -207,9 +208,10 @@
ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
ProtocolManager manager = protocolMap.get(protocol);
-
- // TODO: parameterize the cluster connection
- Acceptor acceptor =
factory.createAcceptor(clusterManager.getDefaultConnection(),
+
+ ClusterConnection clusterConnection = lookupClusterConnection(info);
+
+ Acceptor acceptor = factory.createAcceptor(clusterConnection,
info.getParams(),
new DelegatingBufferHandler(),
manager,
@@ -480,6 +482,24 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * Resolves the cluster connection for an acceptor configuration.
+ * Reads the TransportConstants.CLUSTER_CONNECTION entry from the configuration's
+ * params and asks the cluster manager for the connection of that name; when the
+ * entry is absent, or the lookup yields null, the manager's default connection is used.
+ *
+ * @param config the transport configuration whose params are inspected
+ * @return the named cluster connection, or clusterManager.getDefaultConnection() as the fallback
+ */
+ private ClusterConnection lookupClusterConnection(TransportConfiguration config)
+ {
+ String clusterConnectionName =
(String)config.getParams().get(org.hornetq.core.remoting.impl.netty.TransportConstants.CLUSTER_CONNECTION);
+ // The value is cast to String, so a non-String param value would fail above with a ClassCastException.
+ ClusterConnection clusterConnection = null;
+ if (clusterConnectionName != null)
+ {
+ clusterConnection = clusterManager.getClusterConnection(clusterConnectionName);
+ }
+ // Covers both "no name configured" and "the lookup by name returned null".
+ if (clusterConnection == null)
+ {
+ clusterConnection = clusterManager.getDefaultConnection();
+ }
+
+ return clusterConnection;
+ }
// Inner classes -------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -40,7 +40,7 @@
*/
ClusterConnection getDefaultConnection();
- ClusterConnection getClusterConnection(SimpleString name);
+ ClusterConnection getClusterConnection(String name);
Set<BroadcastGroup> getBroadcastGroups();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -57,6 +57,7 @@
* A ClusterManagerImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
*
* Created 18 Nov 2008 09:23:49
*
@@ -302,9 +303,9 @@
return new HashSet<BroadcastGroup>(broadcastGroups.values());
}
- public ClusterConnection getClusterConnection(final SimpleString name)
+ public ClusterConnection getClusterConnection(final String name)
{
- return clusterConnections.get(name.toString());
+ return clusterConnections.get(name);
}
// backup node becomes live
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
(rev 0)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * An IsolatedTopologyTest
+ *
+ * Starts two servers, each configured with two cluster connections ("cc1" and "cc2")
+ * and two in-VM acceptors tagged with the cluster-connection parameter, then waits
+ * for every cluster connection's topology to reach two members.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class IsolatedTopologyTest extends ServiceTestBase
+{
+ /** Delegates to the superclass set-up; adds nothing of its own. */
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ /** Delegates to the superclass tear-down; adds nothing of its own. */
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Starts server1, then server2 after a 500 ms pause, prints the "cc1" and "cc2"
+ * topologies of both servers, and requires each of them to reach 2 members.
+ */
+ public void testIsolatedClusters() throws Exception
+ {
+
+ HornetQServer server1 = createServer1();
+
+ HornetQServer server2 = createServer2();
+
+ try
+ {
+ server1.start();
+ Thread.sleep(500);
+ server2.start();
+
+ Thread.sleep(2000);
+ // Print each cluster connection's topology description before asserting on it.
+
System.out.println(server1.getClusterManager().getClusterConnection("cc1").getTopology().describe());
+
+
System.out.println(server1.getClusterManager().getClusterConnection("cc2").getTopology().describe());
+
+
System.out.println(server2.getClusterManager().getClusterConnection("cc1").getTopology().describe());
+
+
System.out.println(server2.getClusterManager().getClusterConnection("cc2").getTopology().describe());
+ // Each of the four cluster connections is given a 5000 timeout to show 2 topology members.
+ waitForTopology(server1, "cc1", 2, 5000);
+
+ waitForTopology(server1, "cc2", 2, 5000);
+
+ waitForTopology(server2, "cc1", 2, 5000);
+
+ waitForTopology(server2, "cc2", 2, 5000);
+ }
+ finally
+ {
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ // Any Throwable from stop() is swallowed, so server2.stop() is attempted even if server1.stop() fails.
+ try
+ {
+ server2.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ /**
+ * Builds the first server (createBasicConfig(0), clustered) with two in-VM acceptors,
+ * ids "1" and "2", tagged for cluster connections "cc1" and "cc2" respectively.
+ */
+ private HornetQServer createServer1()
+ {
+ // Server1 with two acceptors, each acceptor on a different cluster connection
+ // talking to a different connector.
+ // i.e. two cluster connections isolated on the same node
+ Configuration config1 = createBasicConfig(0);
+
+ config1.setClustered(true);
+ // First acceptor: in-VM server id "1", tagged with cluster connection "cc1".
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.CLUSTER_CONNECTION, "cc1");
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
"1");
+
+ TransportConfiguration acceptor1VM1 = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1");
+ config1.getAcceptorConfigurations().add(acceptor1VM1);
+ // local-cc1/local-cc2 use in-VM ids 1 and 2, the ids given to this server's own acceptors.
+ config1.getConnectorConfigurations().put("local-cc1",
createInVMTransportConnectorConfig(1, "local-cc1"));
+ config1.getConnectorConfigurations().put("local-cc2",
createInVMTransportConnectorConfig(2, "local-cc2"));
+ // other-cc1/other-cc2 use in-VM ids 3 and 4, the ids given to the acceptors in createServer2().
+ config1.getConnectorConfigurations().put("other-cc1",
createInVMTransportConnectorConfig(3, "other-cc1"));
+ config1.getConnectorConfigurations().put("other-cc2",
createInVMTransportConnectorConfig(4, "other-cc2"));
+
+ // Second acceptor: in-VM server id "2", tagged with cluster connection "cc2".
+ params = new HashMap<String, Object>();
+ params.put(TransportConstants.CLUSTER_CONNECTION, "cc2");
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
"2");
+
+ TransportConfiguration acceptor2VM1 = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2");
+ config1.getAcceptorConfigurations().add(acceptor2VM1);
+ // "cc1" is built with "local-cc1" and a connect-to list holding only "other-cc1".
+ List<String> connectTo = new ArrayList<String>();
+ connectTo.add("other-cc1");
+ // NOTE(review): meaning of the positional arguments is defined by the ClusterConnectionConfiguration constructor, not shown here.
+ ClusterConnectionConfiguration server1CC1 = new
ClusterConnectionConfiguration("cc1",
+ "jms",
+ "local-cc1",
+ 250,
+ true,
+ false,
+ 1,
+ 1024,
+ connectTo,
+ false);
+
+ config1.getClusterConfigurations().add(server1CC1);
+ // "cc2" is built with "local-cc2" and a fresh connect-to list holding only "other-cc2".
+ connectTo = new ArrayList<String>();
+ connectTo.add("other-cc2");
+
+ ClusterConnectionConfiguration server1CC2 = new
ClusterConnectionConfiguration("cc2",
+ "jms",
+ "local-cc2",
+ 250,
+ true,
+ false,
+ 1,
+ 1024,
+ connectTo,
+ false);
+
+ config1.getClusterConfigurations().add(server1CC2);
+
+ return createServer(false, config1);
+ }
+
+ /**
+ * Builds the second server (createBasicConfig(3), clustered): the mirror image of
+ * createServer1(), with acceptor ids "3" and "4" and the local/other connector ids swapped.
+ */
+ private HornetQServer createServer2()
+ {
+ // Server2 with two acceptors, each acceptor on a different cluster connection
+ // talking to a different connector.
+ // i.e. two cluster connections isolated on the same node
+ Configuration config1 = createBasicConfig(3);
+ // NOTE(review): the config1 / server1CC* names are unchanged from createServer1(); here they describe server 2.
+ config1.setClustered(true);
+ // First acceptor: in-VM server id "3", tagged with cluster connection "cc1".
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.CLUSTER_CONNECTION, "cc1");
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
"3");
+
+ TransportConfiguration acceptor1VM1 = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1");
+ config1.getAcceptorConfigurations().add(acceptor1VM1);
+ // local-cc1/local-cc2 use in-VM ids 3 and 4, the ids given to this server's own acceptors.
+ config1.getConnectorConfigurations().put("local-cc1",
createInVMTransportConnectorConfig(3, "local-cc1"));
+ config1.getConnectorConfigurations().put("local-cc2",
createInVMTransportConnectorConfig(4, "local-cc2"));
+ // other-cc1/other-cc2 use in-VM ids 1 and 2, the ids given to the acceptors in createServer1().
+ config1.getConnectorConfigurations().put("other-cc1",
createInVMTransportConnectorConfig(1, "other-cc1"));
+ config1.getConnectorConfigurations().put("other-cc2",
createInVMTransportConnectorConfig(2, "other-cc2"));
+
+ // Second acceptor: in-VM server id "4", tagged with cluster connection "cc2".
+ params = new HashMap<String, Object>();
+ params.put(TransportConstants.CLUSTER_CONNECTION, "cc2");
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
"4");
+
+ TransportConfiguration acceptor2VM1 = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2");
+ config1.getAcceptorConfigurations().add(acceptor2VM1);
+ // "cc1" is built with "local-cc1" and a connect-to list holding only "other-cc1".
+ List<String> connectTo = new ArrayList<String>();
+ connectTo.add("other-cc1");
+
+ ClusterConnectionConfiguration server1CC1 = new
ClusterConnectionConfiguration("cc1",
+ "jms",
+ "local-cc1",
+ 250,
+ true,
+ false,
+ 1,
+ 1024,
+ connectTo,
+ false);
+
+ config1.getClusterConfigurations().add(server1CC1);
+ // "cc2" is built with "local-cc2" and a fresh connect-to list holding only "other-cc2".
+ connectTo = new ArrayList<String>();
+ connectTo.add("other-cc2");
+
+ ClusterConnectionConfiguration server1CC2 = new
ClusterConnectionConfiguration("cc2",
+ "jms",
+ "local-cc2",
+ 250,
+ true,
+ false,
+ 1,
+ 1024,
+ connectTo,
+ false);
+
+ config1.getClusterConfigurations().add(server1CC2);
+
+ return createServer(false, config1);
+ }
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-27
20:06:33 UTC (rev 11433)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-27
20:42:05 UTC (rev 11434)
@@ -52,6 +52,7 @@
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.utils.UUIDGenerator;
/**
*
@@ -149,6 +150,41 @@
}
+ protected void waitForTopology(final HornetQServer server, String
clusterConnectionName, final int nodes, final long timeout) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server =
" + server);
+ /* Polls the topology of the named cluster connection on the given server, sleeping
+ * 10 ms between checks, and returns as soon as it has exactly 'nodes' members.
+ * Throws a plain Exception once 'timeout' milliseconds have passed without a match. */
+ long start = System.currentTimeMillis();
+
+ ClusterConnection clusterConnection =
server.getClusterManager().getClusterConnection(clusterConnectionName);
+
+ // NOTE(review): no null check here; an unknown connection name presumably yields null and fails on the next line - confirm.
+ Topology topology = clusterConnection.getTopology();
+ // do/while: the member count is checked at least once, even when timeout is 0.
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < timeout);
+ // Timed out: report the expected count, the member count read now, and the topology itself.
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
+ log.error(msg);
+
+ throw new Exception(msg);
+ }
+
+
protected static Map<String, Object> generateParams(final int node, final
boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -537,6 +573,17 @@
protected ServerLocator createInVMLocator(final int serverID)
{
+ TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID,
UUIDGenerator.getInstance().generateStringUUID());
+
+ return HornetQClient.createServerLocatorWithHA(tnspConfig);
+ }
+
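+ /**
+ * Builds an in-VM connector TransportConfiguration with the given name.
+ * @param serverID in-VM server id; it is put into the connector params under SERVER_ID_PROP_NAME only when it is not 0
+ * @param name the name passed to the TransportConfiguration constructor
+ * @return a new TransportConfiguration using INVM_CONNECTOR_FACTORY
+ */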
+ protected TransportConfiguration createInVMTransportConnectorConfig(final int
serverID, String name)
+ {
Map<String, Object> server1Params = new HashMap<String, Object>();
if (serverID != 0)
@@ -544,7 +591,8 @@
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
}
- return HornetQClient.createServerLocatorWithHA(new
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+ TransportConfiguration tnspConfig = new
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params, name);
+ return tnspConfig;
}
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws
Exception