[hornetq-commits] JBoss hornetq SVN: r12150 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 20 17:05:37 EST 2012


Author: clebert.suconic at jboss.com
Date: 2012-02-20 17:05:36 -0500 (Mon, 20 Feb 2012)
New Revision: 12150

Modified:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
Log:
fixing tests

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2012-02-20 22:05:36 UTC (rev 12150)
@@ -41,6 +41,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -189,54 +190,76 @@
 
    private static final int MAX_CONSUMERS = 100;
 
-   private static class ConsumerHolder
-   {
-      final ClientConsumer consumer;
+	protected static class ConsumerHolder
+	{
+		final ClientConsumer consumer;
 
-      final ClientSession session;
+		final ClientSession session;
 
-      final int id;
+		final int id;
 
-      ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
-      {
-         this.id = id;
+		final int node;
 
-         this.consumer = consumer;
-         this.session = session;
-      }
+		public ClientConsumer getConsumer()
+		{
+			return consumer;
+		}
 
-      void close()
-      {
-         if (consumer != null)
-         {
-            try
-            {
-               consumer.close();
-            }
-            catch (HornetQException e)
-            {
-               // ignore
-            }
-         }
-         if (session != null) {
-            try
-            {
-               session.close();
-            }
-            catch (HornetQException e)
-            {
-               // ignore
-            }
-      }
-      }
+		public ClientSession getSession()
+		{
+			return session;
+		}
 
-      @Override
-      public String toString()
-      {
-         return "id=" + id + ", consumer=" + consumer + ", session=" + session;
-      }
-   }
+		public int getId()
+		{
+			return id;
+		}
 
+		public int getNode()
+		{
+			return node;
+		}
+
+		ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session, int node)
+		{
+			this.id = id;
+			this.node = node;
+
+			this.consumer = consumer;
+			this.session = session;
+		}
+
+		void close()
+		{
+			if (consumer != null)
+			{
+				try
+				{
+					consumer.close();
+				} catch (HornetQException e)
+				{
+					// ignore
+				}
+			}
+			if (session != null)
+			{
+				try
+				{
+					session.close();
+				} catch (HornetQException e)
+				{
+					// ignore
+				}
+			}
+		}
+
+		@Override
+		public String toString()
+		{
+			return "id=" + id + ", consumer=" + consumer + ", session=" + session;
+		}
+	}
+
    protected ClientConsumer getConsumer(final int node)
    {
       return consumers[node].consumer;
@@ -576,7 +599,7 @@
 
          session.start();
 
-         consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
+         consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session, node);
       }
       catch (Exception e)
       {
@@ -1494,23 +1517,25 @@
 
       Map<String, Object> params = generateParams(node, netty);
 
-      TransportConfiguration serverTotc;
+      TransportConfiguration serverToTC;
 
       if (netty)
       {
-         serverTotc = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
+    	  serverToTC = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
       }
       else
       {
-         serverTotc = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
+    	  serverToTC = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
       }
 
-      locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+      locators[node] = HornetQClient.createServerLocatorWithHA(serverToTC);
       locators[node].setRetryInterval(100);
       locators[node].setRetryIntervalMultiplier(1d);
       locators[node].setReconnectAttempts(-1);
       locators[node].setBlockOnNonDurableSend(blocking);
       locators[node].setBlockOnDurableSend(blocking);
+   	  ((ServerLocatorInternal)locators[node]).setIdentity("TestClientConnector,live=" + node + ",backup=" + backupNode);
+
       addServerLocator(locators[node]);
       ClientSessionFactory sf = createSessionFactory(locators[node]);
       sfs[node] = sf;
@@ -2028,12 +2053,10 @@
          servers[node].start();
 
          log.info("started server " + servers[node]);
-      }
+         waitForServer(servers[node]);
 
-      for (int node : nodes)
-      {
-         waitForServer(servers[node]);
       }
+      
 
    }
 

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java	2012-02-20 22:05:36 UTC (rev 12150)
@@ -22,14 +22,14 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
-import java.util.Set;
+import java.util.HashSet;
 
+import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.BroadcastGroup;
-import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
 
 /**
  *
@@ -61,11 +61,17 @@
       return false;
    }
 
-   public void testFailLiveNodes() throws Exception
+   public void testFailLiveNodes() throws Throwable
    {
          setupCluster();
 
          startServers(3, 4, 5, 0, 1, 2);
+         //startServers(0, 1, 2, 3, 4, 5);
+         
+         for (int i = 0 ; i < 3; i++)
+         {
+             waitForTopology(servers[i], 3, 3);
+         }
 
          waitForFailoverTopology(3, 0, 1, 2);
          waitForFailoverTopology(4, 0, 1, 2);
@@ -80,8 +86,11 @@
          createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
 
          addConsumer(0, 0, QUEUE_NAME, null);
+         waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
          addConsumer(1, 1, QUEUE_NAME, null);
+         waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
          addConsumer(2, 2, QUEUE_NAME, null);
+         waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
 
          waitForBindings();
 
@@ -94,6 +103,8 @@
          send(2, QUEUES_TESTADDRESS, 10, false, null);
          verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
          Thread.sleep(1000);
+         log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]);
+         log.info("######### Crashing it........., sfs[0] = " +  sfs[0]);
          failNode(0);
 
          waitForFailoverTopology(4, 3, 1, 2);
@@ -191,6 +202,12 @@
       setupCluster();
 
       startServers(3, 4, 5, 0, 1, 2);
+      
+      for (int i = 0 ; i < 3; i++)
+      {
+          waitForTopology(servers[i], 3, 3);
+      }
+      
 
       setupSessionFactory(0, 3, isNetty(), false);
       setupSessionFactory(1, 4, isNetty(), false);
@@ -263,42 +280,56 @@
    {
       setupCluster(false);
    }
+   
 
    protected void failNode(final int node) throws Exception
    {
+	   failNode(node, node);
+   }
+
+
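+   /**
+    * Crashes the server for the given node, passing along the client sessions found for the original live node.
+    * @param node the index of the server to fail
+    * @param originalLiveNode the index of the original live node, used to locate the sessions to fail
+    * @throws Exception
+    */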
+   protected void failNode(final int node, final int originalLiveNode) throws Exception
+   {
       ClusterWithBackupFailoverTestBase.log.info("*** failing node " + node);
 
       HornetQServer server = getServer(node);
+      
+      TestableServer tstServer = new SameProcessHornetQServer(server);
+      
+      ClientSession[] sessionsArray = exploreSessions(originalLiveNode);
+      
+      tstServer.crash(sessionsArray);
+   }
 
-      // Prevent remoting service taking any more connections
-      server.getRemotingService().freeze();
+	private ClientSession[] exploreSessions(final int node)
+	{
+		HashSet<ClientSession> sessions = new HashSet<ClientSession>();
 
-      if (server.getClusterManager() != null)
-      {
-         // Stop it broadcasting
-         for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
-         {
-            group.stop();
-         }
-      }
-      Set<RemotingConnection> connections = server.getRemotingService().getConnections();
-      for (RemotingConnection remotingConnection : connections)
-      {
-         remotingConnection.destroy();
-         server.getRemotingService().removeConnection(remotingConnection.getID());
-      }
+		for (ConsumerHolder holder : consumers)
+		{
+			if (holder != null && holder.getNode() == node && holder.getSession() != null)
+			{
+				sessions.add(holder.getSession());
+			}
+		}
 
-      ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
-      clusterManager.clear();
+		ClientSession[] sessionsArray = sessions.toArray(new ClientSession[sessions.size()]);
+		return sessionsArray;
+	}
 
-      server.stop(true);
-   }
-
    public void testFailAllNodes() throws Exception
    {
       setupCluster();
 
       startServers(3, 4, 5, 0, 1, 2);
+      
+      
 
       setupSessionFactory(0, 3, isNetty(), false);
       setupSessionFactory(1, 4, isNetty(), false);
@@ -391,13 +422,14 @@
       verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
 
       removeConsumer(1);
-      failNode(4);
 
       // live nodes
       waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
       // live nodes
       waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false);
 
+      failNode(4, 1);
+
       send(2, QUEUES_TESTADDRESS, 10, false, null);
       verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
 



More information about the hornetq-commits mailing list