[jboss-cvs] JBoss Messaging SVN: r5654 - branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Jan 18 18:20:22 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-18 18:20:21 -0500 (Sun, 18 Jan 2009)
New Revision: 5654

Added:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Modified:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
Log:
Refactoring on MultiThreadRandomFailoverTest, to allow reusing it on Paging and Chunks

Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java	                        (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java	2009-01-18 23:20:21 UTC (rev 5654)
@@ -0,0 +1,148 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+
+/**
+ * A LargeMessageMuliThreadFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Jan 18, 2009 4:52:09 PM
+ *
+ *
+ */
+public class LargeMessageMuliThreadFailoverTest extends MultiThreadRandomFailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      ClientSessionFactoryInternal sf = super.createSessionFactory();
+      
+      sf.setMinLargeMessageSize(1024);
+      
+      return sf;
+
+   }
+
+
+   @Override
+   protected void start() throws Exception
+   {
+
+      deleteDirectory(new File(getTestDir()));
+
+      Configuration backupConf = new ConfigurationImpl();
+
+      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+      backupConf.setJournalFileSize(100 * 1024);
+
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+      backupConf.setBackup(true);
+
+      backupService = MessagingServiceImpl.newMessagingService(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+
+      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+      liveConf.setJournalFileSize(100 * 1024);
+
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+      liveService.start();
+
+   }
+
+   @Override
+   protected void setBody(final ClientMessage message) throws Exception
+   {
+      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(4 * 1024)));
+
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+    */
+   @Override
+   protected void checkSize(ClientMessage message)
+   {
+      assertEquals(4 * 1024, message.getBodySize());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-17 00:18:22 UTC (rev 5653)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-18 23:20:21 UTC (rev 5654)
@@ -12,1503 +12,31 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
-import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.jms.client.JBossBytesMessage;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A MultiThreadRandomFailoverTest
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
-public class MultiThreadRandomFailoverTest extends UnitTestCase
+public class MultiThreadRandomFailoverTest extends MultiThreadRandomFailoverTestBase
 {
-   /**
-    * 
-    */
-   private static final int DEFAULT_MESSAGE_SIZE = 1024;
-
-   private static final Logger log = Logger.getLogger(MultiThreadRandomFailoverTest.class);
-
-   // Constants -----------------------------------------------------
-
-   private static final int RECEIVE_TIMEOUT = 30000;
-
-   private static final int NUM_THREADS = 10;
-
-   // Attributes ----------------------------------------------------
-   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
-   private MessagingService liveService;
-
-   private MessagingService backupService;
-
-   private final Map<String, Object> backupParams = new HashMap<String, Object>();
-
-   private Timer timer;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testA() throws Exception
+   @Override
+   protected void start() throws Exception
    {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestA(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testB() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestB(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testC() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestC(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testD() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestD(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testE() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestE(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testF() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestF(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testG() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestG(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testH() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestH(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testI() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestI(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testJ() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestJ(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testK() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestK(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testL() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestL(sf);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   // public void testM() throws Exception
-   // {
-   // runTestMultipleThreads(new RunnableT()
-   // {
-   // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-   // {
-   // doTestM(sf, threadNum);
-   // }
-   // }, NUM_THREADS);
-   // }
-
-   public void testN() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestN(sf, threadNum);
-         }
-      }, NUM_THREADS, false);
-   }
-
-   public void testLargeMessage() throws Exception
-   {
-      runTestMultipleThreads(new RunnableT()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestLargeMessage(sf, threadNum);
-         }
-      }, NUM_THREADS, true);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.start();
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      Set<MyHandler> handlers = new HashSet<MyHandler>();
-
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed: " + handler.failure);
-         }
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.start();
-      }
-
-      Set<MyHandler> handlers = new HashSet<MyHandler>();
-
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed: " + handler.failure);
-         }
-      }
-
-      sessSend.close();
-
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-
-   }
-
-   protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.start();
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.rollback();
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.commit();
-
-      Set<MyHandler> handlers = new HashSet<MyHandler>();
-
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed: " + handler.failure);
-         }
-      }
-
-      for (ClientSession session : sessions)
-      {
-         session.rollback();
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         assertTrue(ok);
-      }
-
-      for (ClientSession session : sessions)
-      {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.rollback();
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.commit();
-
-      for (ClientSession session : sessions)
-      {
-         session.start();
-      }
-
-      Set<MyHandler> handlers = new HashSet<MyHandler>();
-
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed: " + handler.failure);
-         }
-      }
-
-      handlers.clear();
-
-      // Set handlers to null
-      for (ClientConsumer consumer : consumers)
-      {
-         consumer.setMessageHandler(null);
-      }
-
-      for (ClientSession session : sessions)
-      {
-         session.rollback();
-      }
-
-      // New handlers
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed on rollback: " + handler.failure);
-         }
-      }
-
-      for (ClientSession session : sessions)
-      {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   // Now with synchronous receive()
-
-   protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.start();
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.start();
-      }
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.start();
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.rollback();
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.commit();
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.rollback();
-      }
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.rollback();
-
-      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
-
-      sessSend.commit();
-
-      for (ClientSession session : sessions)
-      {
-         session.start();
-      }
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.rollback();
-      }
-
-      consumeMessages(consumers, numMessages, threadNum);
-
-      for (ClientSession session : sessions)
-      {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.start();
-
-      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
-                                                       false,
-                                                       0,
-                                                       System.currentTimeMillis(),
-                                                       (byte)1);
-      message.getBody().flip();
-
-      producer.send(message);
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.close();
-
-      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
-
-      sessCreate.close();
-   }
-
-   protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.start();
-
-      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
-                                                       false,
-                                                       0,
-                                                       System.currentTimeMillis(),
-                                                       (byte)1);
-      message.getBody().flip();
-
-      producer.send(message);
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.close();
-
-      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
-
-      sessCreate.close();
-   }
-
-   protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      ClientSession s = sf.createSession(false, false, false);
-
-      s.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
-
-      final int numConsumers = 100;
-
-      for (int i = 0; i < numConsumers; i++)
-      {
-         ClientConsumer consumer = s.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
-
-         consumer.close();
-      }
-
-      s.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
-
-      s.close();
-   }
-
-   protected void doTestL(final ClientSessionFactory sf) throws Exception
-   {
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numSessions = 100;
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         ClientSession session = sf.createSession(false, false, false);
-
-         session.close();
-      }
-
-      s.close();
-   }
-
-   // Browsers
-   // FIXME - this test won't work until we use a proper iterator for browsing a queue.
-   // Making a copy of the queue for a browser consumer doesn't work well with replication since
-   // When replicating the create consumer (browser) to the backup, when executed on the backup the
-   // backup may have different messages in its queue since been added on different threads.
-   // So when replicating deliveries they may not be found.
-   // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
-   // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
-   // {
-   // long start = System.currentTimeMillis();
-   //
-   // ClientSession sessSend = sf.createSession(false, true, true, false);
-   //      
-   // ClientSession sessConsume = sf.createSession(false, true, true, false);
-   //      
-   // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
-   //
-   // final int numMessages = 100;
-   //
-   // ClientProducer producer = sessSend.createProducer(ADDRESS);
-   //
-   // sendMessages(sessSend, producer, numMessages, threadNum);
-   //      
-   // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-   // null, false, true);
-   //      
-   // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
-   //      
-   // for (int i = 0; i < numMessages; i++)
-   // {
-   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-   //
-   // assertNotNull(msg);
-   //
-   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-   //
-   // Integer c = consumerCounts.get(tn);
-   // if (c == null)
-   // {
-   // c = new Integer(cnt);
-   // }
-   //
-   // if (cnt != c.intValue())
-   // {
-   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
-   // }
-   //         
-   // c++;
-   //         
-   // //Wrap
-   // if (c == numMessages)
-   // {
-   // c = 0;
-   // }
-   //         
-   // consumerCounts.put(tn, c);
-   //
-   // msg.acknowledge();
-   // }
-   //
-   // sessConsume.close();
-   //      
-   // sessConsume = sf.createSession(false, true, true, false);
-   //      
-   // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-   // null, false, true);
-   //      
-   // //Messages should still be there
-   //      
-   // consumerCounts.clear();
-   //      
-   // for (int i = 0; i < numMessages; i++)
-   // {
-   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-   //
-   // assertNotNull(msg);
-   //
-   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-   //
-   // Integer c = consumerCounts.get(tn);
-   // if (c == null)
-   // {
-   // c = new Integer(cnt);
-   // }
-   //
-   // if (cnt != c.intValue())
-   // {
-   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
-   // }
-   //         
-   // c++;
-   //         
-   // //Wrap
-   // if (c == numMessages)
-   // {
-   // c = 0;
-   // }
-   //         
-   // consumerCounts.put(tn, c);
-   //
-   // msg.acknowledge();
-   // }
-   //      
-   // sessConsume.close();
-   //      
-   // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
-   //      
-   // sessSend.close();
-   //
-   // long end = System.currentTimeMillis();
-   //
-   // log.info("duration " + (end - start));
-   // }
-
-   protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.stop();
-
-      sess.start();
-
-      sess.stop();
-
-      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
-                                                       false,
-                                                       0,
-                                                       System.currentTimeMillis(),
-                                                       (byte)1);
-      message.getBody().flip();
-
-      producer.send(message);
-
-      sess.start();
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.stop();
-
-      sess.start();
-
-      sess.close();
-
-      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
-
-      sessCreate.close();
-   }
-
-   protected void doTestLargeMessage(final ClientSessionFactory sf, final int threadNum) throws Exception
-   {
-      long start = System.currentTimeMillis();
-
-      sf.setMinLargeMessageSize(1024);
-      
-      sf.setSendWindowSize(1024*1024);
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int messageSize = 4 * 1024;
-      
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
-      Set<ClientSession> sessions = new HashSet<ClientSession>();
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.start();
-
-         sessConsume.createQueue(ADDRESS, subName, null, false, false);
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      sendMessages(sessSend, producer, numMessages, messageSize, threadNum);
-
-      Set<MyHandler> handlers = new HashSet<MyHandler>();
-
-      for (ClientConsumer consumer : consumers)
-      {
-         MyHandler handler = new MyHandler(threadNum, numMessages, messageSize);
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers)
-      {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         if (!ok)
-         {
-            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
-                                " threadnum " +
-                                threadNum);
-         }
-
-         if (handler.failure != null)
-         {
-            throw new Exception("Handler failed: " + handler.failure);
-         }
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions)
-      {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++)
-      {
-         SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      log.info("duration " + (end - start));
-   }
-
-   protected int getNumIterations()
-   {
-      return 20;
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      log.info("************ Starting test " + this.getName());
-
-      timer = new Timer();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      log.info("************* Ending test " + this.getName());
-
-      if (liveService != null && liveService.isStarted())
-      {
-         liveService.stop();
-      }
-      if (backupService != null && backupService.isStarted())
-      {
-         backupService.stop();
-      }
-      timer.cancel();
-
-      super.tearDown();
-   }
-
-   // Private -------------------------------------------------------
-
-   private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
-   {
-      final int numIts = getNumIterations();
-
-      for (int its = 0; its < numIts; its++)
-      {
-         log.info("************ ITERATION: " + its);
-         if (fileBased)
-         {
-            startFileBased();
-         }
-         else
-         {
-            start();
-         }
-
-         final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
-                                                                              new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                                         backupParams),
-                                                                              0,
-                                                                              1,
-                                                                              ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
-                                                                              ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
-
-         sf.setSendWindowSize(32 * 1024);
-
-         ClientSession session = sf.createSession(false, false, false);
-
-         Failer failer = startFailer(1000, session);
-
-         class Runner extends Thread
-         {
-            private volatile Throwable throwable;
-
-            private final RunnableT test;
-
-            private final int threadNum;
-
-            Runner(final RunnableT test, final int threadNum)
-            {
-               this.test = test;
-
-               this.threadNum = threadNum;
-            }
-
-            public void run()
-            {
-               try
-               {
-                  test.run(sf, threadNum);
-               }
-               catch (Throwable t)
-               {
-                  throwable = t;
-
-                  log.error("Failed to run test", t);
-               }
-            }
-         }
-
-         do
-         {
-            List<Runner> threads = new ArrayList<Runner>();
-
-            for (int i = 0; i < numThreads; i++)
-            {
-               Runner runner = new Runner(runnable, i);
-
-               threads.add(runner);
-
-               runner.start();
-            }
-
-            for (Runner thread : threads)
-            {
-               thread.join();
-
-               if (thread.throwable != null)
-               {
-                  throw new Exception ("Exception on thread " + thread, thread.throwable);
-               }
-            }
-
-            runnable.checkFail();
-         }
-         while (!failer.isExecuted());
-
-         InVMConnector.resetFailures();
-
-         session.close();
-
-         assertEquals(0, sf.numSessions());
-
-         assertEquals(0, sf.numConnections());
-
-         stop();
-      }
-   }
-
-   private Failer startFailer(final long time, final ClientSession session)
-   {
-      Failer failer = new Failer(session);
-
-      timer.schedule(failer, (long)(time * Math.random()), 100);
-
-      return failer;
-   }
-
-   private void startFileBased() throws Exception
-   {
-
-      deleteDirectory(new File(getTestDir()));
-
       Configuration backupConf = new ConfigurationImpl();
-
-      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
-      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
-      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
-      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
-      backupConf.setJournalFileSize(100 * 1024);
-
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
       backupConf.getAcceptorConfigurations()
-                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
-      backupConf.setBackup(true);
-
-      backupService = MessagingServiceImpl.newMessagingService(backupConf);
-      backupService.start();
-
-      Configuration liveConf = new ConfigurationImpl();
-
-      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
-      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
-      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
-      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
-
-      liveConf.setJournalFileSize(100 * 1024);
-
-      liveConf.setSecurityEnabled(false);
-      liveConf.getAcceptorConfigurations()
-              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
-
-      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
-      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
-                                                                   backupParams,
-                                                                   "backup-connector");
-      connectors.put(backupTC.getName(), backupTC);
-      liveConf.setConnectorConfigurations(connectors);
-      liveConf.setBackupConnectorName(backupTC.getName());
-      liveService = MessagingServiceImpl.newMessagingService(liveConf);
-
-      liveService.start();
-
-   }
-
-   private void start() throws Exception
-   {
-      Configuration backupConf = new ConfigurationImpl();
-      backupConf.setSecurityEnabled(false);
-      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-      backupConf.getAcceptorConfigurations()
                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
                                                 backupParams));
       backupConf.setBackup(true);
@@ -1534,243 +62,22 @@
       liveService.start();
    }
 
-   private void stop() throws Exception
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#setBody(org.jboss.messaging.core.client.ClientMessage)
+    */
+   @Override
+   protected void setBody(ClientMessage message) throws Exception
    {
-      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
-
-      backupService.stop();
-
-      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
-
-      liveService.stop();
-
-      assertEquals(0, InVMRegistry.instance.size());
+      message.getBody().flip();
    }
 
-   private void sendMessages(final ClientSession sessSend,
-                             final ClientProducer producer,
-                             final int numMessages,
-                             final int sizeOfMessage,
-                             final int threadNum) throws Exception
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+    */
+   @Override
+   protected void checkSize(ClientMessage message)
    {
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
-                                                              false,
-                                                              0,
-                                                              System.currentTimeMillis(),
-                                                              (byte)1);
-         message.putIntProperty(new SimpleString("threadnum"), threadNum);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(sizeOfMessage)));
-         //message.getBody().flip();
-         producer.send(message);
-      }
+      assertEquals(0, message.getBody().limit());
    }
 
-   private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
-   {
-      // We make sure the messages arrive in the order they were sent from a particular producer
-      Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         for (ClientConsumer consumer : consumers)
-         {
-            Map<Integer, Integer> consumerCounts = counts.get(consumer);
-
-            if (consumerCounts == null)
-            {
-               consumerCounts = new HashMap<Integer, Integer>();
-               counts.put(consumer, consumerCounts);
-            }
-
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-
-            // log.info("Got message " + tn + ":" + cnt);
-
-            Integer c = consumerCounts.get(tn);
-            if (c == null)
-            {
-               c = new Integer(cnt);
-            }
-
-            if (tn == threadNum && cnt != c.intValue())
-            {
-               throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
-            }
-
-            c++;
-
-            // Wrap
-            if (c == numMessages)
-            {
-               c = 0;
-            }
-
-            consumerCounts.put(tn, c);
-
-            msg.acknowledge();
-         }
-      }
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private class Failer extends TimerTask
-   {
-      private final ClientSession session;
-
-      private boolean executed;
-
-      public Failer(final ClientSession session)
-      {
-         this.session = session;
-      }
-
-      public synchronized void run()
-      {
-         log.info("** Failing connection");
-
-         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
-
-         InVMConnector.numberOfFailures = 1;
-         InVMConnector.failOnCreateConnection = true;
-         conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
-
-         log.info("** Fail complete");
-
-         cancel();
-
-         executed = true;
-      }
-
-      public synchronized boolean isExecuted()
-      {
-         return executed;
-      }
-   }
-
-   private abstract class RunnableT extends Thread
-   {
-      private volatile String failReason;
-
-      private volatile Throwable throwable;
-
-      public void setFailed(final String reason, final Throwable throwable)
-      {
-         this.failReason = reason;
-         this.throwable = throwable;
-      }
-
-      public void checkFail()
-      {
-         if (throwable != null)
-         {
-            log.error("Test failed: " + failReason, throwable);
-         }
-         if (failReason != null)
-         {
-            fail(failReason);
-         }
-      }
-
-      public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
-   }
-
-   private class MyHandler implements MessageHandler
-   {
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      private Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
-
-      volatile String failure;
-
-      final int tn;
-
-      final int numMessages;
-      
-      final int messageSize;
-
-      volatile boolean done;
-
-      MyHandler(final int threadNum, final int numMessages, final int messageSize)
-      {
-         this.tn = threadNum;
-
-         this.numMessages = numMessages;
-         
-         this.messageSize = messageSize;
-      }
-
-
-      MyHandler(final int threadNum, final int numMessages)
-      {
-         this(threadNum, numMessages, DEFAULT_MESSAGE_SIZE);
-      }
-
-      public void onMessage(ClientMessage message)
-      {
-         try
-         {
-            message.acknowledge();
-         }
-         catch (MessagingException me)
-         {
-            log.error("Failed to process", me);
-         }
-
-         if (done)
-         {
-            return;
-         }
-
-         int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
-         int cnt = (Integer)message.getProperty(new SimpleString("count"));
-
-         Integer c = counts.get(threadNum);
-         if (c == null)
-         {
-            c = new Integer(cnt);
-         }
-
-         // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
-
-         if (tn == threadNum && cnt != c.intValue())
-         {
-            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
-            log.error(failure);
-
-            latch.countDown();
-         }
-         
-         if (message.getBody().limit() != messageSize)
-         {
-            failure = "Invalid Message size, expected " + messageSize + " but it was " + message.getBody().limit();
-            
-            latch.countDown();
-         }
-
-         if (tn == threadNum && c == numMessages - 1)
-         {
-            done = true;
-            latch.countDown();
-         }
-
-         c++;
-         // Wrap around at numMessages
-         if (c == numMessages)
-         {
-            c = 0;
-         }
-
-         counts.put(threadNum, c);
-
-      }
-   }
 }

Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	                        (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-01-18 23:20:21 UTC (rev 5654)
@@ -0,0 +1,1591 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MultiThreadRandomFailoverTestBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ *
+ */
+public abstract class MultiThreadRandomFailoverTestBase extends UnitTestCase
+{
+
+   private static final Logger log = Logger.getLogger(MultiThreadRandomFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   private static final int RECEIVE_TIMEOUT = 30000;
+
+   private static final int NUM_THREADS = 10;
+
+   // Attributes ----------------------------------------------------
+   protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   protected MessagingService liveService;
+
+   protected MessagingService backupService;
+
+   protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   protected Timer timer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testA() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestA(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testB() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestB(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testC() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestC(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testD() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestD(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testE() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestE(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testF() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestF(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testG() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestG(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testH() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestH(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testI() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestI(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testJ() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestJ(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testK() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestK(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   public void testL() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestL(sf);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   // public void testM() throws Exception
+   // {
+   // runTestMultipleThreads(new RunnableT()
+   // {
+   // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+   // {
+   // doTestM(sf, threadNum);
+   // }
+   // }, NUM_THREADS);
+   // }
+
+   public void testN() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestN(sf, threadNum);
+         }
+      }, NUM_THREADS, false);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected abstract void start() throws Exception;
+
+   protected abstract void setBody(ClientMessage message) throws Exception;
+   
+   protected abstract void checkSize(ClientMessage message);          
+
+
+   protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      sessSend.close();
+
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+   }
+
+   protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.rollback();
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.commit();
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.rollback();
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.commit();
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      handlers.clear();
+
+      // Set handlers to null
+      for (ClientConsumer consumer : consumers)
+      {
+         consumer.setMessageHandler(null);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      // New handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed on rollback: " + handler.failure);
+         }
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   // Now with synchronous receive()
+
+   protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.rollback();
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.commit();
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.rollback();
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.commit();
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
+   protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      message.getBody().flip();
+
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+
+   protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      message.getBody().flip();
+
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+
+   protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      ClientSession s = sf.createSession(false, false, false);
+
+      s.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+      final int numConsumers = 100;
+
+      for (int i = 0; i < numConsumers; i++)
+      {
+         ClientConsumer consumer = s.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+         consumer.close();
+      }
+
+      s.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+      s.close();
+   }
+
+   protected void doTestL(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numSessions = 100;
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.close();
+      }
+
+      s.close();
+   }
+
+   // Browsers
+   // FIXME - this test won't work until we use a proper iterator for browsing a queue.
+   // Making a copy of the queue for a browser consumer doesn't work well with replication since
+   // When replicating the create consumer (browser) to the backup, when executed on the backup the
+   // backup may have different messages in its queue since been added on different threads.
+   // So when replicating deliveries they may not be found.
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
+   // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
+   // {
+   // long start = System.currentTimeMillis();
+   //
+   // ClientSession sessSend = sf.createSession(false, true, true, false);
+   //      
+   // ClientSession sessConsume = sf.createSession(false, true, true, false);
+   //      
+   // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
+   //
+   // final int numMessages = 100;
+   //
+   // ClientProducer producer = sessSend.createProducer(ADDRESS);
+   //
+   // sendMessages(sessSend, producer, numMessages, threadNum);
+   //      
+   // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+   // null, false, true);
+   //      
+   // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
+   //      
+   // for (int i = 0; i < numMessages; i++)
+   // {
+   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+   //
+   // assertNotNull(msg);
+   //
+   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+   //
+   // Integer c = consumerCounts.get(tn);
+   // if (c == null)
+   // {
+   // c = new Integer(cnt);
+   // }
+   //
+   // if (cnt != c.intValue())
+   // {
+   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+   // }
+   //         
+   // c++;
+   //         
+   // //Wrap
+   // if (c == numMessages)
+   // {
+   // c = 0;
+   // }
+   //         
+   // consumerCounts.put(tn, c);
+   //
+   // msg.acknowledge();
+   // }
+   //
+   // sessConsume.close();
+   //      
+   // sessConsume = sf.createSession(false, true, true, false);
+   //      
+   // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+   // null, false, true);
+   //      
+   // //Messages should still be there
+   //      
+   // consumerCounts.clear();
+   //      
+   // for (int i = 0; i < numMessages; i++)
+   // {
+   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+   //
+   // assertNotNull(msg);
+   //
+   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+   //
+   // Integer c = consumerCounts.get(tn);
+   // if (c == null)
+   // {
+   // c = new Integer(cnt);
+   // }
+   //
+   // if (cnt != c.intValue())
+   // {
+   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+   // }
+   //         
+   // c++;
+   //         
+   // //Wrap
+   // if (c == numMessages)
+   // {
+   // c = 0;
+   // }
+   //         
+   // consumerCounts.put(tn, c);
+   //
+   // msg.acknowledge();
+   // }
+   //      
+   // sessConsume.close();
+   //      
+   // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
+   //      
+   // sessSend.close();
+   //
+   // long end = System.currentTimeMillis();
+   //
+   // log.info("duration " + (end - start));
+   // }
+
+   protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.stop();
+
+      sess.start();
+
+      sess.stop();
+
+      ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      message.getBody().flip();
+
+      producer.send(message);
+
+      sess.start();
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.stop();
+
+      sess.start();
+
+      sess.close();
+
+      sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+
+   protected int getNumIterations()
+   {
+      return 20;
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      log.info("************ Starting test " + this.getName());
+
+      timer = new Timer();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      log.info("************* Ending test " + this.getName());
+
+      if (liveService != null && liveService.isStarted())
+      {
+         liveService.stop();
+      }
+      if (backupService != null && backupService.isStarted())
+      {
+         backupService.stop();
+      }
+      timer.cancel();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
+   {
+      final int numIts = getNumIterations();
+
+      for (int its = 0; its < numIts; its++)
+      {
+         log.info("************ ITERATION: " + its);
+
+         start();
+
+         final ClientSessionFactoryInternal sf = createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         Failer failer = startFailer(1000, session);
+
+         class Runner extends Thread
+         {
+            private volatile Throwable throwable;
+
+            private final RunnableT test;
+
+            private final int threadNum;
+
+            Runner(final RunnableT test, final int threadNum)
+            {
+               this.test = test;
+
+               this.threadNum = threadNum;
+            }
+
+            public void run()
+            {
+               try
+               {
+                  test.run(sf, threadNum);
+               }
+               catch (Throwable t)
+               {
+                  throwable = t;
+
+                  log.error("Failed to run test", t);
+               }
+            }
+         }
+
+         do
+         {
+            List<Runner> threads = new ArrayList<Runner>();
+
+            for (int i = 0; i < numThreads; i++)
+            {
+               Runner runner = new Runner(runnable, i);
+
+               threads.add(runner);
+
+               runner.start();
+            }
+
+            for (Runner thread : threads)
+            {
+               thread.join();
+
+               if (thread.throwable != null)
+               {
+                  throw new Exception("Exception on thread " + thread, thread.throwable);
+               }
+            }
+
+            runnable.checkFail();
+         }
+         while (!failer.isExecuted());
+
+         InVMConnector.resetFailures();
+
+         session.close();
+
+         assertEquals(0, sf.numSessions());
+
+         assertEquals(0, sf.numConnections());
+
+         stop();
+      }
+   }
+
+
+   private Failer startFailer(final long time, final ClientSession session)
+   {
+      Failer failer = new Failer(session);
+
+      timer.schedule(failer, (long)(time * Math.random()), 100);
+
+      return failer;
+   }
+
+   /**
+    * @return
+    */
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                           new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                      backupParams),
+                                                                           0,
+                                                                           1,
+                                                                           ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                           ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
+
+      sf.setSendWindowSize(32 * 1024);
+      return sf;
+   }
+   
+
+
+   private void stop() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   private void sendMessages(final ClientSession sessSend,
+                             final ClientProducer producer,
+                             final int numMessages,
+                             final int threadNum) throws Exception
+   {
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("threadnum"), threadNum);
+         message.putIntProperty(new SimpleString("count"), i);
+         setBody(message);
+         producer.send(message);
+      }
+   }
+   
+   private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
+   {
+      // We make sure the messages arrive in the order they were sent from a particular producer
+      Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+            if (consumerCounts == null)
+            {
+               consumerCounts = new HashMap<Integer, Integer>();
+               counts.put(consumer, consumerCounts);
+            }
+
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+            // log.info("Got message " + tn + ":" + cnt);
+
+            Integer c = consumerCounts.get(tn);
+            if (c == null)
+            {
+               c = new Integer(cnt);
+            }
+
+            if (tn == threadNum && cnt != c.intValue())
+            {
+               throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
+            }
+
+            c++;
+
+            // Wrap
+            if (c == numMessages)
+            {
+               c = 0;
+            }
+
+            consumerCounts.put(tn, c);
+
+            msg.acknowledge();
+         }
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+   private class Failer extends TimerTask
+   {
+      private final ClientSession session;
+
+      private boolean executed;
+
+      public Failer(final ClientSession session)
+      {
+         this.session = session;
+      }
+
+      public synchronized void run()
+      {
+         log.info("** Failing connection");
+
+         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+         InVMConnector.numberOfFailures = 1;
+         InVMConnector.failOnCreateConnection = true;
+         conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+         log.info("** Fail complete");
+
+         cancel();
+
+         executed = true;
+      }
+
+      public synchronized boolean isExecuted()
+      {
+         return executed;
+      }
+   }
+
+   private abstract class RunnableT extends Thread
+   {
+      private volatile String failReason;
+
+      private volatile Throwable throwable;
+
+      public void setFailed(final String reason, final Throwable throwable)
+      {
+         this.failReason = reason;
+         this.throwable = throwable;
+      }
+
+      public void checkFail()
+      {
+         if (throwable != null)
+         {
+            log.error("Test failed: " + failReason, throwable);
+         }
+         if (failReason != null)
+         {
+            fail(failReason);
+         }
+      }
+
+      public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
+   }
+
+   private class MyHandler implements MessageHandler
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      private Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+      volatile String failure;
+
+      final int tn;
+
+      final int numMessages;
+
+      volatile boolean done;
+
+      MyHandler(final int threadNum, final int numMessages)
+      {
+         this.tn = threadNum;
+
+         this.numMessages = numMessages;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException me)
+         {
+            log.error("Failed to process", me);
+         }
+
+         if (done)
+         {
+            return;
+         }
+
+         int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+         int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+         Integer c = counts.get(threadNum);
+         if (c == null)
+         {
+            c = new Integer(cnt);
+         }
+
+         // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
+         if (tn == threadNum && cnt != c.intValue())
+         {
+            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+            log.error(failure);
+
+            latch.countDown();
+         }
+
+         checkSize(message);
+
+         if (tn == threadNum && c == numMessages - 1)
+         {
+            done = true;
+            latch.countDown();
+         }
+
+         c++;
+         // Wrap around at numMessages
+         if (c == numMessages)
+         {
+            c = 0;
+         }
+
+         counts.put(threadNum, c);
+
+      }
+   }
+}
\ No newline at end of file




More information about the jboss-cvs-commits mailing list