[hornetq-commits] JBoss hornetq SVN: r8429 - in trunk: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 27 08:41:41 EST 2009


Author: jmesnil
Date: 2009-11-27 08:41:41 -0500 (Fri, 27 Nov 2009)
New Revision: 8429

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
Log:
re-added failover tests for cluster with backup (w/ discovery groups)

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-27 11:54:46 UTC (rev 8428)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -19,6 +19,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -73,6 +74,69 @@
 
    private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
 
+   // debug
+
+   private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
+
+   private static boolean debug = false;
+
+   public static void enableDebug()
+   {
+      debug = true;
+
+      debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
+   }
+   
+   public static void disableDebug()
+   {
+      debug = false;
+
+      debugConns.clear();
+      debugConns = null;
+   }
+   
+   private void checkAddDebug(final RemotingConnection conn)
+   {
+      Set<RemotingConnection> conns;
+
+      synchronized (debugConns)
+      {
+         conns = debugConns.get(connectorConfig);
+
+         if (conns == null)
+         {
+            conns = new HashSet<RemotingConnection>();
+
+            debugConns.put(connectorConfig, conns);
+         }
+
+         conns.add(conn);
+      }
+   }
+
+   public static void failAllConnectionsForConnector(final TransportConfiguration config)
+   {
+      Set<RemotingConnection> conns;
+
+      synchronized (debugConns)
+      {
+         conns = debugConns.get(config);
+
+         if (conns != null)
+         {
+            conns = new HashSet<RemotingConnection>(debugConns.get(config));
+         }
+      }
+
+      if (conns != null)
+      {
+         for (RemotingConnection conn : conns)
+         {
+            conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "blah"));
+         }
+      }
+   }
+
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -773,6 +837,12 @@
          }
          else
          {
+
+            if (debug)
+            {
+               checkAddDebug(theConnection);
+            }
+
             return theConnection;
          }
       }

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -0,0 +1,243 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.Map;
+
+import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+
+/**
+ * 
+ * A ClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
+{
+   private static final Logger log = Logger.getLogger(ClusterWithBackupFailoverTestBase.class);
+
+
+   protected abstract void setupCluster(final boolean forwardWhenNoConsumers) throws Exception;
+
+   protected abstract void setupServers() throws Exception;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      FailoverManagerImpl.enableDebug();
+      
+      setupServers();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      stopServers();
+
+      FailoverManagerImpl.disableDebug();
+      
+      super.tearDown();
+   }
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+
+   public void testFailAllNodes() throws Exception
+   {
+      setupCluster();
+
+      startServers(3, 4, 5, 0, 1, 2);
+
+      setupSessionFactory(0, 3, isNetty(), false);
+      setupSessionFactory(1, 4, isNetty(), false);
+      setupSessionFactory(2, 5, isNetty(), false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+            
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+      
+      failNode(0);
+      
+      // live nodes
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+
+      // live nodes
+      waitForBindings(1, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      
+      log.info("** now sending");
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+            
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      failNode(1);
+
+      // live nodes
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+      // live nodes
+      waitForBindings(2, "queues.testaddress", 2, 2, false);
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      waitForBindings(4, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      failNode(2);
+
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+      waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+      // activated backup nodes
+      waitForBindings(3, "queues.testaddress", 2, 2, false);
+      waitForBindings(4, "queues.testaddress", 2, 2, false);
+      waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(1, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      send(2, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+      
+      removeConsumer(0);
+      removeConsumer(1);
+      removeConsumer(2);
+      
+      stopServers();
+      
+      log.info("*** test done");
+   }
+
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   protected void stopServers() throws Exception
+   {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4, 5);
+   }
+
+   protected void failNode(int node) throws Exception
+   {
+      log.info("*** failing node " + node);
+
+      Map<String, Object> params = generateParams(node, isNetty());
+
+      TransportConfiguration serverTC;
+
+      if (isNetty())
+      {
+         serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+      
+      HornetQServer server = getServer(node);
+
+      //Prevent remoting service taking any more connections
+      server.getRemotingService().freeze();
+      
+      //Stop it broadcasting
+      for (BroadcastGroup group: server.getClusterManager().getBroadcastGroups())
+      {
+         group.stop();
+      }
+      
+      FailoverManagerImpl.failAllConnectionsForConnector(serverTC);
+      
+      server.stop();
+   }
+
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -0,0 +1,76 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A DiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase
+{
+   private static final Logger log = Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
+
+   protected static final String groupAddress = "230.1.2.3";
+
+   protected static final int groupPort = 6745;
+
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      // The lives
+
+      setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      // The backups
+
+      setupDiscoveryClusterConnection("cluster0", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster1", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+      setupDiscoveryClusterConnection("cluster2", 5, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+   }
+
+   @Override
+   protected void setupServers() throws Exception
+   {
+      // The lives
+      setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), 3);
+      setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), 4);
+      setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), 5);
+
+      // The backups
+      setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+      setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+      setupServerWithDiscovery(5, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+   }
+
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FileStorageDiscoveryClusterWithBackupFailoverTest.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A FileStorageClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class FileStorageDiscoveryClusterWithBackupFailoverTest extends DiscoveryClusterWithBackupFailoverTest
+{
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyDiscoveryClusterWithBackupFailoverTest.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A NettyDiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyDiscoveryClusterWithBackupFailoverTest extends DiscoveryClusterWithBackupFailoverTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java	2009-11-27 13:41:41 UTC (rev 8429)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A NettyFileStorageDiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageDiscoveryClusterWithBackupFailoverTest extends DiscoveryClusterWithBackupFailoverTest
+{
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isFileStorage()
+   {
+      return true;
+   }
+}
\ No newline at end of file



More information about the hornetq-commits mailing list