Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 18:43:06 -0400 (Tue, 21 Jun 2011)
New Revision: 10876
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
Tweaks
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21
21:57:15 UTC (rev 10875)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21
22:43:06 UTC (rev 10876)
@@ -549,7 +549,7 @@
public void beforeReconnect(final HornetQException exception)
{
log.warn(name + "::Connection failed before reconnect ", exception);
- fail(true);
+ fail(false);
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21
21:57:15 UTC (rev 10875)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21
22:43:06 UTC (rev 10876)
@@ -508,7 +508,7 @@
{
String ClusterTestBase = "receiving nodeUP for nodeID=";
log.debug(this + ClusterTestBase + nodeID +
- " connectionPair=" + connectorPair, new Exception
("trace"));
+ " connectionPair=" + connectorPair);
}
// discard notifications about ourselves unless its from our backup
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21
21:57:15 UTC (rev 10875)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-21
22:43:06 UTC (rev 10876)
@@ -312,7 +312,7 @@
}
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
- String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount + " (expecting " + expectedConsumerCount + ") "+
+ String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount + " (expecting " + expectedBindingCount + ") "+
", totConsumers = " +
totConsumers + " (expecting " + expectedConsumerCount +
")" +
")";
@@ -951,21 +951,32 @@
for (int i = 0; i < numMessages; i++)
{
- ConsumerHolder holder = consumers[consumerIDs[count]];
+ // We may use a negative number in some tests to ignore the consumer, case we
know the server is down
+ if (consumerIDs[count] >= 0)
+ {
+ ConsumerHolder holder = consumers[consumerIDs[count]];
+
+ if (holder == null)
+ {
+ throw new IllegalArgumentException("No consumer at " +
consumerIDs[i]);
+ }
+
+ ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
+
+ message.acknowledge();
+
+ consumers[consumerIDs[count]].session.commit();
+
+ System.out.println("Msg: " + message);
+
+ Assert.assertNotNull("consumer " + consumerIDs[count] + " did
not receive message " + i, message);
+
+ Assert.assertEquals("consumer " + consumerIDs[count] + "
message " + i,
+ i,
+ message.getObjectProperty(ClusterTestBase.COUNT_PROP));
- if (holder == null)
- {
- throw new IllegalArgumentException("No consumer at " +
consumerIDs[i]);
}
-
- ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
- Assert.assertNotNull("consumer " + consumerIDs[count] + " did not
receive message " + i, message);
-
- Assert.assertEquals("consumer " + consumerIDs[count] + " message
" + i,
- i,
- message.getObjectProperty(ClusterTestBase.COUNT_PROP));
-
+
count++;
if (count == consumerIDs.length)
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21
21:57:15 UTC (rev 10875)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-21
22:43:06 UTC (rev 10876)
@@ -140,7 +140,7 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
-
+
public void testSimpleRoundRobbin() throws Exception
{
setupServer(0, true, isNetty());
@@ -187,8 +187,11 @@
stopServers(2);
- Thread.sleep(5000);
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+
send(0, "queues.testaddress", 100, true, null);
verifyReceiveRoundRobin(100, 0, 1);
@@ -220,6 +223,90 @@
}
+
+ public void testSimpleRoundRobbinNoFailure() throws Exception
+ {
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, -1,
1000, isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster1", "queues", false, 1, -1,
1000, isNetty(), 1, 2, 0);
+ setupClusterConnection("cluster1", "queues", false, 1, -1,
1000, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ // Thread.sleep(1500);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+ // Thread.sleep(1500);
+
+ addConsumer(0, 0, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(1, 1, "queue0", null);
+ // Thread.sleep(1500);
+ addConsumer(2, 2, "queue0", null);
+ // Thread.sleep(1500);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 33, true, null);
+
+ verifyReceiveRoundRobin(33, 0, 1, 2);
+
+ Thread.sleep(1000);
+
+ // TODO: need to make sure the shutdown won't be send, what will affect the
test
+ stopServers(2);
+//
+// Thread.sleep(5000);
+//
+// waitForBindings(0, "queues.testaddress", 2, 2, false);
+// waitForBindings(1, "queues.testaddress", 2, 2, false);
+
+
+ send(0, "queues.testaddress", 100, true, null);
+
+ verifyReceiveRoundRobin(100, 0, 1, -1);
+
+ sfs[2] = null;
+ consumers[2] = null;
+
+ startServers(2);
+
+ setupSessionFactory(2, isNetty());
+
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ verifyReceiveRoundRobin(100, -1, -1, 2);
+
+
+
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------