[jboss-cvs] JBoss Messaging SVN: r7218 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 5 05:03:15 EDT 2009


Author: gaohoward
Date: 2009-06-05 05:03:15 -0400 (Fri, 05 Jun 2009)
New Revision: 7218

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1638


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-06-05 08:02:44 UTC (rev 7217)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-06-05 09:03:15 UTC (rev 7218)
@@ -834,6 +834,22 @@
          {
          	deliveringCount.decrement();
          }
+         
+         MessageReference ref = d.getReference();
+         if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+         {
+            if (trace)
+            {
+               log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+            }
+            synchronized (lock)
+            {
+               if (monitor.messageCompleted(ref))
+               {
+                  deliverInternal();
+               }
+            }
+         }
       }
       else
       {
@@ -850,22 +866,6 @@
             pm.addTransaction(tx);
          }
       }
-      
-      MessageReference ref = d.getReference();
-      if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
-      {
-         if (trace)
-         {
-            log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
-         }
-         synchronized (lock)
-         {
-            if (monitor.messageCompleted(ref))
-            {
-               deliverInternal();
-            }
-         }
-      }
    }
 
    protected InMemoryCallback getCallback(Transaction tx)
@@ -1054,6 +1054,19 @@
                {
                	deliveringCount.decrement();
                }
+               
+               MessageReference ref = del.getReference();
+               if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+               {
+                  if (trace)
+                  {
+                     log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+                  }
+                  synchronized (lock)
+                  {
+                     promptDelivery = (promptDelivery || monitor.messageCompleted(ref));
+                  }
+               }
             }
 
             // prompt delivery

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2009-06-05 08:02:44 UTC (rev 7217)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2009-06-05 09:03:15 UTC (rev 7218)
@@ -429,6 +429,8 @@
 
          XASession xsess2 = xconn2.createXASession();
 
+         MessageConsumer cons2 = xsess2.createConsumer(queue1);
+
          XAResource res3 = xsess2.getXAResource();
 
          Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
@@ -438,6 +440,10 @@
          assertEquals(0, xids2.length);
 
          assertEquals(xid1, xids[0]);
+         
+         //before the tx is recovered and committed, the message must not be received.
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNull(rm1);
 
          // Commit the tx
 
@@ -498,7 +504,417 @@
          }
       }
    }
+   
+   
+   /*
+    * Send 4 messages and start an XA transaction to receive them; the next message
+    * must not be delivered while the XA transaction is ended or in the prepared state.
+    */
+   public void testSimpleXATransactionalRollbackReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRollbackReceive");
 
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid1);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+         
+         res1.rollback(xid1);
+
+         //start another Tx, and rollback again.
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+         res1.start(xid2, XAResource.TMNOFLAGS);
+
+         rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid2, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid2);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.rollback(xid2);
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm1.getText());
+         
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm2.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm3.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm4.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+   
+   /*
+    * Send 4 messages and start an XA transaction to receive them: first roll back, then
+    * commit. The next message is delivered only once the XA transaction is committed.
+    */
+   public void testSimpleXATransactionalRollbackCommitReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRollbackReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         //roll back directly
+         res1.rollback(xid1);
+
+         //start another Tx, and commit it this time (after prepare).
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+         res1.start(xid2, XAResource.TMNOFLAGS);
+
+         rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid2, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid2);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.commit(xid2, false);
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm2.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm3.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm4.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+   
+   /*
+    * Send 4 messages and start an XA transaction to receive them: roll back once, then
+    * commit without prepare. The next message is delivered only once the XA tx is committed.
+    */
+   public void testSimpleXATransactionalCommitOnephaseReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRollbackReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         //roll back directly
+         res1.rollback(xid1);
+
+         //start another Tx, and commit it this time (one-phase, no prepare).
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+         res1.start(xid2, XAResource.TMNOFLAGS);
+
+         rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid2, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.commit(xid2, true);
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm2.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm3.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm4.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list