Author: jmesnil
Date: 2010-08-30 10:18:38 -0400 (Mon, 30 Aug 2010)
New Revision: 9611
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
add tests to check topology updates on HA clients
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -579,6 +579,11 @@
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
+ if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
retry = true;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -143,7 +143,14 @@
clearIO();
try
{
+ if (configuration.getStaticConnectors() == null)
+ {
+ return null;
+ }
+ else
+ {
return configuration.getStaticConnectors().toArray(new String[0]);
+ }
}
finally
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -68,7 +68,8 @@
interceptors,
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
.getExecutor()
- : null);
+ : null,
+ server.getNodeID());
Channel channel1 = rc.getChannel(1, -1);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -25,10 +25,12 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -88,6 +90,8 @@
private final Executor executor;
private volatile boolean executing;
+
+ private final SimpleString nodeID;
// Constructors
// ---------------------------------------------------------------------------------
@@ -99,7 +103,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, interceptors, true, null);
+ this(transportConnection, blockingCallTimeout, interceptors, true, null, null);
}
/*
@@ -107,17 +111,19 @@
*/
public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
- final Executor executor)
+ final Executor executor,
+ final SimpleString nodeID)
{
- this(transportConnection, -1, interceptors, false, executor);
+ this(transportConnection, -1, interceptors, false, executor, nodeID);
}
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean client,
- final Executor executor)
+ final Executor executor,
+ final SimpleString nodeID)
{
this.transportConnection = transportConnection;
@@ -129,6 +135,8 @@
this.client = client;
this.executor = executor;
+
+ this.nodeID = nodeID;
}
// RemotingConnection implementation
@@ -299,6 +307,10 @@
channel.flushConfirmations();
}
+ if (nodeID != null)
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
+ }
channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -227,53 +227,4 @@
return serverLocator.createSessionFactory(connector);
}
-
- @Override
- public void connectionFailed(HornetQException me)
- {
- if (!session.isClosed())
- {
- try
- {
- session.cleanUp(false);
- }
- catch (Exception e)
- {
- log.warn("Unable to clean up the session after a connection failure", e);
- }
- serverLocator.notifyNodeDown(targetNodeID);
- if (serverLocator.getDiscoveryAddress() == null)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- sf = serverLocator.createSessionFactory(connector);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
- });
- }
- }
- super.connectionFailed(me);
- }
-
}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyTest extends TopologyClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(HAClientTopologyTest.class);
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+ setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+ setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+ }
+
+ protected void setupServers() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+ setupServer(4, isFileStorage(), isNetty());
+ }
+
+ @Override
+ protected ServerLocator createHAServerLocator()
+ {
+ TransportConfiguration tc = createTransportConfiguration(isNetty(), false, generateParams(0, isNetty()));
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(tc);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ return locator;
+ }
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyWithDiscoveryTest extends TopologyClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(HAClientTopologyWithDiscoveryTest.class);
+
+ protected static final String groupAddress = "230.1.2.3";
+
+ protected static final int groupPort = 6745;
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster3", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster4", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ }
+
+ protected void setupServers() throws Exception
+ {
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ }
+
+ @Override
+ protected ServerLocator createHAServerLocator()
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(groupAddress, groupPort);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ return locator;
+ }
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyTest extends HAClientTopologyTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyWithDiscoveryTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A TopologyClusterTestBase
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public abstract class TopologyClusterTestBase extends ClusterTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
+
+ private static final long WAIT_TIMEOUT = 30000;
+
+ abstract protected ServerLocator createHAServerLocator();
+
+ abstract protected void setupServers() throws Exception;
+
+ abstract protected void setupCluster() throws Exception;
+
+ abstract protected boolean isNetty() throws Exception;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServers();
+
+ setupCluster();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServers(0, 1, 2, 3, 4);
+
+ super.tearDown();
+ }
+
+ /**
+ * Check that the actual list of received nodeIDs correspond to the expected order of nodes
+ */
+ protected void checkOrder(int[] expected, String[] nodeIDs, List<String> actual)
+ {
+ assertEquals(expected.length, actual.size());
+ for (int i = 0; i < expected.length; i++)
+ {
+ assertEquals("did not receive expected nodeID at " + i, nodeIDs[expected[i]], actual.get(i));
+ }
+ }
+
+ protected void checkContains(int[] expected, String[] nodeIDs, List<String> actual)
+ {
+ long start = System.currentTimeMillis();
+ do
+ {
+ if (expected.length != actual.size())
+ {
+ continue;
+ }
+ boolean ok = true;
+ for (int i = 0; i < expected.length; i++)
+ {
+ ok = (ok && actual.contains(nodeIDs[expected[i]]));
+ }
+ if (ok)
+ {
+ return;
+ }
+ } while(System.currentTimeMillis() - start < 5000);
+ fail("did not contain all expected node ID: " + actual);
+ }
+
+ protected String[] getNodeIDs(int... nodes)
+ {
+ String[] nodeIDs = new String[nodes.length];
+ for (int i = 0; i < nodes.length; i++)
+ {
+ nodeIDs[i] = servers[i].getNodeID().toString();
+ }
+ return nodeIDs;
+ }
+
+ protected ClientSession checkSessionOrReconnect(ClientSession session, ServerLocator locator) throws Exception
+ {
+ try
+ {
+ String rand = RandomUtil.randomString();
+ session.createQueue(rand, rand);
+ session.deleteQueue(rand);
+ return session;
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ return sf.createSession();
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ protected void waitForClusterConnections(final int node, final int count) throws Exception
+ {
+ HornetQServer server = servers[node];
+
+ if (server == null)
+ {
+ throw new IllegalArgumentException("No server at " + node);
+ }
+
+ ClusterManager clusterManager = server.getClusterManager();
+
+ long start = System.currentTimeMillis();
+
+ do
+ {
+ int nodesCount = 0;
+
+ for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
+ {
+ nodesCount += clusterConn.getNodes().size();
+ }
+
+ if (nodesCount == count)
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+
+ log.error(clusterDescription(servers[node]));
+ throw new IllegalStateException("Timed out waiting for cluster connections ");
+ }
+
+ public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
+ {
+ startServers(0);
+
+ ServerLocator locator = createHAServerLocator();
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
+ {
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last,
+ int distance)
+ {
+ if(!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
+ }
+ });
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ startServers(1, 4, 3, 2);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ stopServers(2, 3, 1, 4);
+
+ waitForClusterConnections(0, 0);
+
+ assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+ sf.close();
+
+ stopServers(0);
+ }
+
+ public void testReceiveNotifications() throws Throwable
+ {
+ startServers(0, 1, 2, 3, 4);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+ ServerLocator locator = createHAServerLocator();
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
+ {
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last,
+ int distance)
+ {
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
+ }
+ });
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+ ClientSession session = sf.createSession();
+
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ stopServers(0);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+ stopServers(2);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+ stopServers(4);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+ stopServers(3);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+ stopServers(1);
+
+ assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+ checkContains(new int[] {}, nodeIDs, nodes);
+
+ sf.close();
+ }
+
+ public void testStopNodes() throws Throwable
+ {
+ startServers(0, 1, 2, 3, 4);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+ ServerLocator locator = createHAServerLocator();
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
+ {
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last,
+ int distance)
+ {
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ }
+ }
+ });
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ ClientSession session = sf.createSession();
+
+ stopServers(0);
+ assertFalse(servers[0].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+ stopServers(2);
+ assertFalse(servers[2].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+ stopServers(4);
+ assertFalse(servers[4].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+ stopServers(3);
+ assertFalse(servers[3].isStarted());
+
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+ stopServers(1);
+ assertFalse(servers[1].isStarted());
+ try
+ {
+ session = checkSessionOrReconnect(session, locator);
+ fail();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}