Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 15:24:25 -0400 (Tue, 21 Jun 2011)
New Revision: 10872
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
Fixing bridge
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -746,7 +746,7 @@
return;
}
- long timeout = (long)(this.retryCount * this.retryMultiplier * this.retryMultiplier);
+ long timeout = (long)(this.retryInterval * Math.pow(this.retryMultiplier, retryCount));
if (timeout == 0)
{
timeout = this.retryInterval;
@@ -755,6 +755,8 @@
{
timeout = maxRetryInterval;
}
+
+ log.debug("Bridge " + this + " retrying connection #" + retryCount + ", maxRetry=" + reconnectAttemptsInUse + ", timeout=" + timeout);
scheduleRetryConnectFixedTimeout(timeout);
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -265,6 +265,7 @@
@Override
protected void afterConnect() throws Exception
{
+ super.afterConnect();
System.out.println("afterConnect");
setupNotificationConsumer();
}
@@ -275,7 +276,7 @@
super.stop();
}
- protected void failed(final boolean permanently)
+ protected void fail(final boolean permanently)
{
log.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
super.fail(permanently);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -506,7 +506,8 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "receiving nodeUP for nodeID=" + nodeID +
+ String ClusterTestBase = "receiving nodeUP for nodeID=";
+ log.debug(this + ClusterTestBase + nodeID +
" connectionPair=" + connectorPair, new Exception
("trace"));
}
// discard notifications about ourselves unless its from our backup
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -40,6 +40,7 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -1786,6 +1787,52 @@
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
+ protected void setupClusterConnection(final String name,
+ final String address,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final boolean netty,
+ final int nodeFrom,
+ final int... nodesTo)
+ {
+ HornetQServer serverFrom = servers[nodeFrom];
+
+ if (serverFrom == null)
+ {
+ throw new IllegalStateException("No server at node " + nodeFrom);
+ }
+
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+ List<String> pairs = new ArrayList<String>();
+ for (int element : nodesTo)
+ {
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ pairs.add(serverTotc.getName());
+ }
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ address,
+ connectorFrom.getName(),
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ 1024,
+ pairs,
+ false);
+
+ serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
+ }
+
/**
* @param name
* @param address
@@ -1796,23 +1843,23 @@
* @return
*/
protected ClusterConnectionConfiguration createClusterConfig(final String name,
- final String address,
- final boolean
forwardWhenNoConsumers,
- final int maxHops,
- TransportConfiguration
connectorFrom,
- List<String> pairs)
- {
- ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
connectorFrom.getName(),
-
250,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
1024,
-
pairs, false);
- return clusterConf;
- }
+ final String address,
+ final boolean
forwardWhenNoConsumers,
+ final int maxHops,
+ TransportConfiguration
connectorFrom,
+ List<String>
pairs)
+ {
+ ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
+
address,
+
connectorFrom.getName(),
+
250,
+
true,
+
forwardWhenNoConsumers,
+
maxHops,
+
1024,
+
pairs, false);
+ return clusterConf;
+ }
protected void setupClusterConnectionWithBackups(final String name,
final String address,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21 18:11:11 UTC (rev 10871)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21 19:24:25 UTC (rev 10872)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
/**
* A SimpleSymmetricClusterTest
@@ -32,6 +33,7 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(SimpleSymmetricClusterTest.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -39,13 +41,12 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- public void setUp() throws Exception
+
+ public void setUp() throws Exception
{
super.setUp();
}
-
-
+
/**
* @param name
* @param address
@@ -56,11 +57,11 @@
* @return
*/
protected ClusterConnectionConfiguration createClusterConfig(final String name,
- final String address,
- final boolean
forwardWhenNoConsumers,
- final int maxHops,
- TransportConfiguration
connectorFrom,
- List<String> pairs)
+ final String address,
+ final boolean
forwardWhenNoConsumers,
+ final int maxHops,
+ TransportConfiguration
connectorFrom,
+ List<String>
pairs)
{
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
address,
@@ -70,61 +71,65 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
return clusterConf;
}
-
public void tearDown() throws Exception
{
stopServers(0, 1, 2);
super.tearDown();
}
-
+
public boolean isNetty()
{
return false;
}
-
+
public void testSimple() throws Exception
{
setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupServer(2, true, isNetty());
-
+
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
-
+
startServers(0, 1, 2);
-
+
Thread.sleep(1000);
-
- for (int i = 0; i < 10; i++)
log.info("****************************");
+
+ for (int i = 0; i < 10; i++)
+ log.info("****************************");
for (int i = 0; i <= 2; i++)
{
- log.info("*************************************\n " + servers[i] +
" topology:\n" + servers[i].getClusterManager().getTopology().describe());
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
}
- for (int i = 0; i < 10; i++)
log.info("****************************");
+ for (int i = 0; i < 10; i++)
+ log.info("****************************");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
-
- //Thread.sleep(1500);
-
+
+ // Thread.sleep(1500);
+
createQueue(0, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
createQueue(1, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
createQueue(2, "queues.testaddress", "queue0", null, false);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(0, 0, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(1, 1, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
addConsumer(2, 2, "queue0", null);
- //Thread.sleep(1500);
+ // Thread.sleep(1500);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -136,6 +141,85 @@
}
+ public void testSimpleRoundRobbin() throws Exception
+ {
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 10, 100, isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 1, 2, 0);
+ setupClusterConnection("cluster1", "queues", false, 1, 10, 100, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ // Thread.sleep(1500);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+
+ addConsumer(0, 0, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(1, 1, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(2, 2, "queue0", null);
+ // Thread.sleep(1500);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 33, true, null);
+
+ verifyReceiveRoundRobin(33, 0, 1, 2);
+
+ stopServers(2);
+
+ Thread.sleep(5000);
+
+ send(0, "queues.testaddress", 100, true, null);
+
+ verifyReceiveRoundRobin(100, 0, 1);
+
+ sfs[2] = null;
+ consumers[2] = null;
+
+
+ startServers(2);
+
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 33, true, null);
+
+ verifyReceiveRoundRobin(33, 0, 1, 2);
+
+
+
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------