[hornetq-commits] JBoss hornetq SVN: r11434 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/remoting/impl/netty and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 27 16:42:06 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-27 16:42:05 -0400 (Tue, 27 Sep 2011)
New Revision: 11434

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Parameterizing the cluster-connection on the acceptors

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/TransportConstants.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -34,12 +34,13 @@
    {
       Set<String> allowableAcceptorKeys = new HashSet<String>();
       allowableAcceptorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
+      allowableAcceptorKeys.add(org.hornetq.core.remoting.impl.netty.TransportConstants.CLUSTER_CONNECTION);
 
-      ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
-
+      ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
+      
       Set<String> allowableConnectorKeys = new HashSet<String>();
       allowableConnectorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
 
-      ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
+      ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
    }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/TransportConstants.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -74,6 +74,8 @@
    
    public static final String DIRECT_DELIVER = "direct-deliver";
    
+   public static final String CLUSTER_CONNECTION = "cluster-connection";
+   
    public static final boolean DEFAULT_SSL_ENABLED = false;
 
    public static final boolean DEFAULT_USE_NIO_SERVER = false;
@@ -151,6 +153,7 @@
       allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
       allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
+      allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
 
       ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -39,6 +39,7 @@
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.impl.ServerSessionImpl;
 import org.hornetq.core.server.management.ManagementService;
@@ -207,9 +208,10 @@
             ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
 
             ProtocolManager manager = protocolMap.get(protocol);
-
-            // TODO: parameterize the cluster connection
-            Acceptor acceptor = factory.createAcceptor(clusterManager.getDefaultConnection(),
+            
+            ClusterConnection clusterConnection = lookupClusterConnection(info);
+            
+            Acceptor acceptor = factory.createAcceptor(clusterConnection,
                                                        info.getParams(),
                                                        new DelegatingBufferHandler(),
                                                        manager,
@@ -480,6 +482,24 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+   
+   private ClusterConnection lookupClusterConnection(TransportConfiguration config)
+   {
+      String clusterConnectionName = (String)config.getParams().get(org.hornetq.core.remoting.impl.netty.TransportConstants.CLUSTER_CONNECTION);
+      
+      ClusterConnection clusterConnection = null;
+      if (clusterConnectionName != null)
+      {
+         clusterConnection = clusterManager.getClusterConnection(clusterConnectionName);
+      }
+      
+      if (clusterConnection == null)
+      {
+         clusterConnection = clusterManager.getDefaultConnection();
+      }
+      
+      return clusterConnection;
+   }
 
    // Inner classes -------------------------------------------------
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -40,7 +40,7 @@
     */
    ClusterConnection getDefaultConnection();
 
-   ClusterConnection getClusterConnection(SimpleString name);
+   ClusterConnection getClusterConnection(String name);
 
    Set<BroadcastGroup> getBroadcastGroups();
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -57,6 +57,7 @@
  * A ClusterManagerImpl
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
  * 
  * Created 18 Nov 2008 09:23:49
  *
@@ -302,9 +303,9 @@
       return new HashSet<BroadcastGroup>(broadcastGroups.values());
    }
 
-   public ClusterConnection getClusterConnection(final SimpleString name)
+   public ClusterConnection getClusterConnection(final String name)
    {
-      return clusterConnections.get(name.toString());
+      return clusterConnections.get(name);
    }
 
    // backup node becomes live

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * An IsolatedTopologyTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class IsolatedTopologyTest extends ServiceTestBase
+{
+   
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+   
+   
+   public void testIsolatedClusters() throws Exception
+   {
+      
+      HornetQServer server1 = createServer1();
+
+      HornetQServer server2 = createServer2();
+      
+      try
+      {
+         server1.start();
+         Thread.sleep(500);
+         server2.start();
+         
+         Thread.sleep(2000);
+         
+         System.out.println(server1.getClusterManager().getClusterConnection("cc1").getTopology().describe());
+         
+         System.out.println(server1.getClusterManager().getClusterConnection("cc2").getTopology().describe());
+         
+         System.out.println(server2.getClusterManager().getClusterConnection("cc1").getTopology().describe());
+         
+         System.out.println(server2.getClusterManager().getClusterConnection("cc2").getTopology().describe());
+         
+         waitForTopology(server1, "cc1", 2, 5000);
+         
+         waitForTopology(server1, "cc2", 2, 5000);
+         
+         waitForTopology(server2, "cc1", 2, 5000);
+         
+         waitForTopology(server2, "cc2", 2, 5000);
+      }
+      finally
+      {
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+         
+         try
+         {
+            server2.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+
+   private HornetQServer createServer1()
+   {
+      // Server1 with two acceptors, each acceptor on a different cluster connection
+      // talking to a different connector.
+      // i.e. two cluster connections isolated on the same node
+      Configuration config1 = createBasicConfig(0);
+      
+      config1.setClustered(true);
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.CLUSTER_CONNECTION, "cc1");
+      params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, "1");
+      
+      TransportConfiguration acceptor1VM1 = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1");
+      config1.getAcceptorConfigurations().add(acceptor1VM1);
+      
+      config1.getConnectorConfigurations().put("local-cc1", createInVMTransportConnectorConfig(1, "local-cc1"));
+      config1.getConnectorConfigurations().put("local-cc2", createInVMTransportConnectorConfig(2, "local-cc2"));
+
+      config1.getConnectorConfigurations().put("other-cc1", createInVMTransportConnectorConfig(3, "other-cc1"));
+      config1.getConnectorConfigurations().put("other-cc2", createInVMTransportConnectorConfig(4, "other-cc2"));
+
+      
+      params = new HashMap<String, Object>();
+      params.put(TransportConstants.CLUSTER_CONNECTION, "cc2");
+      params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, "2");
+      
+      TransportConfiguration acceptor2VM1 = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2");
+      config1.getAcceptorConfigurations().add(acceptor2VM1);
+      
+      List<String> connectTo = new ArrayList<String>();
+      connectTo.add("other-cc1");
+      
+      ClusterConnectionConfiguration server1CC1 = new ClusterConnectionConfiguration("cc1",
+                                     "jms",
+                                     "local-cc1",
+                                     250,
+                                     true,
+                                     false,
+                                     1,
+                                     1024,
+                                     connectTo,
+                                     false);
+      
+      config1.getClusterConfigurations().add(server1CC1);
+      
+      connectTo = new ArrayList<String>();
+      connectTo.add("other-cc2");
+      
+      ClusterConnectionConfiguration server1CC2 = new ClusterConnectionConfiguration("cc2",
+                                     "jms",
+                                     "local-cc2",
+                                     250,
+                                     true,
+                                     false,
+                                     1,
+                                     1024,
+                                     connectTo,
+                                     false);
+      
+      config1.getClusterConfigurations().add(server1CC2);
+      
+      return createServer(false, config1);
+   }
+
+
+   private HornetQServer createServer2()
+   {
+      // Server2 with two acceptors, each acceptor on a different cluster connection
+      // talking to a different connector.
+      // i.e. two cluster connections isolated on the same node
+      Configuration config1 = createBasicConfig(3);
+      
+      config1.setClustered(true);
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.CLUSTER_CONNECTION, "cc1");
+      params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, "3");
+      
+      TransportConfiguration acceptor1VM1 = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc1");
+      config1.getAcceptorConfigurations().add(acceptor1VM1);
+      
+      config1.getConnectorConfigurations().put("local-cc1", createInVMTransportConnectorConfig(3, "local-cc1"));
+      config1.getConnectorConfigurations().put("local-cc2", createInVMTransportConnectorConfig(4, "local-cc2"));
+
+      config1.getConnectorConfigurations().put("other-cc1", createInVMTransportConnectorConfig(1, "other-cc1"));
+      config1.getConnectorConfigurations().put("other-cc2", createInVMTransportConnectorConfig(2, "other-cc2"));
+
+      
+      params = new HashMap<String, Object>();
+      params.put(TransportConstants.CLUSTER_CONNECTION, "cc2");
+      params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, "4");
+      
+      TransportConfiguration acceptor2VM1 = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params, "acceptor-cc2");
+      config1.getAcceptorConfigurations().add(acceptor2VM1);
+      
+      List<String> connectTo = new ArrayList<String>();
+      connectTo.add("other-cc1");
+      
+      ClusterConnectionConfiguration server1CC1 = new ClusterConnectionConfiguration("cc1",
+                                     "jms",
+                                     "local-cc1",
+                                     250,
+                                     true,
+                                     false,
+                                     1,
+                                     1024,
+                                     connectTo,
+                                     false);
+      
+      config1.getClusterConfigurations().add(server1CC1);
+      
+      connectTo = new ArrayList<String>();
+      connectTo.add("other-cc2");
+      
+      ClusterConnectionConfiguration server1CC2 = new ClusterConnectionConfiguration("cc2",
+                                     "jms",
+                                     "local-cc2",
+                                     250,
+                                     true,
+                                     false,
+                                     1,
+                                     1024,
+                                     connectTo,
+                                     false);
+      
+      config1.getClusterConfigurations().add(server1CC2);
+      
+      return createServer(false, config1);
+   }
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-27 20:06:33 UTC (rev 11433)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-27 20:42:05 UTC (rev 11434)
@@ -52,6 +52,7 @@
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.utils.UUIDGenerator;
 
 /**
  * 
@@ -149,6 +150,41 @@
    }
 
 
+   protected void waitForTopology(final HornetQServer server, String clusterConnectionName, final int nodes, final long timeout) throws Exception
+   {
+      log.debug("waiting for " + nodes + " on the topology for server = " + server);
+
+      long start = System.currentTimeMillis();
+      
+      ClusterConnection clusterConnection = server.getClusterManager().getClusterConnection(clusterConnectionName);
+
+      
+      Topology topology = clusterConnection.getTopology();
+
+      do
+      {
+         if (nodes == topology.getMembers().size())
+         {
+            return;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < timeout);
+
+      String msg = "Timed out waiting for cluster topology of " + nodes +
+                   " (received " +
+                   topology.getMembers().size() +
+                   ") topology = " +
+                   topology +
+                   ")";
+
+      log.error(msg);
+
+      throw new Exception(msg);
+   }
+
+
    protected static Map<String, Object> generateParams(final int node, final boolean netty)
    {
       Map<String, Object> params = new HashMap<String, Object>();
@@ -537,6 +573,17 @@
    
    protected ServerLocator createInVMLocator(final int serverID)
    {
+      TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID, UUIDGenerator.getInstance().generateStringUUID());
+      
+      return HornetQClient.createServerLocatorWithHA(tnspConfig);
+   }
+
+   /**
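+    * @param serverID in-VM server id; when it is 0 the server-id parameter is left unset
+    * @return an in-VM connector TransportConfiguration carrying the given name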
+    */
+   protected TransportConfiguration createInVMTransportConnectorConfig(final int serverID, String name)
+   {
       Map<String, Object> server1Params = new HashMap<String, Object>();
 
       if (serverID != 0)
@@ -544,7 +591,8 @@
          server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
       }
 
-      return HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+      TransportConfiguration tnspConfig = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params, name);
+      return tnspConfig;
    }
  
    protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception



More information about the hornetq-commits mailing list