Author: jmesnil
Date: 2009-11-27 08:41:41 -0500 (Fri, 27 Nov 2009)
New Revision: 8429
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
Log:
re-added failover tests for cluster with backup (w/ discovery groups)
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-27
11:54:46 UTC (rev 8428)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -73,6 +74,69 @@
private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
+ // debug
+
+ private static Map<TransportConfiguration, Set<RemotingConnection>>
debugConns;
+
+ private static boolean debug = false;
+
+ public static void enableDebug()
+ {
+ debug = true;
+
+ debugConns = new ConcurrentHashMap<TransportConfiguration,
Set<RemotingConnection>>();
+ }
+
+ public static void disableDebug()
+ {
+ debug = false;
+
+ debugConns.clear();
+ debugConns = null;
+ }
+
+ private void checkAddDebug(final RemotingConnection conn)
+ {
+ Set<RemotingConnection> conns;
+
+ synchronized (debugConns)
+ {
+ conns = debugConns.get(connectorConfig);
+
+ if (conns == null)
+ {
+ conns = new HashSet<RemotingConnection>();
+
+ debugConns.put(connectorConfig, conns);
+ }
+
+ conns.add(conn);
+ }
+ }
+
+ public static void failAllConnectionsForConnector(final TransportConfiguration
config)
+ {
+ Set<RemotingConnection> conns;
+
+ synchronized (debugConns)
+ {
+ conns = debugConns.get(config);
+
+ if (conns != null)
+ {
+ conns = new HashSet<RemotingConnection>(debugConns.get(config));
+ }
+ }
+
+ if (conns != null)
+ {
+ for (RemotingConnection conn : conns)
+ {
+ conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR,
"blah"));
+ }
+ }
+ }
+
// Attributes
//
-----------------------------------------------------------------------------------
@@ -773,6 +837,12 @@
}
else
{
+
+ if (debug)
+ {
+ checkAddDebug(theConnection);
+ }
+
return theConnection;
}
}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -0,0 +1,243 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.Map;
+
+import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+
+/**
+ *
+ * A ClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
+{
+ private static final Logger log =
Logger.getLogger(ClusterWithBackupFailoverTestBase.class);
+
+
+ protected abstract void setupCluster(final boolean forwardWhenNoConsumers) throws
Exception;
+
+ protected abstract void setupServers() throws Exception;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ FailoverManagerImpl.enableDebug();
+
+ setupServers();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServers();
+
+ FailoverManagerImpl.disableDebug();
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
+ public void testFailAllNodes() throws Exception
+ {
+ setupCluster();
+
+ startServers(3, 4, 5, 0, 1, 2);
+
+ setupSessionFactory(0, 3, isNetty(), false);
+ setupSessionFactory(1, 4, isNetty(), false);
+ setupSessionFactory(2, 5, isNetty(), false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(0);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+
+ // live nodes
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+
+ log.info("** now sending");
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(1);
+
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ // live nodes
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+ waitForBindings(4, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ failNode(2);
+
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+ // activated backup nodes
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+ waitForBindings(4, "queues.testaddress", 2, 2, false);
+ waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+ removeConsumer(0);
+ removeConsumer(1);
+ removeConsumer(2);
+
+ stopServers();
+
+ log.info("*** test done");
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4, 5);
+ }
+
+ protected void failNode(int node) throws Exception
+ {
+ log.info("*** failing node " + node);
+
+ Map<String, Object> params = generateParams(node, isNetty());
+
+ TransportConfiguration serverTC;
+
+ if (isNetty())
+ {
+ serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+ }
+ else
+ {
+ serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ }
+
+ HornetQServer server = getServer(node);
+
+ //Prevent remoting service taking any more connections
+ server.getRemotingService().freeze();
+
+ //Stop it broadcasting
+ for (BroadcastGroup group: server.getClusterManager().getBroadcastGroups())
+ {
+ group.stop();
+ }
+
+ FailoverManagerImpl.failAllConnectionsForConnector(serverTC);
+
+ server.stop();
+ }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -0,0 +1,76 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A DiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DiscoveryClusterWithBackupFailoverTest extends
ClusterWithBackupFailoverTestBase
+{
+ private static final Logger log =
Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
+
+ protected static final String groupAddress = "230.1.2.3";
+
+ protected static final int groupPort = 6745;
+
+ @Override
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ // The lives
+
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster1", 1, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster2", 2, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+
+ // The backups
+
+ setupDiscoveryClusterConnection("cluster0", 3, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster1", 4, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster2", 5, "dg1",
"queues", forwardWhenNoConsumers, 1, isNetty());
+ }
+
+ @Override
+ protected void setupServers() throws Exception
+ {
+ // The lives
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(),
3);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(),
4);
+ setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(),
5);
+
+ // The backups
+ setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(),
true);
+ setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(),
true);
+ setupServerWithDiscovery(5, groupAddress, groupPort, isFileStorage(), isNetty(),
true);
+ }
+
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A FileStorageClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class FileStorageDiscoveryClusterWithBackupFailoverTest extends
DiscoveryClusterWithBackupFailoverTest
+{
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A NettyDiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyDiscoveryClusterWithBackupFailoverTest extends
DiscoveryClusterWithBackupFailoverTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+}
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java 2009-11-27
13:41:41 UTC (rev 8429)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A NettyFileStorageDiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageDiscoveryClusterWithBackupFailoverTest extends
DiscoveryClusterWithBackupFailoverTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+}
\ No newline at end of file