[jboss-cvs] JBoss Messaging SVN: r6776 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 13 22:14:30 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-13 22:14:30 -0400 (Wed, 13 May 2009)
New Revision: 6776

Modified:
   trunk/docs/user-manual/en/modules/large-messages.xml
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1625 - XARollback and ordering on backup

Modified: trunk/docs/user-manual/en/modules/large-messages.xml
===================================================================
--- trunk/docs/user-manual/en/modules/large-messages.xml	2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/docs/user-manual/en/modules/large-messages.xml	2009-05-14 02:14:30 UTC (rev 6776)
@@ -33,8 +33,7 @@
             class</para>
         <programlisting>        ClientSessionFactory factory = ....;
         factory.setMinLargeMessageSize(25000);</programlisting>
-        <para>The default value is aways 100K which is a good value for most cases based on our
-            tests.</para>
+        <para>The default value is always 100K which is a good value for most cases.</para>
     </section>
     <section>
         <title>Streaming Large messages</title>

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-14 02:14:30 UTC (rev 6776)
@@ -511,7 +511,7 @@
       }
       else
       {
-         final HashSet<Queue> queues = lockUsedQueues();
+         final HashSet<Queue> queues = lockUsedQueues(null);
 
          replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
          {
@@ -625,17 +625,30 @@
 
    public void handleXARollback(final SessionXARollbackMessage packet)
    {
+      
       if (replicatingChannel == null)
       {
          doHandleXARollback(packet);
       }
       else
       {
+         final Set<Queue> queues = lockUsedQueues(packet.getXid());
+
          replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
          {
             public void run()
             {
-               doHandleXARollback(packet);
+               try
+               {
+                  doHandleXARollback(packet);
+               }
+               finally
+               {
+                  for (Queue queue : queues)
+                  {
+                     queue.unlockDelivery();
+                  }
+               }
             }
          });
       }
@@ -865,7 +878,7 @@
       }
       else
       {
-         final HashSet<Queue> queues = lockUsedQueues();
+         final HashSet<Queue> queues = lockUsedQueues(null);
 
          // We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
          // but we need to process the actual close() when the replication response returns, otherwise things
@@ -2508,12 +2521,12 @@
          postOffice.route(msg, tx);
       }
    }
-   
+
    /**
     * We need to avoid delivery when rolling back while doing replication, or the backup node could be on a different order
     * @return
     */
-   private HashSet<Queue> lockUsedQueues()
+   private HashSet<Queue> lockUsedQueues(Xid xid)
    {
       final HashSet<Queue> queues = new HashSet<Queue>();
       
@@ -2522,11 +2535,21 @@
          queues.add(consumer.getQueue());
       }
       
-      if (tx != null)
+      Transaction localTX;
+      if (xid == null)
       {
-         queues.addAll(tx.getDistinctQueues());
+         localTX = tx;
       }
+      else
+      {
+         localTX = resourceManager.getTransaction(xid);
+      }
       
+      if (localTX != null)
+      {
+         queues.addAll(localTX.getDistinctQueues());
+      }
+      
       for (Queue queue : queues)
       {
          queue.lockDelivery();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java	2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java	2009-05-14 02:14:30 UTC (rev 6776)
@@ -22,14 +22,15 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
-import java.lang.reflect.Method;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -44,13 +45,10 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
 import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
 import org.jboss.messaging.tests.util.RandomUtil;
-import org.jboss.messaging.tests.util.SpawnedVMSupport;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -269,9 +267,19 @@
 
    }
 
+   public void testDeliveryOrderOnTransactionalRollbackMultiThreadXA() throws Exception
+   {
+      internalTestDeliveryOrderOnTransactionalRollbackMultiThread(true);
+   }
+
    public void testDeliveryOrderOnTransactionalRollbackMultiThread() throws Exception
    {
+      internalTestDeliveryOrderOnTransactionalRollbackMultiThread(false);
+   }
 
+   public void internalTestDeliveryOrderOnTransactionalRollbackMultiThread(final boolean isXA) throws Exception
+   {
+
       final SimpleString ADDRESS = new SimpleString("TEST");
       final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
 
@@ -314,7 +322,7 @@
                   msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
                   prod.send(msg);
                }
-               
+
                sess.commit();
             }
             catch (Throwable e)
@@ -346,6 +354,8 @@
 
          final CountDownLatch latchStart;
 
+         Xid xid = null;
+
          final boolean rollback;
 
          ConsumerThread(final ClientSessionFactory sf,
@@ -359,17 +369,43 @@
             this.rollback = rollback;
          }
 
+         public void close()
+         {
+            try
+            {
+               if (xid != null)
+               {
+                  sess.rollback(xid);
+                  xid = null;
+               }
+               sess.close();
+               sess = null;
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+
+         }
+
          @Override
          public void run()
          {
+
             ClientConsumer cons = null;
             try
             {
                latchAlign.countDown();
                latchStart.await();
 
-               sess = sf.createSession(false, false, false);
+               sess = sf.createSession(isXA, false, false);
 
+               if (isXA)
+               {
+                  xid = newXID();
+                  sess.start(xid, XAResource.TMNOFLAGS);
+               }
+
                cons = sess.createConsumer(ADDRESS);
 
                sess.start();
@@ -390,9 +426,21 @@
             {
                try
                {
+                  if (isXA)
+                  {
+                     sess.end(xid, XAResource.TMSUCCESS);
+                  }
                   if (rollback)
                   {
-                     sess.rollback();
+                     if (isXA)
+                     {
+                        sess.rollback(xid);
+                        xid = null;
+                     }
+                     else
+                     {
+                        sess.rollback();
+                     }
                      cons.close();
                   }
                }
@@ -471,7 +519,7 @@
       {
          if (t.sess != null)
          {
-            t.sess.close();
+            t.close();
          }
       }
 




More information about the jboss-cvs-commits mailing list