[jboss-cvs] JBoss Messaging SVN: r6079 - in trunk: src/main/org/jboss/messaging/core/server/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 13 11:57:44 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-13 11:57:44 -0400 (Fri, 13 Mar 2009)
New Revision: 6079

Modified:
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
Log:
Synchronizing some of the transaction operations... 
as discussed here:
http://www.jboss.org/index.html?module=bb&op=viewtopic&t=152174

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-13 15:57:44 UTC (rev 6079)
@@ -842,18 +842,23 @@
 
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
-      PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
-
-      if (oper == null)
+      // you could have races in the case of two sessions using the same XID,
+      // so this whole operation needs to be atomic per TX
+      synchronized (tx)
       {
-         oper = new PageMessageOperation();
-
-         tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
-
-         tx.addOperation(oper);
+         PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
+   
+         if (oper == null)
+         {
+            oper = new PageMessageOperation();
+   
+            tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
+   
+            tx.addOperation(oper);
+         }
+   
+         return oper;
       }
-
-      return oper;
    }
 
    private class MessageExpiryRunner implements Runnable

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-03-13 15:57:44 UTC (rev 6079)
@@ -626,18 +626,21 @@
 
    final RefsOperation getRefsOperation(final Transaction tx)
    {
-      RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-
-      if (oper == null)
+      synchronized (tx)
       {
-         oper = new RefsOperation();
-
-         tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
-
-         tx.addOperation(oper);
+         RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+   
+         if (oper == null)
+         {
+            oper = new RefsOperation();
+   
+            tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
+   
+            tx.addOperation(oper);
+         }
+   
+         return oper;
       }
-
-      return oper;
    }
 
    public void cancel(final Transaction tx, final MessageReference reference) throws Exception
@@ -1000,6 +1003,12 @@
    {
       return name.hashCode();
    }
+   
+   @Override
+   public String toString()
+   {
+      return "QueueImpl(name=" + this.name.toString() + ")";
+   }
 
    // Private
    // ------------------------------------------------------------------------------
@@ -1463,12 +1472,12 @@
 
       List<MessageReference> refsToAck = new ArrayList<MessageReference>();
 
-      void addRef(final MessageReference ref)
+      synchronized void addRef(final MessageReference ref)
       {
          refsToAdd.add(ref);
       }
 
-      void addAck(final MessageReference ref)
+      synchronized void addAck(final MessageReference ref)
       {
          refsToAck.add(ref);
       }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-13 15:57:44 UTC (rev 6079)
@@ -293,7 +293,7 @@
       this.messagingException = messagingException;
    }
 
-   public void addOperation(final TransactionOperation operation)
+   public synchronized void addOperation(final TransactionOperation operation)
    {
       checkCreateOperations();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-13 15:57:44 UTC (rev 6079)
@@ -26,16 +26,21 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -48,6 +53,7 @@
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class BasicXaTest extends ServiceTestBase
 {
@@ -76,6 +82,7 @@
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
       configuration.setPagingDirectory(getPageDir());
+      configuration.setPagingMaxGlobalSizeBytes(0); // no paging for these tests
 
       messagingService = createService(false, configuration, addressSettings);
 
@@ -126,7 +133,7 @@
       validateRM(sessionFactory, sessionFactory);
       validateRM(nettyFactory, sessionFactory);
    }
-   
+
    private void validateRM(ClientSessionFactory factory1, ClientSessionFactory factory2) throws Exception
    {
       ClientSession session1 = factory1.createSession(true, false, false);
@@ -227,7 +234,7 @@
       clientSession.commit(xid, true);
       clientSession.start();
       clientConsumer = clientSession.createConsumer(atestq);
-      m = clientConsumer.receive(1000);
+      m = clientConsumer.receiveImmediate();
       assertNull(m);
 
    }
@@ -266,28 +273,40 @@
 
    public void testSendMultipleQueues() throws Exception
    {
-      multipleQueuesInternalTest(true, false, false, false);
+      multipleQueuesInternalTest(true, false, false, false, false);
    }
 
    public void testSendMultipleQueuesOnePhase() throws Exception
    {
-      multipleQueuesInternalTest(true, false, false, true);
-      multipleQueuesInternalTest(false, false, true, true);
+      multipleQueuesInternalTest(true, false, false, false, true);
+      multipleQueuesInternalTest(false, false, true, false, true);
    }
 
+   public void testSendMultipleQueuesOnePhaseJoin() throws Exception
+   {
+      multipleQueuesInternalTest(true, false, false, true, true);
+      multipleQueuesInternalTest(false, false, true, true, true);
+   }
+
+   public void testSendMultipleQueuesTwoPhaseJoin() throws Exception
+   {
+      multipleQueuesInternalTest(true, false, false, true, false);
+      multipleQueuesInternalTest(false, false, true, true, false);
+   }
+
    public void testSendMultipleQueuesRecreate() throws Exception
    {
-      multipleQueuesInternalTest(true, false, true, false);
+      multipleQueuesInternalTest(true, false, true, false, false);
    }
 
    public void testSendMultipleSuspend() throws Exception
    {
-      multipleQueuesInternalTest(true, true, false, false);
+      multipleQueuesInternalTest(true, true, false, false, false);
    }
 
    public void testSendMultipleSuspendRecreate() throws Exception
    {
-      multipleQueuesInternalTest(true, true, true, false);
+      multipleQueuesInternalTest(true, true, true, false, false);
    }
 
    public void testSendMultipleSuspendErrorCheck() throws Exception
@@ -313,6 +332,78 @@
       session.close();
    }
 
+   public void testForget() throws Exception
+   {
+      clientSession.forget(newXID());
+   }
+
+   public void testSimpleJoin() throws Exception
+   {
+      sessionFactory.setBlockOnPersistentSend(true);
+
+      SimpleString ADDRESS1 = new SimpleString("Address-1");
+      SimpleString ADDRESS2 = new SimpleString("Address-2");
+
+      clientSession.createQueue(ADDRESS1, ADDRESS1, true);
+      clientSession.createQueue(ADDRESS2, ADDRESS2, true);
+
+      Xid xid = newXID();
+
+      ClientSession sessionA = sessionFactory.createSession(true, false, false);
+      sessionA.start(xid, XAResource.TMNOFLAGS);
+
+      ClientSession sessionB = sessionFactory.createSession(true, false, false);
+      sessionB.start(xid, XAResource.TMJOIN);
+
+      ClientProducer prodA = sessionA.createProducer(ADDRESS1);
+      ClientProducer prodB = sessionB.createProducer(ADDRESS2);
+
+      for (int i = 0; i < 100; i++)
+      {
+         prodA.send(createTextMessage(sessionA, "A" + i));
+         prodB.send(createTextMessage(sessionB, "B" + i));
+      }
+
+      sessionA.end(xid, XAResource.TMSUCCESS);
+      sessionB.end(xid, XAResource.TMSUCCESS);
+
+      sessionB.close();
+
+      sessionA.commit(xid, true);
+
+      sessionA.close();
+
+      xid = newXID();
+
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+
+      ClientConsumer cons1 = clientSession.createConsumer(ADDRESS1);
+      ClientConsumer cons2 = clientSession.createConsumer(ADDRESS2);
+      clientSession.start();
+
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage msg = cons1.receive(1000);
+         assertNotNull(msg);
+         assertEquals("A" + i, getTextMessage(msg));
+         msg.acknowledge();
+
+         msg = cons2.receive(1000);
+         assertNotNull(msg);
+         assertEquals("B" + i, getTextMessage(msg));
+         msg.acknowledge();
+      }
+
+      assertNull(cons1.receiveImmediate());
+      assertNull(cons2.receiveImmediate());
+
+      clientSession.end(xid, XAResource.TMSUCCESS);
+
+      clientSession.commit(xid, true);
+
+      clientSession.close();
+   }
+
    /**
     * @throws MessagingException
     * @throws XAException
@@ -320,7 +411,8 @@
    protected void multipleQueuesInternalTest(boolean createQueues,
                                              boolean suspend,
                                              boolean recreateSession,
-                                             boolean onePhase) throws MessagingException, XAException
+                                             boolean isJoinSession,
+                                             boolean onePhase) throws Exception
    {
       int NUMBER_OF_MSGS = 100;
       int NUMBER_OF_QUEUES = 10;
@@ -328,6 +420,8 @@
 
       SimpleString ADDRESS = new SimpleString("Address");
 
+      ClientSession newJoinSession = null;
+
       try
       {
 
@@ -338,6 +432,11 @@
             for (int i = 0; i < NUMBER_OF_QUEUES; i++)
             {
                session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+               if (isJoinSession)
+               {
+                  clientSession.createQueue(ADDRESS.concat("-join"), ADDRESS.concat("-join." + i), true);
+               }
+
             }
          }
 
@@ -365,8 +464,30 @@
 
             prod.close();
 
+            if (isJoinSession)
+            {
+               newJoinSession = sessionFactory.createSession(true, false, false);
+
+               // This is a basic condition, or a real TM wouldn't be able to join both sessions in a single
+               // transaction
+               assertTrue(session.isSameRM(newJoinSession));
+
+               newJoinSession.start(xid, XAResource.TMJOIN);
+
+               // The Join Session will have its own queue, as it's not possible to guarantee ordering since this
+               // producer will be using a different session
+               ClientProducer newProd = newJoinSession.createProducer(ADDRESS.concat("-join"));
+               newProd.send(createTextMessage(newJoinSession, "After Join"));
+            }
+
             session.end(xid, XAResource.TMSUCCESS);
 
+            if (isJoinSession)
+            {
+               newJoinSession.end(xid, XAResource.TMSUCCESS);
+               newJoinSession.close();
+            }
+
             if (!onePhase)
             {
                session.prepare(xid);
@@ -416,6 +537,7 @@
 
                ClientMessage msg = consumer.receive(1000);
                assertNotNull(msg);
+               assertEquals("one more", getTextMessage(msg));
                msg.acknowledge();
 
                if (suspend)
@@ -426,8 +548,28 @@
 
                assertEquals("one more", getTextMessage(msg));
 
+               if (isJoinSession)
+               {
+                  ClientSession newSession = sessionFactory.createSession(true, false, false);
+
+                  newSession.start(xid, XAResource.TMJOIN);
+
+                  newSession.start();
+
+                  ClientConsumer newConsumer = newSession.createConsumer(ADDRESS.concat("-join." + nqueues));
+
+                  msg = newConsumer.receive(1000);
+                  assertNotNull(msg);
+
+                  assertEquals("After Join", getTextMessage(msg));
+                  msg.acknowledge();
+
+                  newSession.end(xid, XAResource.TMSUCCESS);
+
+                  newSession.close();
+               }
+
                assertNull(consumer.receiveImmediate());
-
                consumer.close();
 
             }




More information about the jboss-cvs-commits mailing list