[hornetq-commits] JBoss hornetq SVN: r9825 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration: jms/cluster and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 1 05:44:00 EDT 2010


Author: ataylor
Date: 2010-11-01 05:44:00 -0400 (Mon, 01 Nov 2010)
New Revision: 9825

Modified:
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
test fixes

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2010-11-01 09:44:00 UTC (rev 9825)
@@ -33,8 +33,10 @@
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 
 /**
@@ -73,14 +75,15 @@
    // Fail bridge and reconnecting immediately
    public void testFailoverAndReconnectImmediately() throws Exception
    {
+      NodeManager nodeManager = new InVMNodeManager();
       Map<String, Object> server0Params = new HashMap<String, Object>();
-      HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
+      HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
 
       Map<String, Object> server1Params = new HashMap<String, Object>();
       HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
 
       Map<String, Object> server2Params = new HashMap<String, Object>();
-      HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
+      HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
 
       TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
 
@@ -145,26 +148,34 @@
       server1.start();
       server0.start();
 
+
+
+      BridgeReconnectTest.log.info("** failing connection");
+      // Now we will simulate a failure of the bridge connection between server0 and server1
+      /*Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+      RemotingConnection forwardingConnection = getForwardingConnection(bridge);
+      forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));*/
+      server0.kill();
+
+      waitForServerStart(service2);
+
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server2tc);
-         ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
 
+      ClientSessionFactory csf0 = locator.createSessionFactory(server2tc);
+
       ClientSession session0 = csf0.createSession(false, true, true);
 
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+      
       ClientSessionFactory csf2 = locator.createSessionFactory(server2tc);
+
       ClientSession session2 = csf2.createSession(false, true, true);
 
-      ClientProducer prod0 = session0.createProducer(testAddress);
-
       ClientConsumer cons2 = session2.createConsumer(queueName0);
 
       session2.start();
-
-      BridgeReconnectTest.log.info("** failing connection");
-      // Now we will simulate a failure of the bridge connection between server0 and server1
-      Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
-      RemotingConnection forwardingConnection = getForwardingConnection(bridge);
-      forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+      
       final int numMessages = 10;
 
       SimpleString propKey = new SimpleString("propkey");
@@ -198,17 +209,20 @@
       Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
    }
 
+
    // Fail bridge and attempt failover a few times before succeeding
    public void testFailoverAndReconnectAfterAFewTries() throws Exception
    {
+      NodeManager nodeManager = new InVMNodeManager();
+
       Map<String, Object> server0Params = new HashMap<String, Object>();
-      HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
+      HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
 
       Map<String, Object> server1Params = new HashMap<String, Object>();
       HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
 
       Map<String, Object> server2Params = new HashMap<String, Object>();
-      HornetQServer service2 = createHornetQServer(2, server2Params, isNetty(), true);
+      HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
 
       TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
 
@@ -272,8 +286,13 @@
       service2.start();
       server1.start();
       server0.start();
-      ServerLocator locator = HornetQClient.createServerLocatorWithHA(server0tc, server2tc);
-      ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
+      // Now we will simulate a failure of the bridge connection between server0 and server1
+      server0.kill();
+
+
+      ServerLocator locator = HornetQClient.createServerLocatorWithHA(server2tc);
+      locator.setReconnectAttempts(100);
+      ClientSessionFactory csf0 = locator.createSessionFactory(server2tc);
       ClientSession session0 = csf0.createSession(false, true, true);
 
       ClientSessionFactory csf2 = locator.createSessionFactory(server2tc);
@@ -284,17 +303,8 @@
       ClientConsumer cons2 = session2.createConsumer(queueName0);
 
       session2.start();
+     
 
-      // Now we will simulate a failure of the bridge connection between server0 and server1
-      Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
-      RemotingConnection forwardingConnection = getForwardingConnection(bridge);
-      InVMConnector.failOnCreateConnection = true;
-      InVMConnector.numberOfFailures = reconnectAttempts - 1;
-      forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
-      forwardingConnection = getForwardingConnection(bridge);
-      forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
       final int numMessages = 10;
 
       SimpleString propKey = new SimpleString("propkey");

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java	2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java	2010-11-01 09:44:00 UTC (rev 9825)
@@ -23,8 +23,14 @@
 import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 
+import javax.management.MBeanServer;
+
 /**
  * A BridgeTestBase
  *
@@ -75,18 +81,22 @@
 
    protected HornetQServer createHornetQServer(final int id, final boolean netty, final Map<String, Object> params)
    {
-      return createHornetQServer(id, params, netty, false);
+      return createHornetQServer(id, params, netty, null);
    }
 
+    protected HornetQServer createHornetQServer(final int id, final boolean netty, final Map<String, Object> params, NodeManager nodeManager)
+   {
+      return createHornetQServer(id, params, netty, nodeManager);
+   }
+
    protected HornetQServer createHornetQServer(final int id,
                                                final Map<String, Object> params,
                                                final boolean netty,
-                                               final boolean backup)
+                                               final NodeManager nodeManager)
    {
       Configuration serviceConf = new ConfigurationImpl();
       serviceConf.setClustered(true);
       serviceConf.setSecurityEnabled(false);
-      serviceConf.setBackup(backup);
       serviceConf.setSharedStore(true);
       serviceConf.setJournalType(getDefaultJournalType());
       serviceConf.setBindingsDirectory(getBindingsDir(id, false));
@@ -111,11 +121,130 @@
          serviceConf.getAcceptorConfigurations()
                     .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
       }
-      HornetQServer service = HornetQServers.newHornetQServer(serviceConf, true);
+      HornetQServer service;
+      if(nodeManager == null)
+      {
+         service = HornetQServers.newHornetQServer(serviceConf, true);
+      }
+      else
+      {
+         service = new InVMNodeManagerServer(serviceConf, nodeManager);
+      }
 
       servers.add(service);
 
       return service;
    }
 
+   protected HornetQServer createBackupHornetQServer(final int id,
+                                               final Map<String, Object> params,
+                                               final boolean netty,
+                                               final int liveId,
+                                               final NodeManager nodeManager)
+   {
+      Configuration serviceConf = new ConfigurationImpl();
+      serviceConf.setClustered(true);
+      serviceConf.setSecurityEnabled(false);
+      serviceConf.setBackup(true);
+      serviceConf.setSharedStore(true);
+      serviceConf.setJournalType(getDefaultJournalType());
+      serviceConf.setBindingsDirectory(getBindingsDir(liveId, false));
+      serviceConf.setJournalMinFiles(2);
+      serviceConf.setJournalDirectory(getJournalDir(liveId, false));
+      serviceConf.setPagingDirectory(getPageDir(liveId, false));
+      serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(liveId, false));
+      // these tests don't need any big storage so limiting the size of the journal files to speed up the test
+      serviceConf.setJournalFileSize(100 * 1024);
+
+      if (netty)
+      {
+         params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+                    org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + id);
+         serviceConf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),
+                                                                                params));
+
+      }
+      else
+      {
+         params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, id);
+         serviceConf.getAcceptorConfigurations()
+                    .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+      }
+      HornetQServer service;
+      if(nodeManager == null)
+      {
+         service = HornetQServers.newHornetQServer(serviceConf, true);
+      }
+      else
+      {
+         service = new InVMNodeManagerServer(serviceConf, nodeManager);
+      }
+
+      servers.add(service);
+
+      return service;
+   }
+
+
+   protected void waitForServerStart(HornetQServer server) throws Exception
+   {
+      long start = System.currentTimeMillis();
+      do
+      {
+         if (server.isInitialised())
+         {
+            return;
+         }
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < 5000);
+
+      String msg = "Timed out waiting for server starting = " + server;
+
+
+      throw new IllegalStateException(msg);
+   }
+
+    // Inner classes -------------------------------------------------
+   class InVMNodeManagerServer extends HornetQServerImpl
+   {
+      final NodeManager nodeManager;
+      public InVMNodeManagerServer(NodeManager nodeManager)
+      {
+         super();
+         this.nodeManager = nodeManager;
+      }
+
+      public InVMNodeManagerServer(Configuration configuration, NodeManager nodeManager)
+      {
+         super(configuration);
+         this.nodeManager = nodeManager;
+      }
+
+      public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, NodeManager nodeManager)
+      {
+         super(configuration, mbeanServer);
+         this.nodeManager = nodeManager;
+      }
+
+      public InVMNodeManagerServer(Configuration configuration, HornetQSecurityManager securityManager, NodeManager nodeManager)
+      {
+         super(configuration, securityManager);
+         this.nodeManager = nodeManager;
+      }
+
+      public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, HornetQSecurityManager securityManager, NodeManager nodeManager)
+      {
+         super(configuration, mbeanServer, securityManager);
+         this.nodeManager = nodeManager;
+      }
+
+      @Override
+      protected NodeManager createNodeManager(String directory)
+      {
+         return nodeManager;
+      }
+
+   }
+
 }

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2010-11-01 09:44:00 UTC (rev 9825)
@@ -118,7 +118,7 @@
 
       jbcf.setReconnectAttempts(-1);
 
-      Connection conn = jbcf.createConnection();
+      Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
 
       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -140,7 +140,7 @@
 
       jbcf.setReconnectAttempts(-1);
 
-      Connection conn = jbcf.createConnection();
+      Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
 
       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -168,7 +168,7 @@
 
       jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
 
-      Connection conn = jbcf.createConnection();
+      Connection conn = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
 
       MyExceptionListener listener = new MyExceptionListener();
 
@@ -336,7 +336,10 @@
       backupConf.getConnectorConfigurations().put(backuptc.getName(), backuptc);
       ArrayList<String> staticConnectors = new ArrayList<String>();
       staticConnectors.add(livetc.getName());
+      ClusterConnectionConfiguration cccBackup = new ClusterConnectionConfiguration("cluster1", "jms", backuptc.getName(), -1, false, false, 1, 1,
+               staticConnectors);
 
+      backupConf.getClusterConfigurations().add(cccBackup);
       backupConf.setSecurityEnabled(false);
       backupConf.setJournalType(getDefaultJournalType());
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-10-30 13:58:39 UTC (rev 9824)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-11-01 09:44:00 UTC (rev 9825)
@@ -27,11 +27,16 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.jms.client.HornetQConnection;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -39,6 +44,8 @@
 import org.hornetq.tests.util.RandomUtil;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -47,10 +54,8 @@
  * A JMSUtil
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * Created 14 nov. 2008 13:48:08
- *
- *
+ *         <p/>
+ *         Created 14 nov. 2008 13:48:08
  */
 public class JMSUtil
 {
@@ -181,25 +186,25 @@
       }
    }
 
-    public static void waitForServer(HornetQServer server)
+   public static void waitForServer(HornetQServer server)
          throws InterruptedException
    {
-      long timetowait =System.currentTimeMillis() + 5000;
-      while(!server.isStarted())
+      long timetowait = System.currentTimeMillis() + 5000;
+      while (!server.isStarted())
       {
          Thread.sleep(100);
-         if(server.isStarted())
+         if (server.isStarted())
          {
             break;
          }
-         else if(System.currentTimeMillis() > timetowait)
+         else if (System.currentTimeMillis() > timetowait)
          {
             throw new IllegalStateException("server didnt start");
          }
       }
    }
 
-    public static void crash(HornetQServer server, ClientSession... sessions) throws Exception
+   public static void crash(HornetQServer server, ClientSession... sessions) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(sessions.length);
 
@@ -219,21 +224,16 @@
       {
          session.addFailureListener(new MyListener());
       }
-      Set<RemotingConnection> connections = server.getRemotingService().getConnections();
+      /*Set<RemotingConnection> connections = server.getRemotingService().getConnections();
       for (RemotingConnection remotingConnection : connections)
       {
          remotingConnection.destroy();
          server.getRemotingService().removeConnection(remotingConnection.getID());
-      }
+      }*/
 
       ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
       clusterManager.clear();
       server.kill();
-      // recreate the live.lock file (since it was deleted by the
-      // clean stop
-      File lockFile = new File(server.getConfiguration().getJournalDirectory(), "live.lock");
-      Assert.assertFalse(lockFile.exists());
-      lockFile.createNewFile();
 
 
       // Wait to be informed of failure
@@ -254,4 +254,55 @@
 
    // Inner classes -------------------------------------------------
 
+   public static HornetQConnection createConnectionAndWaitForTopology(HornetQConnectionFactory factory, int topologyMembers, int timeout) throws Exception
+   {
+      HornetQConnection conn;
+      CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+      ServerLocator locator = factory.getServerLocator();
+
+      locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+
+      conn = (HornetQConnection) factory.createConnection();
+
+      boolean ok = countDownLatch.await(timeout, TimeUnit.SECONDS);
+      if (!ok)
+      {
+         throw new IllegalStateException("timed out waiting for topology");
+      }
+      return conn;
+   }
+
+   static class LatchClusterTopologyListener implements ClusterTopologyListener
+   {
+      final CountDownLatch latch;
+      int liveNodes = 0;
+      int backUpNodes = 0;
+      List<String> liveNode = new ArrayList<String>();
+      List<String> backupNode = new ArrayList<String>();
+
+      public LatchClusterTopologyListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+      {
+         if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+         {
+            liveNode.add(connectorPair.a.getName());
+            latch.countDown();
+         }
+         if (connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+         {
+            backupNode.add(connectorPair.b.getName());
+            latch.countDown();
+         }
+      }
+
+      public void nodeDown(String nodeID)
+      {
+         //To change body of implemented methods use File | Settings | File Templates.
+      }
+   }
 }



More information about the hornetq-commits mailing list