[hornetq-commits] JBoss hornetq SVN: r10479 - in trunk/tests: soak-tests and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Apr 12 05:47:15 EDT 2011


Author: ataylor
Date: 2011-04-12 05:47:15 -0400 (Tue, 12 Apr 2011)
New Revision: 10479

Added:
   trunk/tests/soak-tests/
   trunk/tests/soak-tests/pom.xml
   trunk/tests/soak-tests/src/
   trunk/tests/soak-tests/src/test/
   trunk/tests/soak-tests/src/test/java/
   trunk/tests/soak-tests/src/test/java/org/
   trunk/tests/soak-tests/src/test/java/org/hornetq/
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/
Removed:
   trunk/tests/src/org/hornetq/tests/soak/
Modified:
   trunk/tests/hornetq-tests.iml
   trunk/tests/pom.xml
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
   trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Log:
mavenised soak tests

Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml	2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/hornetq-tests.iml	2011-04-12 09:47:15 UTC (rev 10479)
@@ -8,6 +8,7 @@
       <sourceFolder url="file://$MODULE_DIR$/integration-tests/src/test/java" isTestSource="true" />
       <sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java" isTestSource="true" />
       <sourceFolder url="file://$MODULE_DIR$/integration-tests/src/main/java" isTestSource="true" />
+      <sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java" isTestSource="true" />
       <excludeFolder url="file://$MODULE_DIR$/jms-tests" />
       <excludeFolder url="file://$MODULE_DIR$/joram-tests" />
       <excludeFolder url="file://$MODULE_DIR$/logs" />

Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml	2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/pom.xml	2011-04-12 09:47:15 UTC (rev 10479)
@@ -28,5 +28,6 @@
         <module>integration-tests</module>
         <module>jms-tests</module>
         <module>joram-tests</module>
+        <module>soak-tests</module>
     </modules>
 </project>

Added: trunk/tests/soak-tests/pom.xml
===================================================================
--- trunk/tests/soak-tests/pom.xml	                        (rev 0)
+++ trunk/tests/soak-tests/pom.xml	2011-04-12 09:47:15 UTC (rev 10479)
@@ -0,0 +1,140 @@
+<!--
+  ~ Copyright 2009 Red Hat, Inc.
+  ~  Red Hat licenses this file to you under the Apache License, version
+  ~  2.0 (the "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  ~  implied.  See the License for the specific language governing
+  ~  permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+   <parent>
+      <groupId>org.hornetq.tests</groupId>
+      <artifactId>hornetq-tests-pom</artifactId>
+      <version>2.2.3-SNAPSHOT</version>
+   </parent>
+
+   <groupId>org.hornetq.tests</groupId>
+   <artifactId>soak-tests</artifactId>
+   <packaging>jar</packaging>
+   <name>HornetQ soak Tests</name>
+
+    <dependencies>
+      <dependency>
+         <groupId>org.hornetq.tests</groupId>
+         <artifactId>unit-tests</artifactId>
+         <version>${project.version}</version>
+         <scope>compile</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.hornetq.tests</groupId>
+         <artifactId>integration-tests</artifactId>
+         <version>${project.version}</version>
+         <scope>compile</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.hornetq.tests</groupId>
+         <artifactId>jms-tests</artifactId>
+         <version>${project.version}</version>
+         <scope>compile</scope>
+      </dependency>
+        <dependency>
+            <groupId>org.hornetq</groupId>
+            <artifactId>hornetq-jms</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hornetq</groupId>
+            <artifactId>hornetq-ra</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hornetq</groupId>
+            <artifactId>hornetq-bootstrap</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.javaee</groupId>
+            <artifactId>jboss-jca-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.security</groupId>
+            <artifactId>jboss-security-spi</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.security</groupId>
+            <artifactId>jbosssx</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.naming</groupId>
+            <artifactId>jnpserver</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jboss.jbossts</groupId>
+            <artifactId>jbossts-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>apache-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.javaee</groupId>
+            <artifactId>jboss-transaction-api</artifactId>
+        </dependency>
+        <!-- this is specifically for the JMS Bridge -->
+        <dependency>
+            <groupId>org.jboss.integration</groupId>
+            <artifactId>jboss-transaction-spi</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.javaee</groupId>
+            <artifactId>jboss-jaspi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.javaee</groupId>
+            <artifactId>jboss-jms-api</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <testResources>
+            <testResource>
+                <directory>config</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-surefire-plugin</artifactId>
+              <configuration>
+                  <includes>
+                      <include>**/*Test.java</include>
+                  </includes>
+              </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java	2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java	2011-04-12 09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
 
 /**
  * A ClientSoakTest

Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java	2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java	2011-04-12 09:47:15 UTC (rev 10479)
@@ -27,7 +27,7 @@
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
 
 /**
  * A ClientSoakTest

Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java	2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java	2011-04-12 09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
 
 /**
  * A ClientSoakTest

Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java	2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java	2011-04-12 09:47:15 UTC (rev 10479)
@@ -13,8 +13,27 @@
 
 package org.hornetq.tests.soak.failover;
 
-import org.hornetq.tests.integration.cluster.reattach.RandomReattachTest;
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
 
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A RandomFailoverSoakTest
  *
@@ -24,10 +43,1515 @@
  *
  *
  */
-public class RandomFailoverSoakTest extends RandomReattachTest
+public class RandomFailoverSoakTest extends UnitTestCase
 {
+   private static final Logger log = Logger.getLogger(RandomFailoverSoakTest.class);
 
+   // Constants -----------------------------------------------------
+
+   private static final int RECEIVE_TIMEOUT = 10000;
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private HornetQServer liveService;
+
+   private Timer timer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testA() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestA(sf);
+         }
+      });
+   }
+
+   public void testB() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestB(sf);
+         }
+      });
+   }
+
+   public void testC() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestC(sf);
+         }
+      });
+   }
+
+   public void testD() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestD(sf);
+         }
+      });
+   }
+
+   public void testE() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestE(sf);
+         }
+      });
+   }
+
+   public void testF() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestF(sf);
+         }
+      });
+   }
+
+   public void testG() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestG(sf);
+         }
+      });
+   }
+
+   public void testH() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestH(sf);
+         }
+      });
+   }
+
+   public void testI() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestI(sf);
+         }
+      });
+   }
+
+   public void testJ() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestJ(sf);
+         }
+      });
+   }
+
+   public void testK() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestK(sf);
+         }
+      });
+   }
+
+   public void testL() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestL(sf);
+         }
+      });
+   }
+
+   public void testN() throws Exception
+   {
+      runTest(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception
+         {
+            doTestN(sf);
+         }
+      });
+   }
+
+   public void runTest(final RunnableT runnable) throws Exception
+   {
+      final int numIts = getNumIterations();
+
+      for (int its = 0; its < numIts; its++)
+      {
+         RandomFailoverSoakTest.log.info("####" + getName() + " iteration #" + its);
+         start();
+         ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+         locator.setReconnectAttempts(-1);
+         locator.setConfirmationWindowSize(1024 * 1024);
+
+         ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
+
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         Failer failer = startFailer(1000, session);
+
+         do
+         {
+            runnable.run(sf);
+         }
+         while (!failer.isExecuted());
+
+         session.close();
+
+         locator.close();
+
+         Assert.assertEquals(0, sf.numSessions());
+
+         Assert.assertEquals(0, sf.numConnections());
+
+         stop();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void doTestA(final ClientSessionFactory sf) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               Assert.fail("Too many messages");
+            }
+
+            Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            try
+            {
+               message.acknowledge();
+            }
+            catch (HornetQException me)
+            {
+               RandomFailoverSoakTest.log.error("Failed to process", me);
+            }
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+         
+         handler.checkAssertions();
+
+         Assert.assertTrue("Didn't receive all messages", ok);
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestB(final ClientSessionFactory sf) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 50;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               Assert.fail("Too many messages");
+            }
+
+            Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+         
+         handler.checkAssertions();
+
+         Assert.assertTrue(ok);
+      }
+
+      sessSend.close();
+
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+
+   }
+
+   protected void doTestC(final ClientSessionFactory sf) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 1;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      class MyHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               Assert.fail("Too many messages, expected " + count);
+            }
+
+            Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+            count++;
+            
+            try
+            {
+               message.acknowledge();
+            }
+            catch (HornetQException e)
+            {
+               e.printStackTrace();
+               throw new RuntimeException (e.getMessage(), e);
+            }
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(ok);
+         
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // New handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(ok);
+         
+         handler.checkAssertions();
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestD(final ClientSessionFactory sf) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               Assert.fail("Too many messages, " + count);
+            }
+
+            Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(ok);
+         
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // New handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(ok);
+         
+         handler.checkAssertions();
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   // Now with synchronous receive()
+
+   protected void doTestE(final ClientSessionFactory sf) throws Exception
+   { // Synchronous receive() scenario; the consumer sessions are started before any message is sent.
+      long start = System.currentTimeMillis();
+      // Separate session, used only at the end to delete the "sub<i>" queues.
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one started session, one queue "sub<i>" on ADDRESS and one consumer on that queue.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         // NOTE(review): (false, true, true) presumably means non-XA with auto-commit of sends and acks - confirm.
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+      // Each message carries its index in the int property "count".
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+      // Round i: every consumer must receive a message whose "count" equals i, then acknowledges it.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // Nothing may be left over: receiveImmediate() has to return null on every consumer.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            Assert.assertNull(msg);
+         }
+      }
+      // Cleanup: close the sending and consuming sessions, then delete the queues through s.
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestF(final ClientSessionFactory sf) throws Exception
+   { // Same flow as doTestE, except the consumer sessions are started only after all messages were sent.
+      long start = System.currentTimeMillis();
+      // Separate session, used only at the end to delete the "sub<i>" queues.
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one (not yet started) session, one queue "sub<i>" on ADDRESS and one consumer.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+      // Each message carries its index in the int property "count".
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+      // Delivery is enabled only now, after everything has been sent.
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+      // Round i: every consumer must receive a message whose "count" equals i, then acknowledges it.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            if (msg == null)
+            {
+               throw new IllegalStateException("Failed to receive message " + i);
+            }
+            // NOTE(review): msg is already known to be non-null here, so the next assertion can never fail.
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // Nothing may be left over: receiveImmediate() has to return null on every consumer.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            Assert.assertNull(msg);
+         }
+      }
+      // Cleanup: close the sending and consuming sessions, then delete the queues through s.
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+      // NOTE(review): expects exactly one session left on the factory - presumably one owned by the caller; confirm.
+      Assert.assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestG(final ClientSessionFactory sf) throws Exception
+   { // Explicit commit/rollback scenario with synchronous receive(); consumer sessions are started up front.
+      long start = System.currentTimeMillis();
+      // Separate session, used only at the end to delete the "sub<i>" queues.
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one started session, one queue "sub<i>" on ADDRESS and one consumer on that queue.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         // NOTE(review): (false, false, false) presumably means non-XA without auto-commit of sends/acks - confirm.
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+      // First batch: sent and then rolled back.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+      // Second batch: same "count" values, this time committed.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+      // Round i: every consumer must receive a message whose "count" equals i, then acknowledges it.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // After numMessages receives per consumer nothing more may arrive (checked once per consumer here).
+      for (ClientConsumer consumer : consumers)
+      {
+         ClientMessage msg = consumer.receiveImmediate();
+
+         Assert.assertNull(msg);
+      }
+      // Roll back the consuming sessions; the loop below expects the same messages to be delivered again.
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+      // Second pass over the redelivered messages, again in "count" order.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // Nothing may be left over: receiveImmediate() has to return null on every consumer.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            Assert.assertNull(msg);
+         }
+      }
+      // This time the consuming sessions are committed.
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+      // Cleanup: close the sending and consuming sessions, then delete the queues through s.
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestH(final ClientSessionFactory sf) throws Exception
+   { // Same flow as doTestG, except the consumer sessions are started only after the send was committed.
+      long start = System.currentTimeMillis();
+      // Separate session, used only at the end to delete the "sub<i>" queues.
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one (not yet started) session, one queue "sub<i>" on ADDRESS and one consumer.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+      // First batch: sent and then rolled back.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+      // Second batch: same "count" values, this time committed.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+      // Delivery is enabled only now, after the committed batch is in place.
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+      // Round i: every consumer must receive a message whose "count" equals i, then acknowledges it.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // Nothing more may arrive: receiveImmediate() has to return null on every consumer.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            Assert.assertNull(msg);
+         }
+      }
+      // Roll back the consuming sessions; the loop below expects the same messages to be delivered again.
+      for (ClientSession session : sessions)
+      {
+         session.rollback();
+      }
+      // Second pass over the redelivered messages, again in "count" order.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(msg);
+
+            Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+            msg.acknowledge();
+         }
+      }
+      // Nothing may be left over: receiveImmediate() has to return null on every consumer.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            Assert.assertNull(msg);
+         }
+      }
+      // This time the consuming sessions are committed.
+      for (ClientSession session : sessions)
+      {
+         session.commit();
+      }
+      // Cleanup: close the sending and consuming sessions, then delete the queues through s.
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      RandomFailoverSoakTest.log.info("duration " + (end - start));
+   }
+
+   protected void doTestI(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(RandomFailoverSoakTest.ADDRESS);
+
+      ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+      Assert.assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+
+      sessCreate.close();
+   }
+
+   protected void doTestJ(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(RandomFailoverSoakTest.ADDRESS);
+
+      ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+      Assert.assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+
+      sessCreate.close();
+   }
+
+   /**
+    * Creates a queue named after ADDRESS, then opens and immediately closes 100 consumers on it, one after
+    * another, and finally deletes the queue and closes the session.
+    *
+    * @param sf factory used to create the session
+    * @throws Exception if any session or consumer operation fails
+    */
+   protected void doTestK(final ClientSessionFactory sf) throws Exception
+   {
+      final ClientSession session = sf.createSession(false, false, false);
+      session.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+      final int numConsumers = 100;
+      int opened = 0;
+      while (opened < numConsumers)
+      {
+         // Never more than one consumer open at a time.
+         session.createConsumer(RandomFailoverSoakTest.ADDRESS).close();
+         opened++;
+      }
+
+      session.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+      session.close();
+   }
+
+   /**
+    * Creates and immediately closes 10 sessions, one after another.
+    *
+    * @param sf factory used to create the sessions
+    * @throws Exception if creating or closing a session fails
+    */
+   protected void doTestL(final ClientSessionFactory sf) throws Exception
+   {
+      final int numSessions = 10;
+
+      for (int remaining = numSessions; remaining > 0; remaining--)
+      {
+         sf.createSession(false, false, false).close();
+      }
+   }
+
+   protected void doTestN(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS,
+                             new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()),
+                             null,
+                             false);
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.stop();
+
+      sess.start();
+
+      sess.stop();
+
+      ClientConsumer consumer = sess.createConsumer(new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+      ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+      producer.send(message);
+
+      sess.start();
+
+      ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+      Assert.assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.stop();
+
+      sess.start();
+
+      sess.close();
+
+      sessCreate.deleteQueue(new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+   
    @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      // Daemon timer (Timer(true)); startFailer() schedules its Failer tasks on it.
+      timer = new Timer(true);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      timer.cancel();
+      // Empty the in-VM registry before handing over to the superclass tearDown.
+      InVMRegistry.instance.clear();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   private Failer startFailer(final long time, final ClientSession session)
+   {
+      Failer failer = new Failer((ClientSessionInternal)session);
+
+      timer.schedule(failer, (long)(time * Math.random()), 100);
+
+      return failer;
+   }
+
+   private void start() throws Exception
+   {
+      Configuration liveConf = createDefaultConfig();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveService = HornetQServers.newHornetQServer(liveConf, false);
+      liveService.start();
+   }
+
+   private void stop() throws Exception
+   { // Stops the live server and asserts that the in-VM registry is empty afterwards.
+      liveService.stop();
+
+      Assert.assertEquals(0, InVMRegistry.instance.size());
+      // Drop the reference to the stopped server.
+      liveService = null;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   class Failer extends TimerTask // Timer task that fails the connection of one session with NOT_CONNECTED.
+   {
+      private final ClientSessionInternal session;
+      // Set at the end of run() and read through isExecuted(); both methods are synchronized on this task.
+      private boolean executed;
+
+      public Failer(final ClientSessionInternal session)
+      {
+         this.session = session;
+      }
+
+      @Override
+      public synchronized void run()
+      {
+         RandomFailoverSoakTest.log.info("** Failing connection");
+
+         session.getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED, "oops"));
+
+         RandomFailoverSoakTest.log.info("** Fail complete");
+         // startFailer() schedules this task with a period, so cancel it to make the failure happen only once.
+         cancel();
+
+         executed = true;
+      }
+
+      public synchronized boolean isExecuted()
+      {
+         return executed;
+      }
+   }
+
+   public abstract class RunnableT // Single-method callback that is handed a ClientSessionFactory and may throw.
+   {
+      abstract void run(final ClientSessionFactory sf) throws Exception;
+   }
+   
+   /**
+    * MessageHandler base class that records assertion failures raised while handling a message, so that the
+    * test can rethrow them later by calling {@link #checkAssertions()}.
+    */
+   static abstract class AssertionCheckMessageHandler implements MessageHandler
+   {
+      /** Assertion failures caught in onMessage(), in the order they occurred. */
+      private ArrayList<AssertionFailedError> errors = new ArrayList<AssertionFailedError>();
+
+      /**
+       * Rethrows the first recorded assertion failure, if there is one; otherwise does nothing.
+       */
+      public void checkAssertions()
+      {
+         if (!errors.isEmpty())
+         {
+            throw errors.get(0);
+         }
+      }
+
+      /**
+       * Delegates to {@link #onMessageAssert(ClientMessage)}; an AssertionFailedError thrown by it is
+       * printed and stored instead of being propagated.
+       *
+       * @see org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+       */
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            onMessageAssert(message);
+         }
+         catch (AssertionFailedError failure)
+         {
+            failure.printStackTrace(); // System.out -> junit reports
+            errors.add(failure);
+         }
+      }
+
+      /**
+       * Message callback in which subclasses may use assertions.
+       *
+       * @param message the received message
+       */
+      public abstract void onMessageAssert(ClientMessage message);
+   }
+
    protected int getNumIterations()
    {
       return 500;

Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java	2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java	2011-04-12 09:47:15 UTC (rev 10479)
@@ -13,9 +13,27 @@
 
 package org.hornetq.tests.soak.journal;
 
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.hornetq.tests.stress.journal.JournalCleanupCompactStressTest;
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.*;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
 
 /**
  * A JournalCleanupCompactSoakTest
@@ -24,9 +42,538 @@
  *
  *
  */
-public class JournalCleanupCompactSoakTest extends JournalCleanupCompactStressTest
+public class JournalCleanupCompactSoakTest extends ServiceTestBase
 {
 
+
+   public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+   // Initial number of permits of maxRecords (see setUp).
+   private static final int MAX_WRITES = 20000;
+   // Appender threads hold the read lock while writing; testAppend takes the write lock to restart the journal.
+   private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+   // One permit is acquired per appended record, bounding the gap between appends and deletes so we do not run out of memory
+   public Semaphore maxRecords;
+   // Loop flag for the worker threads; cleared by testAppend at the end or by a worker that hit an exception.
+   private volatile boolean running;
+   // Number of exceptions seen by background work; reloadJournal asserts that it is still zero.
+   private AtomicInteger errors = new AtomicInteger(0);
+
+   private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+   private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+   private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
+   private JournalImpl journal;
+
+   ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
+                                                     false,
+                                                     JournalCleanupCompactSoakTest.class.getClassLoader());
+
+   private ExecutorService threadPool;
+   // NOTE(review): threadPool is still null when this initializer runs; setUp replaces this factory with a usable one.
+   private OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+
+   Executor testExecutor;
+
+
+   @Override
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      // Fresh 20-thread pool plus an ordered executor on top of it for every test run.
+      threadPool = Executors.newFixedThreadPool(20, tFactory);
+      executorFactory = new OrderedExecutorFactory(threadPool);
+      testExecutor = executorFactory.getExecutor();
+
+      maxRecords = new Semaphore(MAX_WRITES);
+
+      errors.set(0);
+
+      File dir = new File(getTemporaryDir());
+      dir.mkdirs();
+
+      SequentialFileFactory factory;
+      // Use the AIO file factory when AsynchronousFileImpl reports it is loaded, otherwise fall back to NIO.
+      int maxAIO;
+      if (AsynchronousFileImpl.isLoaded())
+      {
+         factory = new AIOSequentialFileFactory(dir.getPath());
+         maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
+      }
+      else
+      {
+         factory = new NIOSequentialFileFactory(dir.getPath(), true);
+         maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
+      }
+      // NOTE(review): first three args are presumably file size, min files and compact-min-files - confirm against JournalImpl.
+      journal = new JournalImpl(50 * 1024,
+                                20,
+                                50,
+                                ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
+                                factory,
+                                "hornetq-data",
+                                "hq",
+                                maxAIO)
+      {
+         protected void onCompactLock() throws Exception
+         {
+         }
+         // Hook override: queues an add + forced file move + delete on testExecutor instead of running it inline.
+         protected void onCompactStart() throws Exception
+         {
+            testExecutor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     // System.out.println("OnCompactStart enter");
+                     if (running)
+                     {
+                        long id = idGen.generateID();
+                        journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
+                        journal.forceMoveNextFile();
+                        journal.appendDeleteRecord(id, id == 20); // second argument is true only for id 20
+                     }
+                     // System.out.println("OnCompactStart leave");
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet(); // picked up by the assertEquals(0, errors.get()) in reloadJournal
+                  }
+               }
+            });
+
+         }
+
+      };
+
+      journal.start();
+      journal.loadInternalOnly();
+
+   }
+
+   @Override
+   public void tearDown() throws Exception
+   { // NOTE(review): super.tearDown() is never called here although setUp() calls super.setUp() - confirm this is intended.
+      try
+      {
+         if (journal.isStarted())
+         {
+            journal.stop();
+         }
+      }
+      catch (Exception e)
+      {
+         // don't care :-)
+      }
+      // shutdown() does not wait for tasks that are still running.
+      threadPool.shutdown();
+   }
+
+   /**
+    * Soak scenario: starts one SlowAppenderNoTX thread and NTHREADS FastAppenderTx/FastUpdateTx pairs, and for
+    * getTotalTimeMilliseconds() stops, restarts and reloads the journal about every 10 seconds while holding
+    * the write lock. Afterwards the workers are stopped and joined, the journal is reloaded once more, every
+    * remaining record is deleted and the journal is expected to end up with zero data files.
+    *
+    * @throws Exception if a journal operation fails or the thread is interrupted
+    */
+   public void testAppend() throws Exception
+   {
+
+      running = true;
+      SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+      int NTHREADS = 5;
+
+      FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
+      FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         appenders[i] = new FastAppenderTx();
+         // Each updater works on the id queue filled by its appender.
+         updaters[i] = new FastUpdateTx(appenders[i].queue);
+      }
+
+      t1.start();
+
+      Thread.sleep(1000);
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         appenders[i].start();
+         updaters[i].start();
+      }
+
+      long timeToEnd = System.currentTimeMillis() + getTotalTimeMilliseconds();
+
+      while (System.currentTimeMillis() < timeToEnd)
+      {
+         System.out.println("Append = " + numberOfRecords +
+                            ", Update = " +
+                            numberOfUpdates +
+                            ", Delete = " +
+                            numberOfDeletes +
+                            ", liveRecords = " +
+                            (numberOfRecords.get() - numberOfDeletes.get()));
+         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+         rwLock.writeLock().lock();
+         try
+         {
+            System.out.println("Restarting server");
+            journal.stop();
+            journal.start();
+            reloadJournal();
+         }
+         finally
+         {
+            // Release the lock even when the restart or the reload check fails; otherwise every appender
+            // stays blocked on the read lock forever.
+            rwLock.writeLock().unlock();
+         }
+      }
+
+      running = false;
+
+      // Release Semaphore after setting running to false or the threads may never finish
+      maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
+
+      for (Thread t : appenders)
+      {
+         t.join();
+      }
+
+      for (Thread t : updaters)
+      {
+         t.join();
+      }
+
+      t1.join();
+
+      // Wait until the task queued here has run on testExecutor before stopping the journal.
+      final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+      testExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            latchExecutorDone.countDown();
+         }
+      });
+
+      latchExecutorDone.await();
+
+      journal.stop();
+
+      journal.start();
+
+      reloadJournal();
+
+      Collection<Long> records = journal.getRecords().keySet();
+
+      System.out.println("Deleting everything!");
+      for (Long delInfo : records)
+      {
+         journal.appendDeleteRecord(delInfo, false);
+      }
+
+      journal.forceMoveNextFile();
+
+      journal.checkReclaimStatus();
+
+      Thread.sleep(5000);
+
+      assertEquals(0, journal.getDataFilesCount());
+
+      journal.stop();
+   }
+
+   /**
+    * Asserts that no background error has been counted, loads the journal and checks that the number of
+    * committed records that are not updates equals numberOfRecords minus numberOfDeletes.
+    *
+    * @throws Exception if loading the journal fails
+    */
+   private void reloadJournal() throws Exception
+   {
+      assertEquals(0, errors.get());
+
+      ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+      ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+      journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
+      {
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+            // No action is taken for failed transactions.
+         }
+      });
+
+      // Only add records are counted; update records do not take part in the check below.
+      long appends = 0;
+
+      for (RecordInfo record : committedRecords)
+      {
+         if (!record.isUpdate)
+         {
+            appends++;
+         }
+      }
+
+      assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+   }
+
+   private byte[] generateRecord()
+   {
+      int size = RandomUtil.randomPositiveInt() % 10000;
+      if (size == 0)
+      {
+         size = 10000;
+      }
+      return RandomUtil.randomBytes(size);
+   }
+
+   class FastAppenderTx extends Thread // Worker that keeps appending small transactions while running is true.
+   {
+      LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>(); // ids of committed records, filled in done() below
+
+      OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+      public FastAppenderTx()
+      {
+         super("FastAppenderTX");
+      }
+
+      @Override
+      public void run()
+      {
+         rwLock.readLock().lock();
+         // The read lock is held for a whole iteration and released only briefly between iterations.
+         try
+         {
+            while (running)
+            {
+               final int txSize = RandomUtil.randomMax(100);
+
+               long txID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+               long rollbackTXID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+               final long ids[] = new long[txSize];
+
+               for (int i = 0; i < txSize; i++)
+               {
+                  ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+               }
+               // A one-record transaction that is rolled back. NOTE(review): ids[0] assumes txSize >= 1 - confirm randomMax never returns 0.
+               journal.appendAddRecordTransactional(rollbackTXID, ids[0], (byte)0, generateRecord());
+               journal.appendRollbackRecord(rollbackTXID, true);
+               // The real transaction: one add per id, taking one maxRecords permit per record.
+               for (int i = 0; i < txSize; i++)
+               {
+                  long id = ids[i];
+                  journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
+                  maxRecords.acquire();
+               }
+               journal.appendCommitRecord(txID, true, ctx);
+               // Counters and the id queue are updated in the completion callback, not inline.
+               ctx.executeOnCompletion(new IOAsyncTask()
+               {
+
+                  public void onError(final int errorCode, final String errorMessage)
+                  {
+                  }
+
+                  public void done()
+                  {
+                     numberOfRecords.addAndGet(txSize);
+                     for (Long id : ids)
+                     {
+                        queue.add(id);
+                     }
+                  }
+               });
+               // Drop the read lock for a moment so that testAppend can take the write lock and restart the journal.
+               rwLock.readLock().unlock();
+
+               Thread.yield();
+
+               rwLock.readLock().lock();
+
+
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            running = false;
+            errors.incrementAndGet();
+         }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
+      }
+   }
+
+   /**
+    * Worker thread that takes record ids from a queue, writes a transactional update for each of
+    * them and commits the updates in batches.
+    * <p>
+    * A batch is committed when it reaches its randomly chosen size or when no id arrives within
+    * ten seconds. A {@link DeleteTask} for the batch is registered after each such commit.
+    */
+   class FastUpdateTx extends Thread
+   {
+      final LinkedBlockingDeque<Long> queue;
+
+      OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+      public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
+      {
+         super("FastUpdateTX");
+         this.queue = queue;
+      }
+
+      @Override
+      public void run()
+      {
+         rwLock.readLock().lock();
+
+         try
+         {
+            int batchLimit = RandomUtil.randomMax(100);
+            int pending = 0;
+            long[] batchIds = new long[batchLimit];
+
+            long currentTx = JournalCleanupCompactSoakTest.idGen.generateID();
+
+            while (running)
+            {
+               final Long polled = queue.poll(10, TimeUnit.SECONDS);
+               final boolean timedOut = polled == null;
+
+               if (!timedOut)
+               {
+                  batchIds[pending] = polled;
+                  pending++;
+                  journal.appendUpdateRecordTransactional(currentTx, polled, (byte)0, generateRecord());
+               }
+
+               // Keep filling the batch until it is full or the queue ran dry
+               if (!timedOut && pending != batchLimit)
+               {
+                  continue;
+               }
+
+               if (pending > 0)
+               {
+                  journal.appendCommitRecord(currentTx, true, ctx);
+                  // Slots that were never filled stay at 0 in the array handed to the task
+                  ctx.executeOnCompletion(new DeleteTask(batchIds));
+               }
+
+               // Give up the read lock for a moment before starting the next batch
+               rwLock.readLock().unlock();
+
+               Thread.yield();
+
+               rwLock.readLock().lock();
+
+               // Start a fresh batch with a new size, transaction id and id buffer
+               pending = 0;
+               batchLimit = RandomUtil.randomMax(100);
+               currentTx = JournalCleanupCompactSoakTest.idGen.generateID();
+               batchIds = new long[batchLimit];
+            }
+
+            // Commit whatever is left once the loop has been told to stop
+            if (pending > 0)
+            {
+               journal.appendCommitRecord(currentTx, true);
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            running = false;
+            errors.incrementAndGet();
+         }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
+      }
+   }
+
+   /**
+    * Completion callback that deletes a batch of records from the journal.
+    * <p>
+    * For every non-zero id it appends a delete record, releases one {@code maxRecords} permit and
+    * increments {@code numberOfDeletes}. Entries equal to 0 are skipped.
+    * <p>
+    * Any failure clears {@code running} and is counted in {@code errors}.
+    */
+   class DeleteTask implements IOAsyncTask
+   {
+      final long ids[];
+
+      /**
+       * @param ids ids of the records to delete; entries equal to 0 are ignored
+       */
+      DeleteTask(final long ids[])
+      {
+         this.ids = ids;
+      }
+
+      public void done()
+      {
+         rwLock.readLock().lock();
+         try
+         {
+            // Counts the whole array length, including any entries that are 0
+            numberOfUpdates.addAndGet(ids.length);
+            for (long id : ids)
+            {
+               if (id != 0)
+               {
+                  journal.appendDeleteRecord(id, false);
+                  maxRecords.release();
+                  numberOfDeletes.incrementAndGet();
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            System.err.println("Can't delete id");
+            e.printStackTrace();
+            running = false;
+            errors.incrementAndGet();
+         }
+         finally
+         {
+            rwLock.readLock().unlock();
+         }
+      }
+
+      public void onError(final int errorCode, final String errorMessage)
+      {
+         // Report the failure the same way done() reports an exception, instead of dropping it
+         System.err.println("Operation failed: errorCode=" + errorCode + ", errorMessage=" + errorMessage);
+         running = false;
+         errors.incrementAndGet();
+      }
+
+   }
+
+   /**
+    * Adds records to the journal without a transaction and only deletes them after a long delay.
+    * This will cause cleanup and compacting to happen more often.
+    * <p>
+    * Records are appended in groups of five with a 50 second pause after each append; the group
+    * is deleted once all five have been written. A failure clears {@code running} and is counted
+    * in {@code errors}, like in the other worker threads.
+    */
+   class SlowAppenderNoTX extends Thread
+   {
+
+      public SlowAppenderNoTX()
+      {
+         super("SlowAppender");
+      }
+
+      @Override
+      public void run()
+      {
+         rwLock.readLock().lock();
+         // Tracks whether this thread currently holds the read lock. The lock is given up while
+         // sleeping, and the sleep may throw, so the finally block must not unlock blindly.
+         boolean locked = true;
+         try
+         {
+            while (running)
+            {
+               long ids[] = new long[5];
+               // Append
+               for (int i = 0; running && i < ids.length; i++)
+               {
+                  System.out.println("append slow");
+                  ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+                  maxRecords.acquire();
+                  journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+                  numberOfRecords.incrementAndGet();
+
+                  rwLock.readLock().unlock();
+                  locked = false;
+
+                  Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+
+                  rwLock.readLock().lock();
+                  locked = true;
+               }
+               // Delete
+               for (int i = 0; running && i < ids.length; i++)
+               {
+                  System.out.println("Deleting");
+                  maxRecords.release();
+                  journal.appendDeleteRecord(ids[i], false);
+                  numberOfDeletes.incrementAndGet();
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            // Record the failure and stop the workers instead of killing the whole JVM
+            e.printStackTrace();
+            running = false;
+            errors.incrementAndGet();
+         }
+         finally
+         {
+            if (locked)
+            {
+               rwLock.readLock().unlock();
+            }
+         }
+      }
+   }
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------



More information about the hornetq-commits mailing list