[jboss-cvs] JBoss Messaging SVN: r6110 - in trunk: src/main/org/jboss/messaging/core/server/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 18 17:49:31 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-18 17:49:31 -0400 (Wed, 18 Mar 2009)
New Revision: 6110

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java
Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
   trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Log:
XA Tests, JBMESSAGING-1548 (Empty XID and Rollback fix), XAFailoverTests and other small pieces

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/build-messaging.xml	2009-03-18 21:49:31 UTC (rev 6110)
@@ -286,32 +286,51 @@
       <path refid="jboss.jbossxb.classpath"/>
    </path>
 
-   <path id="unit.test.execution.classpath">
-      <!-- ensure that the core client jar is included for
-           tests security tests which needs to read version.properties
-           from inside this jar
-      -->
-      <fileset dir="${build.jars.dir}">
-         <include name="${core.client.jar.name}"/>
-      </fileset>
-      <pathelement location="${test.dir}/config"/>
-      <pathelement location="${test.dir}/tmpfiles"/>
-      <pathelement location="${test.classes.dir}"/>
-      <pathelement location="${src.config.dir}"/>
-      <pathelement location="${src.schemas.dir}"/>
-      <path refid="test.compilation.classpath"/>
-      <path refid="oswego.concurrent.classpath"/>
-      <path refid="apache.log4j.classpath"/>
-      <path refid="cglib.classpath"/>
-      <path refid="jboss.common.core.classpath"/>
-      <path refid="jboss.aop.classpath"/>
-      <path refid="trove.trove.classpath"/>
-      <path refid="javassist.classpath"/>
-      <path refid="jboss.jbossxb.classpath"/>
-      <path refid="apache.xerces.classpath"/>
-      <path refid="apache.logging.classpath"/>
-   </path>
+	   <path id="unit.test.execution.classpath">
+	      <!-- ensure that the core client jar is included for
+	           tests security tests which needs to read version.properties
+	           from inside this jar
+	      -->
+	      <fileset dir="${build.jars.dir}">
+	         <include name="${core.client.jar.name}"/>
+	      </fileset>
+	      <pathelement location="${test.dir}/config"/>
+	      <pathelement location="${test.dir}/tmpfiles"/>
+	      <pathelement location="${test.classes.dir}"/>
+	      <pathelement location="${src.config.dir}"/>
+	      <pathelement location="${src.schemas.dir}"/>
+	      <path refid="test.compilation.classpath"/>
+	      <path refid="oswego.concurrent.classpath"/>
+	      <path refid="apache.log4j.classpath"/>
+	      <path refid="cglib.classpath"/>
+	      <path refid="jboss.common.core.classpath"/>
+	      <path refid="jboss.aop.classpath"/>
+	      <path refid="trove.trove.classpath"/>
+	      <path refid="javassist.classpath"/>
+	      <path refid="jboss.jbossxb.classpath"/>
+	      <path refid="apache.xerces.classpath"/>
+	      <path refid="apache.logging.classpath"/>
+	   </path>
 
+	   <path id="emma.unit.test.execution.classpath">
+	      <pathelement location="${test.dir}/config"/>
+	      <pathelement location="${test.dir}/tmpfiles"/>
+	      <pathelement location="${test.classes.dir}"/>
+	      <pathelement location="${src.config.dir}"/>
+	      <pathelement location="${src.schemas.dir}"/>
+	      <path refid="test.compilation.classpath"/>
+	      <path refid="oswego.concurrent.classpath"/>
+	      <path refid="apache.log4j.classpath"/>
+	      <path refid="cglib.classpath"/>
+	      <path refid="jboss.common.core.classpath"/>
+	      <path refid="jboss.aop.classpath"/>
+	      <path refid="trove.trove.classpath"/>
+	      <path refid="javassist.classpath"/>
+	      <path refid="jboss.jbossxb.classpath"/>
+	      <path refid="apache.xerces.classpath"/>
+	      <path refid="apache.logging.classpath"/>
+	   </path>
+
    <path id="jms.test.execution.classpath">
       <pathelement location="${test.dir}/config"/>
       <pathelement location="${src.config.dir}"/>
@@ -1372,7 +1391,7 @@
 
       <path id="emma.execution.classpath">
          <path refid="emma.lib"/>
-         <path refid="unit.test.execution.classpath"/>
+         <path refid="emma.unit.test.execution.classpath"/>
       </path>
 
       <echo message=""/>

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -1560,6 +1560,7 @@
             if (!msgs.contains(msg))
             {
                store.addSize(-msg.getMemoryEstimate());
+               msg.decrementRefCount();
             }
 
             msgs.add(msg);

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -195,7 +195,7 @@
             }
          }
 
-         if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
+         if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED) )
          {
             storageManager.commit(id);
          }
@@ -336,7 +336,7 @@
 
    private void doRollback() throws Exception
    {
-      if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
+      if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
       {
          storageManager.rollback(id);
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -29,6 +29,9 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientFileMessage;
@@ -77,7 +80,8 @@
 
    // Protected -----------------------------------------------------
 
-   protected void testChunks(final boolean realFiles,
+   protected void testChunks(final boolean isXA,
+                             final boolean realFiles,
                              final boolean useFile,
                              final boolean preAck,
                              final boolean sendingBlocking,
@@ -87,7 +91,8 @@
                              final int waitOnConsumer,
                              final long delayDelivery) throws Exception
    {
-      testChunks(realFiles,
+      testChunks(isXA,
+                 realFiles,
                  useFile,
                  preAck,
                  sendingBlocking,
@@ -102,7 +107,8 @@
                  false);
    }
 
-   protected void testChunks(final boolean realFiles,
+   protected void testChunks(final boolean isXA,
+                             final boolean realFiles,
                              final boolean useFile,
                              final boolean preAck,
                              final boolean sendingBlocking,
@@ -116,7 +122,6 @@
                              final int minSizeConsumer,
                              final boolean testTime) throws Exception
    {
-
       clearData();
 
       messagingService = createService(realFiles);
@@ -139,67 +144,51 @@
          }
          
          sf.setMinLargeMessageSize(minSizeProducer);
+         
 
-         ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
+         ClientSession session;
 
+         Xid xid = null;
+         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+         
+         if (isXA)
+         {
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
+         
          session.createQueue(ADDRESS, ADDRESS, null, true, false);
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
-         if (useFile)
+         sendMessages(useFile, numberOfMessages, numberOfIntegers, delayDelivery, testTime, session, producer);
+
+         if (isXA)
          {
-            File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
+            session.end(xid, XAResource.TMSUCCESS);
+            session.rollback(xid);
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
+         else
+         {
+            session.rollback();
+         }
 
-            for (int i = 0; i < numberOfMessages; i++)
-            {
-               ClientMessage message = session.createFileMessage(true);
-               ((ClientFileMessage)message).setFile(tmpData);
-               message.putIntProperty(new SimpleString("counter-message"), i);
-               long timeStart = System.currentTimeMillis();
-               if (delayDelivery > 0)
-               {
-                  long time = System.currentTimeMillis();
-                  message.putLongProperty(new SimpleString("original-time"), time);
-                  message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+         validateNoFilesOnLargeDir();
 
-                  producer.send(message);
-               }
-               else
-               {
-                  producer.send(message);
-               }
+         sendMessages(useFile, numberOfMessages, numberOfIntegers, delayDelivery, testTime, session, producer);
 
-               if (testTime)
-               {
-                  System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
-               }
-            }
+         if (isXA)
+         {
+            session.end(xid, XAResource.TMSUCCESS);
+            session.commit(xid, true);
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
          }
          else
          {
-            for (int i = 0; i < numberOfMessages; i++)
-            {
-               ClientMessage message = session.createClientMessage(true);
-               message.putIntProperty(new SimpleString("counter-message"), i);
-               message.setBody(createLargeBuffer(numberOfIntegers));
-               long timeStart = System.currentTimeMillis();
-               if (delayDelivery > 0)
-               {
-                  long time = System.currentTimeMillis();
-                  message.putLongProperty(new SimpleString("original-time"), time);
-                  message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
-                  producer.send(message);
-               }
-               else
-               {
-                  producer.send(message);
-               }
-               if (testTime)
-               {
-                  System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
-               }
-            }
+            session.commit();
          }
 
          session.close();
@@ -217,7 +206,13 @@
          
          sf.setMinLargeMessageSize(minSizeConsumer);
 
-         session = sf.createSession(null, null, false, true, true, preAck, 0);
+         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+         
+         if (isXA)
+         {
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
 
          ClientConsumer consumer = null;
 
@@ -271,6 +266,19 @@
                   message.acknowledge();
                }
 
+               if (isXA)
+               {
+                  session.end(xid, XAResource.TMSUCCESS);
+                  session.commit(xid, true);
+                  xid = newXID();
+                  session.start(xid, XAResource.TMNOFLAGS);
+               }
+               else
+               {
+                  session.commit();
+               }
+
+
                assertNotNull(message);
 
                if (delayDelivery <= 0)
@@ -302,7 +310,17 @@
             if (iteration == 0)
             {
                consumer.close();
-               session.rollback();
+               if (isXA)
+               {
+                  session.end(xid, XAResource.TMSUCCESS);
+                  session.rollback(xid);
+                  xid = newXID();
+                  session.start(xid, XAResource.TMNOFLAGS);
+               }
+               else
+               {
+                  session.rollback();
+               }
             }
          }
 
@@ -330,6 +348,85 @@
       }
    }
 
+   /** Sends {@code numberOfMessages} messages via {@code producer}, each carrying a "counter-message" int property.
+    * @param useFile if true, each message is a file message backed by a generated temp file; otherwise the body is an in-memory buffer
+    * @param numberOfMessages how many messages to send
+    * @param numberOfIntegers size argument handed to createLargeFile / createLargeBuffer
+    * @param delayDelivery when greater than 0, added to the current time and set as the scheduled-delivery header (an "original-time" property is also set)
+    * @param testTime if true, prints the elapsed milliseconds of each send to System.out
+    * @param session session used to create the messages
+    * @param producer producer used to send the messages
+    * @throws FileNotFoundException presumably raised while creating the temporary file - TODO confirm
+    * @throws IOException presumably raised while creating the temporary file - TODO confirm
+    * @throws MessagingException presumably raised by the session/producer calls - TODO confirm
+    */
+   private void sendMessages(final boolean useFile,
+                             final int numberOfMessages,
+                             final int numberOfIntegers,
+                             final long delayDelivery,
+                             final boolean testTime,
+                             ClientSession session,
+                             ClientProducer producer) throws FileNotFoundException, IOException, MessagingException
+   {
+      if (useFile)
+      {
+         File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers); // one file, reused by every message
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createFileMessage(true);
+            ((ClientFileMessage)message).setFile(tmpData);
+            message.putIntProperty(new SimpleString("counter-message"), i);
+            long timeStart = System.currentTimeMillis();
+            if (delayDelivery > 0)
+            {
+               long time = System.currentTimeMillis();
+               message.putLongProperty(new SimpleString("original-time"), time);
+               message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+               producer.send(message);
+            }
+            else
+            {
+               producer.send(message);
+            }
+            if (testTime)
+            {
+               System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+            }
+         }
+         
+         
+      }
+      else
+      {
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(true);
+            message.putIntProperty(new SimpleString("counter-message"), i);
+            message.setBody(createLargeBuffer(numberOfIntegers));
+            long timeStart = System.currentTimeMillis();
+            if (delayDelivery > 0)
+            {
+               long time = System.currentTimeMillis();
+               message.putLongProperty(new SimpleString("original-time"), time);
+               message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+               producer.send(message);
+            }
+            else
+            {
+               producer.send(message);
+            }
+ 
+            if (testTime)
+            {
+               System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+            }
+         }
+      }
+   }
+
    protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
    {
       MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers); 
@@ -345,9 +442,14 @@
 
    protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
    {
+      return createLargeClientMessage(session, numberOfIntegers, true);
+   }
 
-      ClientFileMessage clientMessage = session.createFileMessage(true);
+   protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers, boolean persistent) throws Exception
+   {
 
+      ClientFileMessage clientMessage = session.createFileMessage(persistent);
+
       File tmpFile = createLargeFile(getTemporaryDir(), "tmpUpload.data", numberOfIntegers);
 
       clientMessage.setFile(tmpFile);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -26,6 +26,9 @@
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import junit.framework.AssertionFailedError;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -284,34 +287,64 @@
 
    public void testMessageChunkFilePersistence() throws Exception
    {
-      testChunks(true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testMessageChunkFilePersistenceXA() throws Exception
+   {
+      testChunks(true, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testMessageChunkFilePersistenceBlocked() throws Exception
    {
-      testChunks(true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testMessageChunkFilePersistenceBlockedXA() throws Exception
+   {
+      testChunks(true, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testMessageChunkFilePersistenceBlockedPreACK() throws Exception
    {
-      testChunks(true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testMessageChunkFilePersistenceBlockedPreACKXA() throws Exception
+   {
+      testChunks(true, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+      testChunks(false, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
    }
 
+   public void testMessageChunkFilePersistenceDelayedXA() throws Exception
+   {
+      testChunks(true, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+   }
+
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testChunks(false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testMessageChunkNullPersistenceXA() throws Exception
+   {
+      testChunks(true, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+      testChunks(false, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
    }
 
+   public void testMessageChunkNullPersistenceDelayedXA() throws Exception
+   {
+      testChunks(true, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+   }
+
    public void testPageOnLargeMessage() throws Exception
    {
       testPageOnLargeMessage(true, false);
@@ -325,46 +358,85 @@
 
    public void testSendfileMessage() throws Exception
    {
-      testChunks(true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+   }
 
+
+   public void testSendfileMessageXA() throws Exception
+   {
+      testChunks(true, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendfileMessageOnNullPersistence() throws Exception
    {
-      testChunks(false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testSendfileMessageOnNullPersistenceXA() throws Exception
+   {
+      testChunks(true, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
    {
-      testChunks(false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testSendfileMessageOnNullPersistenceSmallMessageXA() throws Exception
+   {
+      testChunks(true, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testSendfileMessageSmallMessage() throws Exception
    {
-      testChunks(true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+   }
 
+   public void testSendfileMessageSmallMessageXA() throws Exception
+   {
+      testChunks(true, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendRegularMessageNullPersistence() throws Exception
    {
-      testChunks(false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testSendRegularMessageNullPersistenceXA() throws Exception
+   {
+      testChunks(true, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+      testChunks(false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
    }
 
+   public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
+   {
+      testChunks(true, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+   }
+
    public void testSendRegularMessagePersistence() throws Exception
    {
-      testChunks(true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+      testChunks(false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
    }
 
+   public void testSendRegularMessagePersistenceXA() throws Exception
+   {
+      testChunks(true, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+   }
+
    public void testSendRegularMessagePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+      testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
    }
 
+   public void testSendRegularMessagePersistenceDelayedXA() throws Exception
+   {
+      testChunks(true, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000); // first arg is isXA: must be true for the XA variant
+   }
+
    public void testTwoBindingsTwoStartedConsumers() throws Exception
    {
       // there are two bindings.. one is ACKed, the other is not, the server is restarted
@@ -521,9 +593,77 @@
       }
 
    }
+   
+   public void testSendRollback() throws Exception
+   {
+      clearData();
+      // Sends one large message, rolls the session back, then calls validateNoFilesOnLargeDir().
+      boolean isXA = false;
+      // isXA is fixed to false, so the XA branches below are never taken in this test.
+      messagingService = createService(true);
 
+      messagingService.start();
+
+      ClientSessionFactory sf = createInVMFactory();
+     
+      ClientSession session = sf.createSession(isXA, false, false);
+      
+      session.createQueue(ADDRESS, ADDRESS, true);
+      
+      Xid xid = null;
+      
+      if (isXA)
+      {
+         xid = newXID();
+         session.start(xid, XAResource.TMNOFLAGS);
+      }
+      
+      
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      // Third argument is createLargeClientMessage's persistent flag: this message is non-persistent.
+      Message clientFile = createLargeClientMessage(session, 50000, false);
+
+      for (int i = 0; i < 1; i++)
+      {
+         producer.send(clientFile);
+      }
+      
+      
+
+      if (isXA)
+      {
+         session.end(xid, XAResource.TMSUCCESS);
+         session.prepare(xid);
+         session.rollback(xid);
+      }
+      else
+      {
+         session.rollback();
+      }
+      
+      session.close();
+      // Presumably asserts the rolled-back send left no large-message files behind - TODO confirm.
+      validateNoFilesOnLargeDir();
+      
+      messagingService.stop();
+
+   }
+
+   
    public void testSimpleRollback() throws Exception
    {
+      simpleRollbackInternalTest(false);
+   }
+   
+   public void testSimpleRollbackXA() throws Exception
+   {
+      simpleRollbackInternalTest(true);
+   }
+   
+   
+   public void simpleRollbackInternalTest(boolean isXA) throws Exception
+   {
       // there are two bindings.. one is ACKed, the other is not, the server is restarted
       // The other binding is acked... The file must be deleted
 
@@ -538,14 +678,20 @@
 
          ClientSessionFactory sf = createInVMFactory();
 
-         ClientSession session = sf.createSession(false, false, false);
+         ClientSession session = sf.createSession(isXA, false, false);
+         
+         Xid xid = null;
+         
+         if (isXA)
+         {
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false);
 
-         int numberOfIntegers = 100;
+         int numberOfIntegers = 50000;
 
-         Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-
          session.start();
 
          log.info ("Session started");
@@ -556,12 +702,40 @@
          
          for (int n = 0; n < 10; n++)
          {
+            Message clientFile = createLargeClientMessage(session, numberOfIntegers, n%2 == 0);
+
             producer.send(clientFile);
 
             assertNull(consumer.receiveImmediate());
 
-            session.commit();
+            if (isXA)
+            {
+               session.end(xid, XAResource.TMSUCCESS);
+               session.rollback(xid);
+               xid = newXID();
+               session.start(xid, XAResource.TMNOFLAGS);
+            }
+            else
+            {
+               session.rollback();
+            }
 
+            producer.send(clientFile);
+
+            assertNull(consumer.receiveImmediate());
+
+            if (isXA)
+            {
+               session.end(xid, XAResource.TMSUCCESS);
+               session.commit(xid, true);
+               xid = newXID();
+               session.start(xid, XAResource.TMNOFLAGS);
+            }
+            else
+            {
+               session.commit();
+            }
+
             for (int i = 0; i < 2; i++)
             {
 
@@ -573,13 +747,34 @@
 
                clientMessage.acknowledge();
 
-               if (i == 0)
+               if (isXA)
                {
-                  session.rollback();
+                  if (i == 0)
+                  {
+                     session.end(xid, XAResource.TMSUCCESS);
+                     session.prepare(xid);
+                     session.rollback(xid);
+                     xid = newXID();
+                     session.start(xid, XAResource.TMNOFLAGS);
+                  }
+                  else
+                  {
+                     session.end(xid, XAResource.TMSUCCESS);
+                     session.commit(xid, true);
+                     xid = newXID();
+                     session.start(xid, XAResource.TMNOFLAGS);
+                  }
                }
                else
                {
-                  session.commit();
+                  if (i == 0)
+                  {
+                     session.rollback();
+                  }
+                  else
+                  {
+                     session.commit();
+                  }
                }
             }
          }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,292 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
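+ * Base class for multi-threaded failover tests: runs a test body on several threads while a timer task injects a connection failure.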
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Mar 17, 2009 11:15:02 AM
+ *
+ *
+ */
+public abstract class MultiThreadFailoverSupport extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+   // Per-instance logger, named after the concrete test class.
+   private final Logger log = Logger.getLogger(this.getClass());
+
+   // Attributes ----------------------------------------------------
+   // Timer that runs the Failer task; created in setUp() and cancelled in tearDown().
+   protected Timer timer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   // start() and stop() bracket every iteration of runMultipleThreadsFailoverTest.
+   protected abstract void start() throws Exception;
+
+   protected abstract void stop() throws Exception;
+
+   // Supplies the session factory shared by all runner threads of one iteration.
+   protected abstract ClientSessionFactoryInternal createSessionFactory();
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      timer = new Timer();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      timer.cancel();
+      super.tearDown();
+   }
+
+   // Returning false disables failure injection: startFailer() then never schedules the Failer.
+   protected boolean shouldFail()
+   {
+      return true;
+   }
+
+   // Runs runnable on numThreads threads, numIts times, calling start() before and stop() after each iteration.
+   // A batch of threads is re-run until the Failer (scheduled at a random delay below failDelay ms) has executed,
+   // or just once when shouldFail() is false. Throws if a thread threw or runnable.checkFail() reports a failure.
+   protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
+                                       final int numThreads,
+                                       final int numIts,
+                                       final boolean failOnCreateConnection,
+                                       final long failDelay) throws Exception
+   {
+      for (int its = 0; its < numIts; its++)
+      {
+         log.info("************ ITERATION: " + its);
+
+         start();
+
+         final ClientSessionFactoryInternal sf = createSessionFactory();
+
+         // Control session handed to the Failer, which fails its connection unless failOnCreateConnection is set.
+         final ClientSession session = sf.createSession(false, true, true);
+
+         Failer failer = startFailer(failDelay, session, failOnCreateConnection);
+
+         // One Runner per thread; it keeps any Throwable so the main thread can rethrow it after join().
+         class Runner extends Thread
+         {
+            private volatile Throwable throwable;
+
+            private final RunnableT test;
+
+            private final int threadNum;
+
+            Runner(final RunnableT test, final int threadNum)
+            {
+               this.test = test;
+
+               this.threadNum = threadNum;
+            }
+
+            @Override
+            public void run()
+            {
+               try
+               {
+                  test.run(sf, threadNum);
+               }
+               catch (Throwable t)
+               {
+                  throwable = t;
+
+                  log.error("Failed to run test", t);
+
+                  // In case of a failure here, print the thread dump.
+                  // It is sent to System.out so that it shows up on the tests report.
+                  System.out.println(threadDump(" - fired by MultiThreadFailoverSupport::runMultipleThreadsFailoverTest (" + t.getLocalizedMessage() + ")"));
+               }
+            }
+         }
+
+         do
+         {
+            List<Runner> threads = new ArrayList<Runner>();
+
+            for (int i = 0; i < numThreads; i++)
+            {
+               Runner runner = new Runner(runnable, i);
+
+               threads.add(runner);
+
+               runner.start();
+            }
+
+            for (Runner thread : threads)
+            {
+               thread.join();
+
+               if (thread.throwable != null)
+               {
+                  throw new Exception("Exception on thread " + thread, thread.throwable);
+               }
+            }
+
+            log.info("completed loop");
+
+            runnable.checkFail();
+         }
+         // When shouldFail() is false the Failer is never scheduled, so waiting for it would loop forever.
+         while (shouldFail() && !failer.isExecuted());
+
+         InVMConnector.resetFailures();
+
+         log.info("closing session");
+         session.close();
+         log.info("closed session");
+
+         assertEquals(0, sf.numSessions());
+
+         assertEquals(0, sf.numConnections());
+
+         log.info("stopping");
+         stop();
+         log.info("stopped");
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Creates the Failer and, when shouldFail(), schedules it on the timer after a random delay below time ms (period 100 ms).
+   private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
+   {
+      Failer failer = new Failer(session, failOnCreateConnection);
+
+      // This is useful for debugging.. just change shouldFail to return false, and Failer will not be executed
+      if (shouldFail())
+      {
+         timer.schedule(failer, (long)(time * Math.random()), 100);
+      }
+
+      return failer;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   // Test body executed by each Runner via run(sf, threadNum); setFailed() records a failure that checkFail() logs and reports.
+   protected abstract class RunnableT extends Thread
+   {
+      private volatile String failReason;
+
+      private volatile Throwable throwable;
+
+      public void setFailed(final String reason, final Throwable throwable)
+      {
+         failReason = reason;
+         this.throwable = throwable;
+      }
+
+      public void checkFail()
+      {
+         if (throwable != null)
+         {
+            log.error("Test failed: " + failReason, throwable);
+         }
+         if (failReason != null)
+         {
+            fail(failReason);
+         }
+      }
+
+      public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
+   }
+
+   // Timer task that fails the session's connection (or arms InVMConnector to fail on connection creation), once.
+   private class Failer extends TimerTask
+   {
+      private final ClientSession session;
+
+      private boolean executed;
+
+      private final boolean failOnCreateConnection;
+
+      public Failer(final ClientSession session, final boolean failOnCreateConnection)
+      {
+         this.session = session;
+
+         this.failOnCreateConnection = failOnCreateConnection;
+      }
+
+      @Override
+      public synchronized void run()
+      {
+         log.info("** Failing connection");
+
+         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+         if (failOnCreateConnection)
+         {
+            InVMConnector.numberOfFailures = 1;
+            InVMConnector.failOnCreateConnection = true;
+         }
+         else
+         {
+            conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+         }
+
+         log.info("** Fail complete");
+         // The task is scheduled with a 100 ms period; cancel it so that the failure is injected only once.
+         cancel();
+
+         executed = true;
+      }
+
+      public synchronized boolean isExecuted()
+      {
+         log.info("executed??" + executed);
+         return executed;
+      }
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -22,14 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -41,17 +38,13 @@
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -62,7 +55,7 @@
  * 
  *
  */
-public abstract class MultiThreadRandomFailoverTestBase extends UnitTestCase
+public abstract class MultiThreadRandomFailoverTestBase extends MultiThreadFailoverSupport
 {
 
    private final Logger log = Logger.getLogger(getClass());
@@ -73,7 +66,7 @@
    
    private final int LATCH_WAIT = getLatchWait();
 
-   private static final int NUM_THREADS = 10;
+   private int NUM_THREADS = getNumThreads();
 
    // Attributes ----------------------------------------------------
    protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
@@ -233,7 +226,7 @@
          {
             doTestL(sf);
          }
-      }, NUM_THREADS, false, true, 10);
+      }, NUM_THREADS, true, 10);
    }
 
    // public void testM() throws Exception
@@ -268,6 +261,11 @@
    protected abstract void setBody(ClientMessage message) throws Exception;
 
    protected abstract boolean checkSize(ClientMessage message);
+   
+   protected int getNumThreads()
+   {
+      return 10;
+   }
 
    protected ClientSession createAutoCommitSession(ClientSessionFactory sf) throws Exception
    {
@@ -1333,128 +1331,22 @@
 
    // Private -------------------------------------------------------
 
-   private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
-   {
-      runTestMultipleThreads(runnable, numThreads, fileBased, false);
-   }
-
    private void runTestMultipleThreads(final RunnableT runnable,
                                        final int numThreads,
-                                       final boolean fileBased,
                                        final boolean failOnCreateConnection) throws Exception
    {
-      this.runTestMultipleThreads(runnable, numThreads, fileBased, failOnCreateConnection, 1000);
+      this.runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
    }
 
    private void runTestMultipleThreads(final RunnableT runnable,
                                        final int numThreads,
-                                       final boolean fileBased,
                                        final boolean failOnCreateConnection,
                                        final long failDelay) throws Exception
    {
-      final int numIts = getNumIterations();
-
-      for (int its = 0; its < numIts; its++)
-      {
-         log.info("************ ITERATION: " + its);
-
-         start();
-
-         final ClientSessionFactoryInternal sf = createSessionFactory();
-
-         final ClientSession session = sf.createSession(false, true, true);
-
-         Failer failer = startFailer(failDelay, session, failOnCreateConnection);
-
-         class Runner extends Thread
-         {
-            private volatile Throwable throwable;
-
-            private final RunnableT test;
-
-            private final int threadNum;
-
-            Runner(final RunnableT test, final int threadNum)
-            {
-               this.test = test;
-
-               this.threadNum = threadNum;
-            }
-
-            @Override
-            public void run()
-            {
-               try
-               {
-                  test.run(sf, threadNum);
-               }
-               catch (Throwable t)
-               {
-                  throwable = t;
-                  // Case a failure happened here, it should print the Thread dump
-                  // Sending it to System.out, as it would show on the Tests report
-                  System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads"));
-
-                  log.error("Failed to run test", t);
-               }
-            }
-         }
-
-         do
-         {
-            List<Runner> threads = new ArrayList<Runner>();
-
-            for (int i = 0; i < numThreads; i++)
-            {
-               Runner runner = new Runner(runnable, i);
-
-               threads.add(runner);
-
-               runner.start();
-            }
-
-            for (Runner thread : threads)
-            {
-               thread.join();
-
-               if (thread.throwable != null)
-               {
-                  throw new Exception("Exception on thread " + thread, thread.throwable);
-               }
-            }
-
-            log.info("completed loop");
-
-            runnable.checkFail();
-
-         }
-         while (!failer.isExecuted());
-
-         InVMConnector.resetFailures();
-
-         log.info("closing session");
-         session.close();
-         log.info("closed session");
-
-         assertEquals(0, sf.numSessions());
-
-         assertEquals(0, sf.numConnections());
-
-         log.info("stopping");
-         stop();
-         log.info("stopped");
-      }
+      
+      runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
    }
 
-   private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
-   {
-      Failer failer = new Failer(session, failOnCreateConnection);
-
-      timer.schedule(failer, (long)(time * Math.random()), 100);
-
-      return failer;
-   }
-
    /**
     * @return
     */
@@ -1472,7 +1364,7 @@
       return sf;
    }
 
-   private void stop() throws Exception
+   protected void stop() throws Exception
    {
       backupService.stop();
 
@@ -1552,79 +1444,6 @@
 
    // Inner classes -------------------------------------------------
 
-   private class Failer extends TimerTask
-   {
-      private final ClientSession session;
-
-      private boolean executed;
-
-      private final boolean failOnCreateConnection;
-
-      public Failer(final ClientSession session, final boolean failOnCreateConnection)
-      {
-         this.session = session;
-
-         this.failOnCreateConnection = failOnCreateConnection;
-      }
-
-      @Override
-      public synchronized void run()
-      {
-         log.info("** Failing connection");
-
-         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
-
-         if (failOnCreateConnection)
-         {
-            InVMConnector.numberOfFailures = 1;
-            InVMConnector.failOnCreateConnection = true;
-         }
-         else
-         {
-            conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
-         }
-
-         log.info("** Fail complete");
-
-         cancel();
-
-         executed = true;
-      }
-
-      public synchronized boolean isExecuted()
-      {
-         log.info("executed??" + executed);
-         return executed;
-      }
-   }
-
-   private abstract class RunnableT extends Thread
-   {
-      private volatile String failReason;
-
-      private volatile Throwable throwable;
-
-      public void setFailed(final String reason, final Throwable throwable)
-      {
-         failReason = reason;
-         this.throwable = throwable;
-      }
-
-      public void checkFail()
-      {
-         if (throwable != null)
-         {
-            log.error("Test failed: " + failReason, throwable);
-         }
-         if (failReason != null)
-         {
-            fail(failReason);
-         }
-      }
-
-      public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
-   }
-
    private class MyHandler implements MessageHandler
    {
       CountDownLatch latch = new CountDownLatch(1);
@@ -1647,7 +1466,7 @@
          
          failure = null;
          
-         latch = new CountDownLatch(1);;
+         latch = new CountDownLatch(1);
       }
 
       MyHandler(final int threadNum, final int numMessages)

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,150 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.Messaging;
+
+/**
+ * XALargeMessageMultiThreadFailoverTest: the XA multi-thread failover test run with message bodies above the min large message size.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Jan 18, 2009 4:52:09 PM
+ *
+ *
+ */
+public class XALargeMessageMultiThreadFailoverTest extends XAMultiThreadRandomFailoverTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   // Payload written by setBody(): 500 bytes, above the min large message size (200) set in createSessionFactory().
+   private final byte[] BODY = new byte[500];
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   // Takes the parent's session factory and sets its min large message size to 200.
+   @Override
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      ClientSessionFactoryInternal sf = super.createSessionFactory();
+      sf.setMinLargeMessageSize(200);
+      return sf;
+   }
+
+   // Calls deleteDirectory on the test dir, then starts the backup service and afterwards the live service that names it as backup.
+   @Override
+   protected void start() throws Exception
+   {
+      deleteDirectory(new File(getTestDir()));
+
+      // Backup server: journal, large-message, bindings and paging directories under <testDir>/backup; NIO journal; security disabled.
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+      backupConf.setJournalFileSize(100 * 1024);
+
+      backupConf.setJournalType(JournalType.NIO);
+
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+      backupConf.setBackup(true);
+
+      backupService = Messaging.newMessagingService(backupConf);
+      backupService.start();
+
+      // Live server: the same settings under <testDir>/live, with an in-VM acceptor created without parameters.
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+      liveConf.setJournalFileSize(100 * 1024);
+
+      liveConf.setJournalType(JournalType.NIO);
+
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+      // Connector built from the same backupParams as the backup's acceptor, registered on the live server as its backup connector.
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = Messaging.newMessagingService(liveConf);
+
+      liveService.start();
+   }
+
+   // Writes the fixed BODY bytes into the message body.
+   @Override
+   protected void setBody(final ClientMessage message) throws Exception
+   {
+      message.getBody().writeBytes(BODY);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+    */
+   @Override
+   protected boolean checkSize(final ClientMessage message)
+   {
+      return BODY.length == message.getBodySize();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,896 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * XAMultiThreadRandomFailoverTest: XA variant of the multi-thread random failover test.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class XAMultiThreadRandomFailoverTest extends MultiThreadFailoverSupport
+{
+   protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private static final int RECEIVE_TIMEOUT = 30000;
+
+   private final Logger log = Logger.getLogger(getClass());
+
+   protected MessagingService liveService;
+
+   protected MessagingService backupService;
+
+   protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   protected Map<ClientSession, Xid> xids;
+
+   private int NUM_THREADS = getNumThreads();
+
+   private final int LATCH_WAIT = getLatchWait();
+
+   private final int NUM_SESSIONS = getNumSessions();
+
+   protected int getNumSessions()
+   {
+      return 10;
+   }
+
+   protected int getLatchWait()
+   {
+      return 20000;
+   }
+
+   protected int getNumThreads()
+   {
+      return 10;
+   }
+
+   protected int getNumIterations()
+   {
+      return 2;
+   }
+
+   protected boolean shouldFail()
+   {
+      return true;
+   }
+
+   protected ClientSession createTransactionalSession(ClientSessionFactory sf) throws Exception
+   {
+      ClientSession sess = sf.createSession(true, false, false);
+      return sess;
+   }
+
+   public void testC() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestC(sf, threadNum);
+         }
+      }, NUM_THREADS, false, 3000);
+   }
+
+   public void testD() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestD(sf, threadNum);
+         }
+      }, NUM_THREADS, false, 3000);
+   }
+
+   public void testG() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestG(sf, threadNum);
+         }
+      }, NUM_THREADS, false, 3000);
+   }
+
+   public void testH() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestH(sf, threadNum);
+         }
+      }, NUM_THREADS, false, 3000);
+   }
+
+   /**
+    * Scenario C: a message handler is installed and started on every consumer session before any
+    * message is sent. Each handler first runs with commit-on-complete disabled and is then restarted
+    * with commit-on-complete enabled; both rounds must release their latch within
+    * {@code LATCH_WAIT} milliseconds.
+    *
+    * @param sf factory used to create every session of this run
+    * @param threadNum number of the calling test thread; part of the queue names and handed to the handlers
+    * @throws Exception if a handler times out or reports a failure during the first round
+    */
+   protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long began = System.currentTimeMillis();
+
+      // Session used at the very end to delete the queues created below
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+      final int numSessions = NUM_SESSIONS;
+
+      Set<MyHandler> activeHandlers = new HashSet<MyHandler>();
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         SimpleString queueName = new SimpleString(threadNum + "sub" + index);
+
+         ClientSession consumerSession = createTransactionalSession(sf);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         ClientConsumer queueConsumer = consumerSession.createConsumer(queueName);
+
+         // First round: the handler rolls back once it has seen the whole batch
+         MyHandler created = new MyHandler(threadNum, numMessages, consumerSession, queueConsumer);
+         created.setCommitOnComplete(false);
+         created.start();
+
+         activeHandlers.add(created);
+      }
+
+      ClientSession producerSession = createTransactionalSession(sf);
+      transactionallySendMessages(threadNum, numMessages, producerSession);
+
+      for (MyHandler current : activeHandlers)
+      {
+         if (!current.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(current) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (current.failure != null)
+         {
+            throw new Exception("Handler failed: " + current.failure);
+         }
+      }
+
+      // Second round: restart every handler, this time committing on completion
+      for (MyHandler current : activeHandlers)
+      {
+         current.setCommitOnComplete(true);
+         current.start();
+      }
+
+      for (MyHandler current : activeHandlers)
+      {
+         assertTrue(current.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS));
+      }
+
+      producerSession.close();
+
+      for (MyHandler current : activeHandlers)
+      {
+         current.close();
+      }
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + index));
+      }
+
+      adminSession.close();
+
+      // Failures of the second round are only reported after everything has been cleaned up
+      for (MyHandler current : activeHandlers)
+      {
+         if (current.failure != null)
+         {
+            fail(current.failure);
+         }
+      }
+
+      final long finished = System.currentTimeMillis();
+
+      log.info("duration " + (finished - began));
+   }
+
+   /**
+    * Scenario D: the messages are sent first; only afterwards are the consumer sessions started and
+    * the handlers installed. The first set of handlers keeps the default (no commit on complete); a
+    * second set of handlers over the same sessions and consumers then runs with commit on complete.
+    *
+    * @param sf factory used to create every session of this run
+    * @param threadNum number of the calling test thread; part of the queue names and handed to the handlers
+    * @throws Exception if a handler times out or reports a failure in either round
+    */
+   protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long began = System.currentTimeMillis();
+
+      // Session used at the very end to delete the queues created below
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      Set<MyHandler> firstRound = new HashSet<MyHandler>();
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         SimpleString queueName = new SimpleString(threadNum + " sub" + index);
+
+         ClientSession consumerSession = createTransactionalSession(sf);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         ClientConsumer queueConsumer = consumerSession.createConsumer(queueName);
+
+         firstRound.add(new MyHandler(threadNum, numMessages, consumerSession, queueConsumer));
+      }
+
+      ClientSession producerSession = createTransactionalSession(sf);
+      transactionallySendMessages(threadNum, numMessages, producerSession);
+
+      // Start delivery on every session before any handler is installed
+      for (MyHandler current : firstRound)
+      {
+         current.session.start();
+      }
+
+      for (MyHandler current : firstRound)
+      {
+         current.start();
+      }
+
+      for (MyHandler current : firstRound)
+      {
+         if (!current.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(current) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (current.failure != null)
+         {
+            throw new Exception("Handler failed: " + current.failure);
+         }
+      }
+
+      // Second round: fresh handlers on the same session/consumer pairs, committing on completion
+      Set<MyHandler> secondRound = new HashSet<MyHandler>();
+
+      for (MyHandler previous : firstRound)
+      {
+         MyHandler replacement = new MyHandler(threadNum, numMessages, previous.session, previous.consumer);
+         replacement.setCommitOnComplete(true);
+         replacement.start();
+         secondRound.add(replacement);
+      }
+
+      firstRound.clear();
+
+      for (MyHandler current : secondRound)
+      {
+         if (!current.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(current) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (current.failure != null)
+         {
+            throw new Exception("Handler failed on rollback: " + current.failure);
+         }
+      }
+
+      producerSession.close();
+
+      for (MyHandler current : secondRound)
+      {
+         current.close();
+      }
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + " sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finished = System.currentTimeMillis();
+
+      log.info("duration " + (finished - began));
+   }
+
+   /**
+    * Scenario G: synchronous receive with consumer sessions that are started before their queue is
+    * created and before anything is sent. Every consumer receives a batch inside an XA branch that
+    * is prepared and rolled back, then receives a batch again inside a new branch that is prepared
+    * and committed.
+    *
+    * @param sf factory used to create every session of this run
+    * @param threadNum number of the calling test thread; part of the queue names and of the sent messages
+    * @throws Exception if any session operation or the message checks fail
+    */
+   protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long began = System.currentTimeMillis();
+
+      // Session used at the very end to delete the queues created below
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+      final int numSessions = getNumSessions();
+
+      Set<MyInfo> consumers = new HashSet<MyInfo>();
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         SimpleString queueName = new SimpleString(threadNum + "sub" + index);
+
+         ClientSession consumerSession = sf.createSession(true, false, false);
+
+         // Delivery is enabled up front, before the queue even exists
+         consumerSession.start();
+
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         ClientConsumer queueConsumer = consumerSession.createConsumer(queueName);
+
+         Xid branch = newXID();
+         consumerSession.start(branch, XAResource.TMNOFLAGS);
+
+         consumers.add(new MyInfo(consumerSession, queueConsumer, branch));
+      }
+
+      ClientSession producerSession = sf.createSession(true, false, false);
+
+      transactionallySendMessages(threadNum, numMessages, producerSession);
+      consumeMessages(consumers, numMessages, threadNum);
+
+      // Roll back what was received and open a new branch on each session
+      for (MyInfo entry : consumers)
+      {
+         entry.session.end(entry.xid, XAResource.TMSUCCESS);
+         entry.session.prepare(entry.xid);
+         entry.session.rollback(entry.xid);
+         entry.xid = newXID();
+         entry.session.start(entry.xid, XAResource.TMNOFLAGS);
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      // This time the received messages are committed
+      for (MyInfo entry : consumers)
+      {
+         entry.session.end(entry.xid, XAResource.TMSUCCESS);
+         entry.session.prepare(entry.xid);
+         entry.session.commit(entry.xid, false);
+         entry.xid = null;
+      }
+
+      producerSession.close();
+      for (MyInfo entry : consumers)
+      {
+         entry.session.close();
+      }
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finished = System.currentTimeMillis();
+
+      log.info("duration " + (finished - began));
+   }
+
+   /**
+    * Scenario H: synchronous receive where the consumer sessions are only started after the
+    * messages have been sent. Every consumer receives a batch inside an XA branch that is prepared
+    * and rolled back, then receives a batch again inside a new branch that is prepared and committed.
+    *
+    * @param sf factory used to create every session of this run
+    * @param threadNum number of the calling test thread; part of the queue names and of the sent messages
+    * @throws Exception if any session operation or the message checks fail
+    */
+   protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      final long began = System.currentTimeMillis();
+
+      // Session used at the very end to delete the queues created below
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      Set<MyInfo> consumers = new HashSet<MyInfo>();
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         SimpleString queueName = new SimpleString(threadNum + "sub" + index);
+
+         ClientSession consumerSession = sf.createSession(true, false, false);
+         consumerSession.createQueue(ADDRESS, queueName, null, false, false);
+         ClientConsumer queueConsumer = consumerSession.createConsumer(queueName);
+
+         Xid branch = newXID();
+         consumerSession.start(branch, XAResource.TMNOFLAGS);
+
+         consumers.add(new MyInfo(consumerSession, queueConsumer, branch));
+      }
+
+      ClientSession producerSession = sf.createSession(true, false, false);
+
+      transactionallySendMessages(threadNum, numMessages, producerSession);
+
+      // Delivery is enabled only now, after the messages were sent
+      for (MyInfo entry : consumers)
+      {
+         entry.session.start();
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      // Roll back what was received and open a new branch on each session
+      for (MyInfo entry : consumers)
+      {
+         entry.session.end(entry.xid, XAResource.TMSUCCESS);
+         entry.session.prepare(entry.xid);
+         entry.session.rollback(entry.xid);
+         entry.xid = newXID();
+         entry.session.start(entry.xid, XAResource.TMNOFLAGS);
+      }
+
+      consumeMessages(consumers, numMessages, threadNum);
+
+      // This time the received messages are committed
+      for (MyInfo entry : consumers)
+      {
+         entry.session.end(entry.xid, XAResource.TMSUCCESS);
+         entry.session.prepare(entry.xid);
+         entry.session.commit(entry.xid, false);
+         entry.xid = null;
+      }
+
+      producerSession.close();
+      for (MyInfo entry : consumers)
+      {
+         entry.session.close();
+      }
+
+      for (int index = 0; index < numSessions; index++)
+      {
+         adminSession.deleteQueue(new SimpleString(threadNum + "sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finished = System.currentTimeMillis();
+
+      log.info("duration " + (finished - began));
+   }
+
+   /**
+    * @param threadNum
+    * @param numMessages
+    * @param sessSend
+    * @throws XAException
+    * @throws MessagingException
+    * @throws Exception
+    */
+   private void transactionallySendMessages(final int threadNum, final int numMessages, ClientSession sessSend) throws XAException,
+                                                                                                               MessagingException,
+                                                                                                               Exception
+   {
+      Xid xid = newXID();
+      sessSend.start(xid, XAResource.TMNOFLAGS);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.end(xid, XAResource.TMSUCCESS);
+      sessSend.rollback(xid);
+
+      xid = newXID();
+      sessSend.start(xid, XAResource.TMNOFLAGS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      sessSend.end(xid, XAResource.TMSUSPEND);
+
+      sessSend.start(xid, XAResource.TMRESUME);
+
+      sessSend.end(xid, XAResource.TMSUCCESS);
+
+      sessSend.commit(xid, true);
+   }
+
+   /**
+    * Synchronously receives {@code numMessages} messages from every consumer in {@code myinfos},
+    * acknowledging each one, and verifies that the messages sent by this test thread arrive in the
+    * order given by their "count" property (wrapping around at {@code numMessages}).
+    *
+    * @param myinfos session/consumer pairs to receive from
+    * @param numMessages number of messages to receive from each consumer
+    * @param threadNum number of the calling test thread; only messages whose "threadnum" property
+    *        equals this value are order-checked
+    * @throws Exception if a message of this thread arrives with an unexpected count
+    */
+   private void consumeMessages(final Set<MyInfo> myinfos, final int numMessages, final int threadNum) throws Exception
+   {
+      // We make sure the messages arrive in the order they were sent from a particular producer
+      Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (MyInfo myinfo : myinfos)
+         {
+            // The map is keyed by consumer, so it must be looked up by consumer as well; looking it
+            // up by the MyInfo itself never matches and would reset the expected counts on every message
+            Map<Integer, Integer> consumerCounts = counts.get(myinfo.consumer);
+
+            if (consumerCounts == null)
+            {
+               consumerCounts = new HashMap<Integer, Integer>();
+               counts.put(myinfo.consumer, consumerCounts);
+            }
+
+            ClientMessage msg = myinfo.consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+            // Expected count for the sending thread; the first message seen defines the starting point
+            Integer c = consumerCounts.get(tn);
+            if (c == null)
+            {
+               c = Integer.valueOf(cnt);
+            }
+
+            if (tn == threadNum && cnt != c.intValue())
+            {
+               throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
+            }
+
+            c++;
+
+            // Wrap
+            if (c == numMessages)
+            {
+               c = 0;
+            }
+
+            consumerCounts.put(tn, c);
+
+            msg.acknowledge();
+         }
+      }
+   }
+
+   @Override
+   protected void start() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = Messaging.newNullStorageMessagingService(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = Messaging.newNullStorageMessagingService(liveConf);
+      liveService.start();
+   }
+
+   /**
+    * Stops the backup service and then the live service, and verifies that no in-VM acceptor is
+    * left registered afterwards.
+    *
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadFailoverSupport#stop()
+    * @throws Exception if stopping either service fails
+    */
+   @Override
+   protected void stop() throws Exception
+   {
+      try
+      {
+         backupService.stop();
+      }
+      finally
+      {
+         // Always attempt to stop the live service, even when stopping the backup failed;
+         // otherwise it would stay running and leak into the following tests
+         liveService.stop();
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   /**
+    * Builds the session factory used by the tests: an in-VM connector to the live server, an in-VM
+    * connector to the backup (using {@code backupParams}) and a send window size of 32 KiB.
+    *
+    * @return the configured session factory
+    */
+   @Override
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      final String inVMConnectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+      TransportConfiguration liveConnector = new TransportConfiguration(inVMConnectorFactory);
+      TransportConfiguration backupConnector = new TransportConfiguration(inVMConnectorFactory, backupParams);
+
+      final ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(liveConnector,
+                                                                                backupConnector,
+                                                                                0,
+                                                                                1,
+                                                                                ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                                ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
+
+      factory.setSendWindowSize(32 * 1024);
+
+      return factory;
+   }
+
+   /* (non-Javadoc)
+    * Hook called by sendMessages() for every message before it is sent. It is intentionally empty
+    * here, so messages are sent without a body; checkSize() accordingly expects an empty body.
+    *
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#setBody(org.jboss.messaging.core.client.ClientMessage)
+    */
+   protected void setBody(final ClientMessage message) throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+    */
+   protected boolean checkSize(final ClientMessage message)
+   {
+      return 0 == message.getBody().writerIndex();
+   }
+
+   private void runTestMultipleThreads(final RunnableT runnable,
+                                       final int numThreads,
+                                       final boolean failOnCreateConnection,
+                                       final long failDelay) throws Exception
+   {
+
+      runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
+   }
+
+   private void sendMessages(final ClientSession sessSend,
+                             final ClientProducer producer,
+                             final int numMessages,
+                             final int threadNum) throws Exception
+   {
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
+                                                              false,
+                                                              0,
+                                                              System.currentTimeMillis(),
+                                                              (byte)1);
+         message.putIntProperty(new SimpleString("threadnum"), threadNum);
+         message.putIntProperty(new SimpleString("count"), i);
+         setBody(message);
+         producer.send(message);
+      }
+   }
+
+   /**
+    * Groups a consumer session with its consumer and the XA branch currently open on that session.
+    * Declared static because it uses nothing from the enclosing test instance.
+    */
+   private static class MyInfo
+   {
+      /** Session the consumer was created on. */
+      final ClientSession session;
+
+      /** Branch currently associated with the session; replaced or cleared by the tests as they go. */
+      Xid xid;
+
+      /** Consumer the messages are received from. */
+      final ClientConsumer consumer;
+
+      /**
+       * @param session session the consumer was created on
+       * @param consumer consumer the messages are received from
+       * @param xid branch initially associated with the session
+       */
+      public MyInfo(final ClientSession session, final ClientConsumer consumer, final Xid xid)
+      {
+         this.session = session;
+         this.consumer = consumer;
+         this.xid = xid;
+      }
+   }
+
+   /**
+    * Message handler that consumes messages inside an XA branch. It checks that the messages of its
+    * own test thread arrive in "count" order and, once the last message of the batch has been seen,
+    * either commits or rolls back the branch (see {@link #setCommitOnComplete(boolean)}) and releases
+    * {@link #latch}. Any problem is reported through {@link #failure}.
+    */
+   private class MyHandler implements MessageHandler
+   {
+      /** Released when the batch is complete or a check failed; replaced on every {@link #start()}. */
+      CountDownLatch latch = new CountDownLatch(1);
+
+      /** Next expected "count" value, per sending thread number. */
+      private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+      /** Description of the last failure, or null if none was recorded since the last start. */
+      volatile String failure;
+
+      /** Number of the test thread owning this handler. */
+      final int tn;
+
+      final int numMessages;
+
+      final ClientSession session;
+
+      final ClientConsumer consumer;
+
+      /** Branch opened by the last {@link #start()}. */
+      volatile Xid xid;
+
+      volatile boolean done;
+
+      volatile boolean started = false;
+
+      /** Whether the branch is committed (true) or rolled back (false) when the batch is complete. */
+      volatile boolean commit = false;
+
+      /**
+       * Resets the handler state, opens a new XA branch on the session, installs this handler on
+       * the consumer and starts the session.
+       */
+      synchronized void start() throws Exception
+      {
+         counts.clear();
+
+         done = false;
+
+         failure = null;
+
+         latch = new CountDownLatch(1);
+
+         xid = newXID();
+         session.start(xid, XAResource.TMNOFLAGS);
+         started = true;
+         consumer.setMessageHandler(this);
+         session.start();
+      }
+
+      /** Stops the session and removes this handler from the consumer. */
+      synchronized void stop() throws Exception
+      {
+         session.stop();
+         // FIXME: Remove this line when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is done
+         consumer.setMessageHandler(null);
+         started = false;
+      }
+
+      /** Stops the handler and closes its session. */
+      synchronized void close() throws Exception
+      {
+         stop();
+         session.close();
+      }
+
+      /** Stops delivery, then ends, prepares and rolls back the current branch. */
+      private synchronized void rollback()
+      {
+         try
+         {
+            stop();
+            session.end(xid, XAResource.TMSUCCESS);
+            session.prepare(xid);
+            session.rollback(xid);
+         }
+         catch (Exception e)
+         {
+            recordFailure("Rollback", e);
+         }
+      }
+
+      /** Stops delivery, then ends, prepares and commits the current branch. */
+      private synchronized void commit()
+      {
+         try
+         {
+            stop();
+
+            // Suspend & resume... just exercising the API as part of the test
+            session.end(xid, XAResource.TMSUSPEND);
+            session.start(xid, XAResource.TMRESUME);
+
+            session.end(xid, XAResource.TMSUCCESS);
+            session.prepare(xid);
+            session.commit(xid, false);
+         }
+         catch (Exception e)
+         {
+            recordFailure("Commit", e);
+         }
+      }
+
+      /**
+       * Stores a description of a failed XA operation in {@link #failure} and logs it. The
+       * description is built from the exception itself rather than from its message, because the
+       * message may be null (for instance for an XAException created from an error code only), which
+       * would leave the failure unreported.
+       */
+      private void recordFailure(final String operation, final Exception e)
+      {
+         String description = operation + " failed (thread = " + tn + "): " + e;
+
+         if (e instanceof XAException)
+         {
+            description += ", XA error code " + ((XAException)e).errorCode;
+         }
+
+         failure = description;
+
+         log.error(description, e);
+      }
+
+      MyHandler(final int threadNum, final int numMessages, final ClientSession session, final ClientConsumer consumer) throws Exception
+      {
+         tn = threadNum;
+
+         this.numMessages = numMessages;
+
+         this.session = session;
+
+         this.consumer = consumer;
+
+      }
+
+      public void setCommitOnComplete(boolean commit)
+      {
+         this.commit = commit;
+      }
+
+      public synchronized void onMessage(final ClientMessage message)
+      {
+
+         if (!started)
+         {
+            this.failure = "Received message with session stopped (thread = " + tn + ")";
+            log.error(failure);
+            return;
+         }
+
+         // log.info("*** handler got message");
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException me)
+         {
+            log.error("Failed to process", me);
+         }
+
+         // The batch was already completed; further messages are acknowledged but not checked
+         if (done)
+         {
+            return;
+         }
+
+         int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+         int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+         // Expected count for the sending thread; the first message seen defines the starting point
+         Integer c = counts.get(threadNum);
+         if (c == null)
+         {
+            c = Integer.valueOf(cnt);
+         }
+
+         // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
+         if (tn == threadNum && cnt != c.intValue())
+         {
+            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+            log.error(failure);
+
+            latch.countDown();
+         }
+
+         if (!checkSize(message))
+         {
+            failure = "Invalid size on message";
+            log.error(failure);
+            latch.countDown();
+         }
+
+         // Last message of this thread's batch: finish the branch and wake up the waiting test
+         if (tn == threadNum && c == numMessages - 1)
+         {
+            done = true;
+            if (commit)
+            {
+               commit();
+            }
+            else
+            {
+               rollback();
+            }
+            latch.countDown();
+         }
+
+         c++;
+         // Wrap around at numMessages
+         if (c == numMessages)
+         {
+            c = 0;
+         }
+
+         counts.put(threadNum, c);
+
+      }
+   }
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -326,7 +326,7 @@
 
       session.close();
    }
-   
+
    public void testEmptyXID() throws Exception
    {
       Xid xid = newXID();
@@ -336,28 +336,55 @@
       session.rollback(xid);
 
       session.close();
-      
+
       messagingService.stop();
+
+      // do the same test with a file persistence now
+      messagingService = createService(true, configuration, addressSettings);
+
+      messagingService.start();
+
+      sessionFactory = createInVMFactory();
+
+      xid = newXID();
+      session = sessionFactory.createSession(true, false, false);
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.end(xid, XAResource.TMSUCCESS);
+      session.rollback(xid);
+
+      xid = newXID();
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      session.commit(xid, false);
+
+
+      xid = newXID();
+      session = sessionFactory.createSession(true, false, false);
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      session.rollback(xid);
+
       
-      // Enable this when https://jira.jboss.org/jira/browse/JBMESSAGING-1548 is done
-      
-//      // do the same test with a file persistence now
-//      messagingService = createService(true, configuration, addressSettings);
-//      
-//      messagingService.start();
-//      
-//      sessionFactory = createInVMFactory();
-//      
-//      xid = newXID();
-//      session = sessionFactory.createSession(true, false, false);
-//      session.start(xid, XAResource.TMNOFLAGS);
-//      session.end(xid, XAResource.TMSUCCESS);
-//      session.rollback(xid);
-      
+      session.close();
 
+      messagingService.stop();
+      messagingService.start();
+
+      // This is not really necessary... But since the server has stopped, I would prefer to keep recreating the factory
+      sessionFactory = createInVMFactory();
+
+      session = sessionFactory.createSession(true, false, false);
+
+      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+      assertEquals(0, xids.length);
+
+      session.close();
+
    }
 
-
    public void testForget() throws Exception
    {
       clientSession.forget(newXID());

Modified: trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2009-03-18 21:49:31 UTC (rev 6110)
@@ -49,8 +49,9 @@
 
    public void testMessageChunkFilePersistence1G() throws Exception
    {
-      testChunks(true,
+      testChunks(false, 
                  true,
+                 true,
                  false,
                  true,
                  false,




More information about the jboss-cvs-commits mailing list