Author: clebert.suconic(a)jboss.com
Date: 2012-02-20 17:05:36 -0500 (Mon, 20 Feb 2012)
New Revision: 12150
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
Log:
fixing tests
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-20 22:05:36 UTC (rev 12150)
@@ -41,6 +41,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -189,54 +190,76 @@
private static final int MAX_CONSUMERS = 100;
- private static class ConsumerHolder
- {
- final ClientConsumer consumer;
+ protected static class ConsumerHolder
+ {
+ final ClientConsumer consumer;
- final ClientSession session;
+ final ClientSession session;
- final int id;
+ final int id;
- ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
- {
- this.id = id;
+ final int node;
- this.consumer = consumer;
- this.session = session;
- }
+ public ClientConsumer getConsumer()
+ {
+ return consumer;
+ }
- void close()
- {
- if (consumer != null)
- {
- try
- {
- consumer.close();
- }
- catch (HornetQException e)
- {
- // ignore
- }
- }
- if (session != null) {
- try
- {
- session.close();
- }
- catch (HornetQException e)
- {
- // ignore
- }
- }
- }
+ public ClientSession getSession()
+ {
+ return session;
+ }
- @Override
- public String toString()
- {
- return "id=" + id + ", consumer=" + consumer + ", session=" + session;
- }
- }
+ public int getId()
+ {
+ return id;
+ }
+ public int getNode()
+ {
+ return node;
+ }
+
+ ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session, int node)
+ {
+ this.id = id;
+ this.node = node;
+
+ this.consumer = consumer;
+ this.session = session;
+ }
+
+ void close()
+ {
+ if (consumer != null)
+ {
+ try
+ {
+ consumer.close();
+ } catch (HornetQException e)
+ {
+ // ignore
+ }
+ }
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ } catch (HornetQException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "id=" + id + ", consumer=" + consumer + ", session=" + session;
+ }
+ }
+
protected ClientConsumer getConsumer(final int node)
{
return consumers[node].consumer;
@@ -576,7 +599,7 @@
session.start();
- consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
+ consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session, node);
}
catch (Exception e)
{
@@ -1494,23 +1517,25 @@
Map<String, Object> params = generateParams(node, netty);
- TransportConfiguration serverTotc;
+ TransportConfiguration serverToTC;
if (netty)
{
- serverTotc = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
+ serverToTC = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
}
else
{
- serverTotc = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
+ serverToTC = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
}
- locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverToTC);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
locators[node].setReconnectAttempts(-1);
locators[node].setBlockOnNonDurableSend(blocking);
locators[node].setBlockOnDurableSend(blocking);
+ ((ServerLocatorInternal)locators[node]).setIdentity("TestClientConnector,live=" + node + ",backup=" + backupNode);
+
addServerLocator(locators[node]);
ClientSessionFactory sf = createSessionFactory(locators[node]);
sfs[node] = sf;
@@ -2028,12 +2053,10 @@
servers[node].start();
log.info("started server " + servers[node]);
- }
+ waitForServer(servers[node]);
- for (int node : nodes)
- {
- waitForServer(servers[node]);
}
+
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2012-02-20 22:05:36 UTC (rev 12150)
@@ -22,14 +22,14 @@
package org.hornetq.tests.integration.cluster.failover;
-import java.util.Set;
+import java.util.HashSet;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.BroadcastGroup;
-import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
*
@@ -61,11 +61,17 @@
return false;
}
- public void testFailLiveNodes() throws Exception
+ public void testFailLiveNodes() throws Throwable
{
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+ //startServers(0, 1, 2, 3, 4, 5);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
@@ -80,8 +86,11 @@
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
+ waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(1, 1, QUEUE_NAME, null);
+ waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(2, 2, QUEUE_NAME, null);
+ waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings();
@@ -94,6 +103,8 @@
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
Thread.sleep(1000);
+ log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]);
+ log.info("######### Crashing it........., sfs[0] = " + sfs[0]);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
@@ -191,6 +202,12 @@
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
+
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
@@ -263,42 +280,56 @@
{
setupCluster(false);
}
+
protected void failNode(final int node) throws Exception
{
+ failNode(node, node);
+ }
+
+
+ /**
+ *
+ * @param node The node which we should fail
+ * @param originalLiveNode The number of the original node, to locate session to fail
+ * @throws Exception
+ */
+ protected void failNode(final int node, final int originalLiveNode) throws Exception
+ {
ClusterWithBackupFailoverTestBase.log.info("*** failing node " + node);
HornetQServer server = getServer(node);
+
+ TestableServer tstServer = new SameProcessHornetQServer(server);
+
+ ClientSession[] sessionsArray = exploreSessions(originalLiveNode);
+
+ tstServer.crash(sessionsArray);
+ }
- // Prevent remoting service taking any more connections
- server.getRemotingService().freeze();
+ private ClientSession[] exploreSessions(final int node)
+ {
+ HashSet<ClientSession> sessions = new HashSet<ClientSession>();
- if (server.getClusterManager() != null)
- {
- // Stop it broadcasting
- for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
- {
- group.stop();
- }
- }
- Set<RemotingConnection> connections = server.getRemotingService().getConnections();
- for (RemotingConnection remotingConnection : connections)
- {
- remotingConnection.destroy();
- server.getRemotingService().removeConnection(remotingConnection.getID());
- }
+ for (ConsumerHolder holder : consumers)
+ {
+ if (holder != null && holder.getNode() == node && holder.getSession() != null)
+ {
+ sessions.add(holder.getSession());
+ }
+ }
- ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
- clusterManager.clear();
+ ClientSession[] sessionsArray = sessions.toArray(new ClientSession[sessions.size()]);
+ return sessionsArray;
+ }
- server.stop(true);
- }
-
public void testFailAllNodes() throws Exception
{
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+
+
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
@@ -391,13 +422,14 @@
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
removeConsumer(1);
- failNode(4);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false);
+ failNode(4, 1);
+
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);