[hornetq-commits] JBoss hornetq SVN: r10980 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests: integration/replication and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 13 11:35:15 EDT 2011


Author: borges
Date: 2011-07-13 11:35:14 -0400 (Wed, 13 Jul 2011)
New Revision: 10980

Added:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Fix tests, add some utility methods

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -23,16 +23,22 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.DelegatingSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
 /**
  * A MultiThreadFailoverTest
- * 
+ *
  * Test Failover where failure is prompted by another thread
  *
  * @author Tim Fox
@@ -47,7 +53,7 @@
 
    private volatile ClientSessionFactoryInternal sf;
 
-   private Object lockFail = new Object();
+   private final Object lockFail = new Object();
 
    class MyListener implements SessionFailureListener
    {
@@ -170,7 +176,7 @@
             locator.setBlockOnNonDurableSend(true);
             locator.setBlockOnDurableSend(true);
             locator.setReconnectAttempts(-1);
-            sf = (ClientSessionFactoryInternal) createSessionFactoryAndWaitForTopology(locator, 2);
+            sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
 
             ClientSession createSession = sf.createSession(true, true);
@@ -198,7 +204,7 @@
             // Simulate failure on connection
             synchronized (lockFail)
             {
-               crash((ClientSession) createSession);
+               crash(createSession);
             }
 
             /*if (listener != null)
@@ -226,7 +232,7 @@
             Assert.assertEquals(0, sf.numSessions());
 
             locator.close();
-            
+
             Assert.assertEquals(0, sf.numConnections());
 
             if (i != numIts - 1)
@@ -243,7 +249,7 @@
          DelegatingSession.debug = false;
       }
    }
-   
+
    protected void addPayload(ClientMessage msg)
    {
    }
@@ -278,7 +284,7 @@
                   message.getBodyBuffer().writeString("message" + i);
 
                   message.putIntProperty("counter", i);
-                  
+
                   addPayload(message);
 
                   producer.send(message);
@@ -408,7 +414,7 @@
                      message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
                                                                                                     ",exec:" +
                                                                                                     executionId));
-                     
+
                      addPayload(message);
 
 
@@ -439,13 +445,13 @@
             }
             while (retry);
 
-            
-            
+
+
             boolean blocked = false;
 
             retry = false;
-            
-            ClientConsumer consumer = null; 
+
+            ClientConsumer consumer = null;
             do
             {
                ArrayList<Integer> msgs = new ArrayList<Integer>();
@@ -473,7 +479,7 @@
                   }
 
                   session.commit();
-                  
+
                   if (blocked)
                   {
                      assertTrue("msgs.size is expected to be 0 or "  + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
@@ -535,13 +541,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,29 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.utils.ReusableLatch;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
  *         Date: Dec 21, 2010
@@ -193,6 +199,7 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   @Override
    protected void createConfigs() throws Exception
    {
       nodeManager = new InVMNodeManager();
@@ -234,13 +241,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,30 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
  *         Date: Dec 21, 2010
@@ -133,6 +140,7 @@
    }
 
 
+   @Override
    protected void createConfigs() throws Exception
    {
       nodeManager = new InVMNodeManager();
@@ -176,13 +184,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -28,6 +28,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
 /**
  * A FailoverOnFlowControlTest
@@ -49,8 +50,8 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   
+
+
    public void testOverflowSend() throws Exception
    {
       ServerLocator locator = getServerLocator();
@@ -65,11 +66,11 @@
          public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
          {
             System.out.println("Intercept..." + packet.getClass().getName());
-            
+
             if (packet instanceof SessionProducerCreditsMessage )
             {
                SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
-               
+
                System.out.println("Credits: " + credit.getCredits());
                if (count.incrementAndGet() == 2)
                {
@@ -87,7 +88,7 @@
             return true;
          }
       };
-      
+
       locator.addInterceptor(interceptorClient);
 
       ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
@@ -97,27 +98,28 @@
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-      
 
+
       final int numMessages = 10;
 
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(true);
-         
+
          message.getBodyBuffer().writeBytes(new byte[5000]);
 
          message.putIntProperty("counter", i);
 
          producer.send(message);
       }
-      
+
       session.close();
-      
+
       locator.close();
    }
 
 
+   @Override
    protected void createConfigs() throws Exception
    {
       super.createConfigs();
@@ -125,6 +127,7 @@
       backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
    }
 
+   @Override
    protected ServerLocatorInternal getServerLocator() throws Exception
    {
       ServerLocatorInternal locator = super.getServerLocator();
@@ -140,13 +143,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -46,6 +46,7 @@
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
 /**
  *
@@ -1875,13 +1876,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
    /**

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -35,19 +35,16 @@
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
@@ -60,8 +57,8 @@
    // Constants -----------------------------------------------------
 
    protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-   private static final String LIVE_NODE_NAME = "hqLIVE";
 
+
    // Attributes ----------------------------------------------------
 
    protected TestableServer liveServer;
@@ -121,19 +118,6 @@
       return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
    }
 
-   private ClusterConnectionConfiguration createClusterConnectionConf(String name, String... connectors)
-   {
-      List<String> conn = new ArrayList<String>(connectors.length);
-      for (String iConn : connectors)
-      {
-         conn.add(iConn);
-      }
-      return new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
-   }
-
-   /**
-    * @throws Exception
-    */
    protected void createConfigs() throws Exception
    {
       nodeManager = new InVMNodeManager();
@@ -149,8 +133,8 @@
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
       backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
       backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
-      backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
-                                                                              liveConnector.getName()));
+      ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+                                                        liveConnector.getName());
       backupServer = createBackupServer();
 
       liveConfig = super.createDefaultConfig();
@@ -159,7 +143,7 @@
       liveConfig.setSecurityEnabled(false);
       liveConfig.setSharedStore(true);
       liveConfig.setClustered(true);
-      liveConfig.getClusterConfigurations().add(createClusterConnectionConf(liveConnector.getName()));
+      ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, liveConnector.getName());
       liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
       liveServer = createLiveServer();
    }
@@ -172,42 +156,26 @@
       nodeManager = new InVMNodeManager();
 
       backupConfig = super.createDefaultConfig();
+      liveConfig = super.createDefaultConfig();
+      TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+                                                     liveConnector);
+
       backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
       backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
       backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
       backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + "_backup");
-      backupConfig.getAcceptorConfigurations().clear();
-      backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
-
-      backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
-      backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
-      backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
-                                                                              backupConnector.getName()));
-
       backupConfig.setSecurityEnabled(false);
-      backupConfig.setSharedStore(false);
-      backupConfig.setBackup(true);
-      backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
-      backupConfig.setClustered(true);
 
       backupServer = createBackupServer();
       backupServer.getServer().setIdentity("idBackup");
 
-      liveConfig = super.createDefaultConfig();
+
       liveConfig.getAcceptorConfigurations().clear();
       liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
 
-      liveConfig.setName(LIVE_NODE_NAME);
-      liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
-      liveConfig.setSecurityEnabled(false);
-      liveConfig.setSharedStore(false);
-      liveConfig.setClustered(true);
-      liveConfig.getClusterConfigurations().add(createClusterConnectionConf(LIVE_NODE_NAME, LIVE_NODE_NAME));
       liveServer = createLiveServer();
       liveServer.getServer().setIdentity("idLive");
-
-      //liveServer.start();
-      //backupServer.start();
    }
 
    @Override
@@ -261,8 +229,7 @@
       return sf;
    }
 
-   protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
-         throws Exception
+   protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
    {
       long time = System.currentTimeMillis();
       long toWait = seconds * 1000;
@@ -288,38 +255,6 @@
       System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
    }
 
-   protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
-   {
-      if (live)
-      {
-         return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
-      }
-      else
-      {
-         Map<String, Object> server1Params = new HashMap<String, Object>();
-
-         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-         return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
-      }
-   }
-
-   protected TransportConfiguration getInVMTransportAcceptorConfiguration(final boolean live)
-   {
-      if (live)
-      {
-         return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
-      }
-      else
-      {
-         Map<String, Object> server1Params = new HashMap<String, Object>();
-
-         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-         return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
-      }
-   }
-
    protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
    {
       if (live)

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,12 +14,9 @@
 package org.hornetq.tests.integration.cluster.failover;
 
 import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -28,17 +25,16 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 
 /**
  * A PagingFailoverTest
- * 
+ *
  * TODO: validate replication failover also
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -135,10 +131,10 @@
          {
             crash(session);
          }
-         
-         
+
+
          session.close();
-         
+
          session = sf.createSession(!transacted, !transacted, 0);
 
          session.start();
@@ -161,9 +157,9 @@
          }
 
          session.commit();
-         
+
          cons.close();
-         
+
          Thread.sleep(1000);
 
          if (!failBeforeConsume)
@@ -173,7 +169,7 @@
          }
 
          session.close();
-         
+
          session = sf.createSession(true, true, 0);
 
          cons = session.createConsumer(PagingFailoverTest.ADDRESS);
@@ -202,23 +198,6 @@
       }
    }
 
-   /**
-    * @param session
-    * @param latch
-    * @throws InterruptedException
-    */
-   private void failSession(final ClientSession session, final CountDownLatch latch) throws InterruptedException
-   {
-      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
-      // Simulate failure on connection
-      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
-      // Wait to be informed of failure
-
-      Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -226,13 +205,13 @@
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
    {
-      return getInVMTransportAcceptorConfiguration(live);
+      return TransportConfigurationUtils.getInVMAcceptor(live);
    }
 
    @Override
    protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
    {
-      return getInVMConnectorTransportConfiguration(live);
+      return TransportConfigurationUtils.getInVMConnector(live);
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,6 +14,7 @@
 package org.hornetq.tests.integration.replication;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -56,15 +57,19 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.impl.ReplicatedJournal;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
+import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.TransportConfigurationUtils;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
@@ -76,23 +81,18 @@
 public class ReplicationTest extends ServiceTestBase
 {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
    private ThreadFactory tFactory;
-
    private ExecutorService executor;
-
    private ExecutorFactory factory;
-
    private ScheduledExecutorService scheduledExecutor;
 
-   private HornetQServerImpl server;
+   private HornetQServerImpl backupServer;
+   /** This field is not always used. */
+   private HornetQServerImpl liveServer;
 
    private ServerLocator locator;
 
-   private ReplicationManagerImpl manager;
+   private ReplicationManager manager;
 
    // Static --------------------------------------------------------
 
@@ -100,32 +100,57 @@
 
    // Public --------------------------------------------------------
 
-   public void testBasicConnection() throws Exception
+   private void setupServer(boolean backup, boolean netty, String... interceptors) throws Exception
    {
-      boolean backup = true;
-      boolean netty = false;
-      setupServer(backup, netty);
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
-      manager.stop();
+      assert backup; // XXX
+
+      Configuration backupConfig = createDefaultConfig(netty);
+      Configuration liveConfig = createDefaultConfig(netty);
+      backupConfig.setBackup(backup);
+      if (interceptors.length > 0)
+      {
+         List<String> interceptorsList = Arrays.asList(interceptors);
+         backupConfig.setInterceptorClassNames(interceptorsList);
+      }
+
+      TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true);
+      TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false);
+      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
+
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+                                                     liveConnector);
+      if (backup)
+      {
+         liveServer = new HornetQServerImpl(liveConfig);
+         liveServer.start();
+         waitForComponent(liveServer);
+      }
+
+      backupServer = new HornetQServerImpl(backupConfig);
+      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      backupServer.start();
+      Thread.sleep(200); // XXX improve this
+      waitForComponent(backupServer);
    }
 
-   private void setupServer(boolean backup, boolean netty) throws Exception
+   private static void waitForComponent(HornetQComponent component) throws Exception
    {
-      Configuration config = createDefaultConfig(netty);
-      config.setBackup(backup);
-      server = new HornetQServerImpl(config);
-      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      server.start();
+      waitForComponent(component, 3);
    }
 
+   public void testBasicConnection() throws Exception
+   {
+      setupServer(true, false);
+      waitForComponent(liveServer.getReplicationManager());
+   }
+
    public void testInvalidJournal() throws Exception
    {
 
       setupServer(true, false);
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
 
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
       try
       {
          manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
@@ -134,7 +159,8 @@
       }
       catch (HornetQException e)
       {
-         e.printStackTrace();
+         if (e.getCode() != HornetQException.ILLEGAL_STATE)
+            e.printStackTrace();
          Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
       }
 
@@ -148,10 +174,8 @@
 
       setupServer(true, false);
 
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
-      manager.start();
-
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
       try
       {
          ReplicationManagerImpl manager2 =
@@ -162,17 +186,19 @@
       }
       catch (Exception e)
       {
+         // expected
       }
+
    }
 
    public void testConnectIntoNonBackup() throws Exception
    {
       setupServer(false, false);
 
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
       try
       {
+
+         manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
          manager.start();
          Assert.fail("Exception was expected");
       }
@@ -188,8 +214,8 @@
 
       StorageManager storage = getStorage();
 
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
 
       Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
@@ -226,8 +252,8 @@
       blockOnReplication(storage, manager);
 
       PagingManager pagingManager =
-               createPageManager(server.getStorageManager(), server.getConfiguration(), server.getExecutorFactory(),
-                                 server.getAddressSettingsRepository());
+               createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(), backupServer.getExecutorFactory(),
+                                 backupServer.getAddressSettingsRepository());
 
       PagingStore store = pagingManager.getPageStore(dummy);
       store.start();
@@ -266,25 +292,10 @@
    public void testSendPacketsWithFailure() throws Exception
    {
 
-      Configuration config = createDefaultConfig(false);
+      setupServer(true, false, TestInterceptor.class.getName());
 
-      config.setBackup(true);
-
-      ArrayList<String> intercepts = new ArrayList<String>();
-
-      intercepts.add(TestInterceptor.class.getName());
-
-      config.setInterceptorClassNames(intercepts);
-
-      server = new HornetQServerImpl(config);
-
-      server.start();
-
-      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
       StorageManager storage = getStorage();
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
+      manager = liveServer.getReplicationManager();
 
       Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
@@ -309,7 +320,7 @@
          }
       });
 
-      server.stop();
+      backupServer.stop();
 
       Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
 
@@ -408,7 +419,7 @@
     * @param manager
     * @return
     */
-   private void blockOnReplication(final StorageManager storage, final ReplicationManagerImpl manager) throws Exception
+   private void blockOnReplication(final StorageManager storage, final ReplicationManager manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
       storage.afterCompleteOperations(new IOAsyncTask()
@@ -427,29 +438,29 @@
       Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
    }
 
-   public void testNoServer() throws Exception
-   {
-      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+//   public void testNoServer() throws Exception
+//   {
+//      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+//
+//      try
+//      {
+//         manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+//         manager.start();
+//         Assert.fail("Exception expected");
+//      }
+//      catch (HornetQException expected)
+//      {
+//         Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
+//      }
+//   }
 
-      try
-      {
-         manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-         manager.start();
-         Assert.fail("Exception expected");
-      }
-      catch (HornetQException expected)
-      {
-         Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
-      }
-   }
-
    public void testNoActions() throws Exception
    {
 
       setupServer(true, false);
       StorageManager storage = getStorage();
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
 
       Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
@@ -482,9 +493,7 @@
       final ArrayList<Integer> executions = new ArrayList<Integer>();
 
       StorageManager storage = getStorage();
-      manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-      manager.start();
-
+      manager = liveServer.getReplicationManager();
       Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
       int numberOfAdds = 200;
@@ -568,19 +577,13 @@
    @Override
    protected void tearDown() throws Exception
    {
-      if (manager != null)
-      {
-         if (manager.isStarted())
-            manager.stop();
-         manager = null;
-      }
 
-      if (server != null)
-      {
-         if (server.isStarted())
-            server.stop();
-         server = null;
-      }
+      stopComponent(manager);
+      manager = null;
+      stopComponent(liveServer);
+      liveServer = null;
+      stopComponent(backupServer);
+      backupServer = null;
 
       executor.shutdown();
 
@@ -593,6 +596,7 @@
 
    }
 
+
    protected
             PagingManager
             createPageManager(final StorageManager storageManager,
@@ -613,7 +617,7 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-   public static class TestInterceptor implements Interceptor
+   public static final class TestInterceptor implements Interceptor
    {
       static AtomicBoolean value = new AtomicBoolean(true);
 

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,71 @@
+/**
+ *
+ */
+package org.hornetq.tests.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+
+public final class ReplicatedBackupUtils
+{
+   private static final String LIVE_NODE_NAME = "hqLIVE";
+   private ReplicatedBackupUtils()
+   {
+      // Utility class
+   }
+
+   /**
+    * Creates a {@link ClusterConnectionConfiguration} and adds it to the {@link Configuration}.
+    * @param configuration
+    * @param name
+    * @param connectors
+    */
+   public static void createClusterConnectionConf(Configuration configuration, String name, String... connectors)
+   {
+
+      List<String> conn = new ArrayList<String>(connectors.length);
+      for (String iConn : connectors)
+      {
+         conn.add(iConn);
+      }
+      ClusterConnectionConfiguration clusterConfig =
+               new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
+      configuration.getClusterConfigurations().add(clusterConfig);
+   }
+
+   public static void configureReplicationPair(Configuration backupConfig,
+                                               TransportConfiguration backupConnector,
+                                               TransportConfiguration backupAcceptor,
+                                               Configuration liveConfig,
+                                               TransportConfiguration liveConnector)
+   {
+      if (backupAcceptor != null)
+      {
+         Set<TransportConfiguration> backupAcceptorSet = backupConfig.getAcceptorConfigurations();
+         backupAcceptorSet.clear();
+         backupAcceptorSet.add(backupAcceptor);
+      }
+
+      backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+      ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+                                                        backupConnector.getName());
+
+      backupConfig.setSharedStore(false);
+      backupConfig.setBackup(true);
+      backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
+      backupConfig.setClustered(true);
+
+      liveConfig.setName(LIVE_NODE_NAME);
+      liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+      liveConfig.setSecurityEnabled(false);
+      liveConfig.setSharedStore(false);
+      liveConfig.setClustered(true);
+      ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, LIVE_NODE_NAME, LIVE_NODE_NAME);
+   }
+}

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java	2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,38 @@
+package org.hornetq.tests.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+
+public final class TransportConfigurationUtils
+{
+
+   public static TransportConfiguration getInVMAcceptor(final boolean live)
+   {
+      if (live)
+      {
+         return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
+      }
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
+   }
+
+   public static TransportConfiguration getInVMConnector(final boolean live)
+   {
+      if (live)
+      {
+         return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
+      }
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
+   }
+
+}



More information about the hornetq-commits mailing list