Author: clebert.suconic(a)jboss.com
Date: 2011-06-28 21:07:24 -0400 (Tue, 28 Jun 2011)
New Revision: 10895
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
tweaks on clustering
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -37,6 +37,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
@@ -84,7 +85,7 @@
TransportConstants.DEFAULT_PORT + 8,
TransportConstants.DEFAULT_PORT + 9, };
- private static final long WAIT_TIMEOUT = 5000;
+ private static final long WAIT_TIMEOUT = 10000;
@Override
protected void setUp() throws Exception
@@ -243,7 +244,37 @@
throw new IllegalStateException(msg);
}
+
+ protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server = " + server);
+
+ long start = System.currentTimeMillis();
+
+ Topology topology = server.getClusterManager().getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
+
+ ClusterTestBase.log.error(msg);
+
+ throw new Exception (msg);
+
+
+
+ }
+
protected void waitForBindings(final int node,
final String address,
final int expectedBindingCount,
@@ -287,14 +318,6 @@
{
if (binding instanceof LocalQueueBinding && local || binding instanceof RemoteQueueBinding && !local)
{
- if (local)
- {
- log.debug("found binding " + binding + " on node " + server);
- }
- else
- {
- log.debug("found remote binding " + binding + " on node " + server);
- }
QueueBinding qBinding = (QueueBinding)binding;
bindingCount++;
@@ -1995,7 +2018,7 @@
{
for (int node : nodes)
{
- if (servers[node].isStarted())
+ if (servers[node] != null && servers[node].isStarted())
{
try
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -18,7 +18,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQServer;
/**
* A SimpleSymmetricClusterTest
@@ -78,7 +77,8 @@
public void tearDown() throws Exception
{
- stopServers(0, 1, 2);
+ log.info("#test tearDown " + loopNumber);
+ stopServers(0, 1, 2, 3, 4);
super.tearDown();
}
@@ -114,8 +114,6 @@
// startServers(3, 4, 5, 0, 1, 2);
startServers(0, 1, 2, 3, 4, 5);
-
- Thread.sleep(1000);
log.info("");
for (int i = 0; i <= 5; i++)
@@ -148,7 +146,7 @@
}
-
+
public void testSimple() throws Exception
{
setupServer(0, true, isNetty());
@@ -157,12 +155,10 @@
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
- setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
startServers(0, 1, 2);
- Thread.sleep(1000);
-
for (int i = 0; i < 10; i++)
log.info("****************************");
for (int i = 0; i <= 2; i++)
@@ -202,7 +198,84 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
+ static int loopNumber;
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ loopNumber = i;
+ log.info("#test " + i);
+ testSimple2();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
+
+
+
+ public void testSimple2() throws Exception
+ {
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+ setupServer(3, true, isNetty());
+ setupServer(4, true, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2, 3, 4);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2, 3, 4);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1, 3, 4);
+
+ setupClusterConnection("cluster3", "queues", false, 1, isNetty(), 3, 0, 1, 2, 4);
+
+ setupClusterConnection("cluster4", "queues", false, 1, isNetty(), 4, 0, 1, 2, 3);
+
+ startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
+
+ log.info("All the servers have been started already!");
+
+ for (int i = 0; i <= 4; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
+ }
+
+ for (int i = 0; i <= 4; i++)
+ {
+ setupSessionFactory(i, isNetty());
+ }
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ createQueue(i, "queues.testaddress", "queue0", null, false);
+ }
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ addConsumer(i, i, "queue0", null);
+ }
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+ }
+
public void testSimpleRoundRobbin() throws Exception
{
setupServer(0, true, isNetty());
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -42,6 +42,7 @@
@Override
protected void tearDown() throws Exception
{
+ log.info("#test tearDown");
stopServers();
super.tearDown();
@@ -51,6 +52,20 @@
{
return false;
}
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.info("#test " + i);
+ testStopAllStartAll();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
public void testStopAllStartAll() throws Throwable
{