Author: clebert.suconic(a)jboss.com
Date: 2011-06-18 00:40:18 -0400 (Sat, 18 Jun 2011)
New Revision: 10852
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
tweaks
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17
16:47:24 UTC (rev 10851)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-18
04:40:18 UTC (rev 10852)
@@ -1321,9 +1321,15 @@
private void forceReturnChannel1()
{
- Channel channel1 = connection.getChannel(1, -1);
-
- channel1.returnBlocking();
+ if (connection != null)
+ {
+ Channel channel1 = connection.getChannel(1, -1);
+
+ if (channel1 != null)
+ {
+ channel1.returnBlocking();
+ }
+ }
}
private void checkTransportKeys(final ConnectorFactory factory, final Map<String,
Object> params)
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-17
16:47:24 UTC (rev 10851)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-18
04:40:18 UTC (rev 10852)
@@ -244,17 +244,17 @@
protected void waitForBindings(final int node,
final String address,
- final int count,
- final int consumerCount,
+ final int expectedBindingCount,
+ final int expectedConsumerCount,
final boolean local) throws Exception
{
log.debug("waiting for bindings on node " + node +
" address " +
address +
- " count " +
- count +
+ " expectedBindingCount " +
+ expectedBindingCount +
" consumerCount " +
- consumerCount +
+ expectedConsumerCount +
" local " +
local);
@@ -285,6 +285,14 @@
{
if (binding instanceof LocalQueueBinding && local || binding
instanceof RemoteQueueBinding && !local)
{
+ if (local)
+ {
+ log.debug("found binding " + binding + " on node "
+ server);
+ }
+ else
+ {
+ log.debug("found remote binding " + binding + " on node
" + server);
+ }
QueueBinding qBinding = (QueueBinding)binding;
bindingCount++;
@@ -293,7 +301,7 @@
}
}
- if (bindingCount == count && totConsumers == consumerCount)
+ if (bindingCount == expectedBindingCount && totConsumers ==
expectedConsumerCount)
{
return;
}
@@ -302,9 +310,9 @@
}
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
- String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount + " (expecting " + count + ") "+
+ String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount + " (expecting " + expectedConsumerCount + ") "+
", totConsumers = " +
- totConsumers + " (expecting " + consumerCount +
")" +
+ totConsumers + " (expecting " + expectedConsumerCount +
")" +
")";
ClusterTestBase.log.error(msg);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-17
16:47:24 UTC (rev 10851)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-18
04:40:18 UTC (rev 10852)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.logging.Logger;
/**
* A SimpleSymmetricClusterTest
@@ -30,6 +31,7 @@
// Constants -----------------------------------------------------
+ static final Logger log = Logger.getLogger(SimpleSymmetricClusterTest.class);
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -41,13 +43,6 @@
public void setUp() throws Exception
{
super.setUp();
- setupServer(0, true, isNetty());
- setupServer(1, true, isNetty());
-
- setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
- setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
-
- startServers(1, 0);
}
@@ -93,7 +88,43 @@
public void testSimple() throws Exception
{
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 2, 0);
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ Thread.sleep(1000);
+
+ for (int i = 0; i < 10; i++)
log.info("****************************");
+ for (int i = 0; i <= 2; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
" topology:\n" + servers[i].getClusterManager().getTopology().describe());
+ }
+ for (int i = 0; i < 10; i++)
log.info("****************************");
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
}
// Package protected ---------------------------------------------