[hornetq-commits] JBoss hornetq SVN: r10872 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 21 15:24:26 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-21 15:24:25 -0400 (Tue, 21 Jun 2011)
New Revision: 10872

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
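Fixing bridge:
- BridgeImpl: compute the reconnect timeout as retryInterval * retryMultiplier^retryCount and log each retry attempt at debug level.
- ClusterConnectionBridge: call super.afterConnect() and rename failed(boolean) to fail(boolean), matching the super.fail(...) call it delegates to.
- Tests: add a setupClusterConnection overload taking reconnectAttempts and retryInterval, plus a round-robin test that stops and restarts a node.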

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-21 19:24:25 UTC (rev 10872)
@@ -746,7 +746,7 @@
          return;
       }
 
-      long timeout = (long)(this.retryCount * this.retryMultiplier * this.retryMultiplier);
+      long timeout = (long)(this.retryInterval * Math.pow(this.retryMultiplier, retryCount));
       if (timeout == 0)
       {
          timeout = this.retryInterval;
@@ -755,6 +755,8 @@
       {
          timeout = maxRetryInterval;
       }
+      
+      log.debug("Bridge " + this + " retrying connection #" + retryCount + ", maxRetry=" + reconnectAttemptsInUse + ", timeout=" + timeout);
 
       scheduleRetryConnectFixedTimeout(timeout);
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-21 19:24:25 UTC (rev 10872)
@@ -265,6 +265,7 @@
    @Override
    protected void afterConnect() throws Exception
    {
+      super.afterConnect();
       System.out.println("afterConnect");
       setupNotificationConsumer();
    }
@@ -275,7 +276,7 @@
       super.stop();
    }
 
-   protected void failed(final boolean permanently)
+   protected void fail(final boolean permanently)
    {
       log.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
       super.fail(permanently);

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-21 19:24:25 UTC (rev 10872)
@@ -506,7 +506,8 @@
    {
       if (log.isDebugEnabled())
       {
-         log.debug(this + "receiving nodeUP for nodeID=" + nodeID + 
+         String ClusterTestBase = "receiving nodeUP for nodeID=";
+         log.debug(this + ClusterTestBase + nodeID + 
                    " connectionPair=" + connectorPair, new Exception ("trace"));
       }
       // discard notifications about ourselves unless its from our backup

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-21 19:24:25 UTC (rev 10872)
@@ -40,6 +40,7 @@
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -1786,6 +1787,52 @@
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
+   protected void setupClusterConnection(final String name,
+                                         final String address,
+                                         final boolean forwardWhenNoConsumers,
+                                         final int maxHops,
+                                         final int reconnectAttempts,
+                                         final long retryInterval,
+                                         final boolean netty,
+                                         final int nodeFrom,
+                                         final int... nodesTo)
+   {
+      HornetQServer serverFrom = servers[nodeFrom];
+
+      if (serverFrom == null)
+      {
+         throw new IllegalStateException("No server at node " + nodeFrom);
+      }
+
+      TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+      serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+      
+      List<String> pairs = new ArrayList<String>();
+      for (int element : nodesTo)
+      {
+         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+         pairs.add(serverTotc.getName());
+      }
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+           address,
+           connectorFrom.getName(),
+           ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+           ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+           retryInterval,
+           ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+           ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+           reconnectAttempts,
+           true,
+           forwardWhenNoConsumers,
+           maxHops,
+           1024,
+           pairs,
+           false);
+
+      serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
+   }
+
    /**
     * @param name
     * @param address
@@ -1796,23 +1843,23 @@
     * @return
     */
    protected ClusterConnectionConfiguration createClusterConfig(final String name,
-                                                              final String address,
-                                                              final boolean forwardWhenNoConsumers,
-                                                              final int maxHops,
-                                                              TransportConfiguration connectorFrom,
-                                                              List<String> pairs)
-   {
-      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-                                                                                      address,
-                                                                                      connectorFrom.getName(),
-                                                                                      250,
-                                                                                      true,
-                                                                                      forwardWhenNoConsumers,
-                                                                                      maxHops,
-                                                                                      1024,
-                                                                                      pairs, false);
-      return clusterConf;
-   }
+                                                                final String address,
+                                                                final boolean forwardWhenNoConsumers,
+                                                                final int maxHops,
+                                                                TransportConfiguration connectorFrom,
+                                                                List<String> pairs)
+     {
+        ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                        address,
+                                                                                        connectorFrom.getName(),
+                                                                                        250,
+                                                                                        true,
+                                                                                        forwardWhenNoConsumers,
+                                                                                        maxHops,
+                                                                                        1024,
+                                                                                        pairs, false);
+        return clusterConf;
+     }
 
    protected void setupClusterConnectionWithBackups(final String name,
                                                     final String address,

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-06-21 19:24:25 UTC (rev 10872)
@@ -18,6 +18,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
 
 /**
  * A SimpleSymmetricClusterTest
@@ -32,6 +33,7 @@
    // Constants -----------------------------------------------------
 
    static final Logger log = Logger.getLogger(SimpleSymmetricClusterTest.class);
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -39,13 +41,12 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   public void setUp() throws Exception 
+
+   public void setUp() throws Exception
    {
       super.setUp();
    }
-   
-   
+
    /**
     * @param name
     * @param address
@@ -56,11 +57,11 @@
     * @return
     */
    protected ClusterConnectionConfiguration createClusterConfig(final String name,
-                                                              final String address,
-                                                              final boolean forwardWhenNoConsumers,
-                                                              final int maxHops,
-                                                              TransportConfiguration connectorFrom,
-                                                              List<String> pairs)
+                                                                final String address,
+                                                                final boolean forwardWhenNoConsumers,
+                                                                final int maxHops,
+                                                                TransportConfiguration connectorFrom,
+                                                                List<String> pairs)
    {
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
                                                                                       address,
@@ -70,61 +71,65 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
       return clusterConf;
    }
 
-
    public void tearDown() throws Exception
    {
       stopServers(0, 1, 2);
       super.tearDown();
    }
-   
+
    public boolean isNetty()
    {
       return false;
    }
-   
+
    public void testSimple() throws Exception
    {
       setupServer(0, true, isNetty());
       setupServer(1, true, isNetty());
       setupServer(2, true, isNetty());
-      
+
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
- 
+
       startServers(0, 1, 2);
-      
+
       Thread.sleep(1000);
-      
-      for (int i = 0; i < 10; i++) log.info("****************************");
+
+      for (int i = 0; i < 10; i++)
+         log.info("****************************");
       for (int i = 0; i <= 2; i++)
       {
-         log.info("*************************************\n " + servers[i] + " topology:\n" + servers[i].getClusterManager().getTopology().describe());
+         log.info("*************************************\n " + servers[i] +
+                  " topology:\n" +
+                  servers[i].getClusterManager().getTopology().describe());
       }
-      for (int i = 0; i < 10; i++) log.info("****************************");
+      for (int i = 0; i < 10; i++)
+         log.info("****************************");
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
-      
-      //Thread.sleep(1500);
-      
+
+      // Thread.sleep(1500);
+
       createQueue(0, "queues.testaddress", "queue0", null, false);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
       createQueue(1, "queues.testaddress", "queue0", null, false);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
       createQueue(2, "queues.testaddress", "queue0", null, false);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
 
       addConsumer(0, 0, "queue0", null);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
       addConsumer(1, 1, "queue0", null);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
       addConsumer(2, 2, "queue0", null);
-      //Thread.sleep(1500);
+      // Thread.sleep(1500);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -136,6 +141,85 @@
 
    }
 
+   public void testSimpleRoundRobbin() throws Exception
+   {
+      setupServer(0, true, isNetty());
+      setupServer(1, true, isNetty());
+      setupServer(2, true, isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, 10, 100, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 1, 2, 0);
+      setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      // Thread.sleep(1500);
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+      // Thread.sleep(1500);
+
+      addConsumer(0, 0, "queue0", null);
+      // Thread.sleep(1500);
+      addConsumer(1, 1, "queue0", null);
+      // Thread.sleep(1500);
+      addConsumer(2, 2, "queue0", null);
+      // Thread.sleep(1500);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 33, true, null);
+
+      verifyReceiveRoundRobin(33, 0, 1, 2);
+
+      stopServers(2);
+
+      Thread.sleep(5000);
+
+      send(0, "queues.testaddress", 100, true, null);
+
+      verifyReceiveRoundRobin(100, 0, 1);
+      
+      sfs[2] = null;
+      consumers[2] = null;
+      
+      
+      startServers(2);
+      
+      setupSessionFactory(2, isNetty());
+
+      addConsumer(2, 2, "queue0", null);
+      
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 33, true, null);
+
+      verifyReceiveRoundRobin(33, 0, 1, 2);
+
+      
+      
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list