[jboss-cvs] JBoss Messaging SVN: r7270 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 9 07:29:06 EDT 2009


Author: gaohoward
Date: 2009-06-09 07:29:06 -0400 (Tue, 09 Jun 2009)
New Revision: 7270

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1643


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2009-06-09 11:29:06 UTC (rev 7270)
@@ -173,6 +173,22 @@
       return groupName;
    }
 
+   /**
+    * Calls unmarkSending() on the first holder in sortedList when that holder matches ref.
+    * @param ref the reference compared with the first holder; ignored when the list is empty
+    */
+   public void unmarkSending(MessageReference ref)
+   {
+      if (sortedList.size() > 0)
+      {
+         ReferenceHolder first = sortedList.get(0);
+         if (first.matchMessage(ref))
+         {
+            first.unmarkSending();
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2009-06-09 11:29:06 UTC (rev 7270)
@@ -175,7 +175,7 @@
          OrderingGroup group = orderingGroups.get(grpName);
          if (group != null)
          {
-            group.unregister(ref);
+            group.unmarkSending(ref);
          }
       }
    }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2009-06-09 11:29:06 UTC (rev 7270)
@@ -22,6 +22,8 @@
 
 package org.jboss.test.messaging.jms;
 
+import java.util.ArrayList;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
@@ -33,6 +35,7 @@
 import javax.jms.TextMessage;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
+import javax.management.ObjectName;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -914,7 +917,575 @@
          }
       }
    }
+   
+   /*
+    * Send 4 ordered messages, receive the 1st in an XA transaction and roll that tx back.
+    * Then start another transaction, receive the 1st message again and commit it; until the
+    * commit, the 2nd message must not be delivered. Afterwards messages 2-4 arrive in order.
+    */
+   public void testSimpleXATransactionalRollbackReceive2() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRollbackReceive2");
 
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send four messages in one ordering group to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the first message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid1);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+         // rolling back makes the first message available for delivery again
+         res1.rollback(xid1);
+
+         // start another Tx; this one is committed, not rolled back.
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+         res1.start(xid2, XAResource.TMNOFLAGS);
+
+         rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals(tm1.getText(), rm1.getText());
+         // the second message must still be held back while the first is uncommitted
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.end(xid2, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid2);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.commit(xid2, false);
+         // after the commit the remaining messages are expected in their original order
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm2.getText(), rm1.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm3.getText(), rm1.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm4.getText(), rm1.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+   
+   /*
+    * Send 4 ordered messages, receive the 1st in an XA transaction, roll back and close the
+    * session. Then, on a new XA session, receive the 1st message again and commit; until the
+    * commit, the 2nd message must not be delivered. Afterwards messages 2-4 arrive in order.
+    */
+   public void testSimpleXATransactionalRollbackReceive3() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRollbackReceive3");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send four messages in one ordering group to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the first message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid1);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         res1.rollback(xid1);
+         // close the session that received tm1; the rest of the test uses a fresh XA session
+         xsess1.getSession().close();
+
+         XASession xsess2 = xconn1.createXASession();
+
+         XAResource res2 = xsess2.getXAResource();
+
+         MessageConsumer cons2 = xsess2.createConsumer(queue1);
+
+         // start another Tx, commit it.
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+         res2.start(xid2, XAResource.TMNOFLAGS);
+
+         rm1 = (TextMessage)cons2.receive(1000);
+         assertNotNull(rm1);
+         assertEquals(tm1.getText(), rm1.getText());
+         // the second message must still be held back while the first is uncommitted
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNull(rm1);
+
+         res2.end(xid2, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res2.prepare(xid2);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNull(rm1);
+
+         res2.commit(xid2, false);
+         // after the commit the remaining messages are expected in their original order
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm2.getText(), rm1.getText());
+
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm3.getText(), rm1.getText());
+
+         rm1 = (TextMessage)cons2.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(tm4.getText(), rm1.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+   
+   /* cons1 receives and acks tm1, then is closed; cons2 must then receive tm2..tm4 in order.
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1643
+    */
+   public void testSimpleConsumerCloseReceive() throws Exception
+   {
+      log.trace("starting testSimpleConsumerCloseReceive");
+
+      Connection conn1 = null;
+
+      try
+      {
+         // First send four messages in one ordering group to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         conn1.start();
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage rm1 = (TextMessage)cons1.receive(2000);
+         assertNotNull(rm1);
+         assertEquals("tm1", rm1.getText());
+         // tm1 is not acknowledged yet, so the next message must not be delivered
+         TextMessage rm2 = (TextMessage)cons1.receive(2000);
+         assertNull(rm2);
+
+         //Next message not available to another consumer either.
+         MessageConsumer cons2 = sess1.createConsumer(queue1);
+         rm2 = (TextMessage)cons2.receive(2000);
+         assertNull(rm2);
+
+         //ack rm1
+         rm1.acknowledge();
+
+         cons1.close();
+         // with tm1 acked and cons1 closed, cons2 is expected to get the rest in order
+         rm1 = (TextMessage)cons2.receive(1000);
+         assertNotNull(rm1);
+         assertEquals("tm2", rm1.getText());
+         rm1.acknowledge();
+
+         rm1 = (TextMessage)cons2.receive(1000);
+         assertNotNull(rm1);
+         assertEquals("tm3", rm1.getText());
+         rm1.acknowledge();
+
+         rm1 = (TextMessage)cons2.receive(1000);
+         assertNotNull(rm1);
+         assertEquals("tm4", rm1.getText());
+         rm1.acknowledge();
+
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+      }
+
+   }
+
+   /*
+    * Send 4 ordered messages (A-D) and consume them one XA transaction at a time, rolling
+    * back every delivery of B until it is moved to the DLQ (Queue2), then check the overall
+    * order in which the messages were received. Skipped when the server is remote.
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1643
+    */
+   public void testSimpleXATransactionalRollbackReceive4() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+
+      log.trace("starting testSimpleXATransactionalRollbackReceive4");
+
+      Connection conn1 = null;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+      final int MAX_DELIVERIES = 5;
+
+      try
+      {
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultMaxDeliveryAttempts",
+                                       String.valueOf(MAX_DELIVERIES));
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         // First send four messages in one ordering group to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("A");
+         TextMessage tm2 = sess1.createTextMessage("B");
+         TextMessage tm3 = sess1.createTextMessage("C");
+         TextMessage tm4 = sess1.createTextMessage("D");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         conn1.start();
+
+         ArrayList<String> rList = new ArrayList<String>();
+         int i = 0;
+         while (true)
+         {
+            XAConnection xconn1 = null;
+            XASession xsess1 = null;
+            MessageConsumer xcons1 = null;
+            i++;
+            try
+            {
+               xconn1 = cf.createXAConnection();
+
+               xsess1 = xconn1.createXASession();
+
+               XAResource res1 = xsess1.getXAResource();
+
+               String bqStr = "bq" + i;
+
+               // Pretend to be a transaction manager by interacting through the XAResources
+               Xid xid1 = new MessagingXid(bqStr.getBytes(), 42, "eemeli".getBytes());
+
+               res1.start(xid1, XAResource.TMNOFLAGS);
+
+               xcons1 = xsess1.createConsumer(queue1);
+
+               xconn1.start();
+
+               TextMessage rm1 = (TextMessage)xcons1.receive(5000);
+               // nothing delivered within the timeout: stop consuming
+               if (rm1 == null)
+               {
+                  break;
+               }
+               // record every delivery, including each redelivery of B
+               rList.add(rm1.getText());
+
+               res1.end(xid1, XAResource.TMSUCCESS);
+
+               // prepare the tx
+               res1.prepare(xid1);
+               // B is always rolled back so it gets redelivered; everything else is committed
+               if (rm1.getText().equals("B"))
+               {
+                  res1.rollback(xid1);
+               }
+               else
+               {
+                  res1.commit(xid1, false);
+               }
+            }
+            finally
+            {
+               if (xcons1 != null)
+               {
+                  try
+                  {
+                     xcons1.close();
+                  }
+                  catch (Exception e)
+                  {
+                     // Ignore
+                  }
+               }
+               if (xsess1 != null)
+               {
+                  try
+                  {
+                     xsess1.close();
+                  }
+                  catch (Exception e)
+                  {
+                     // Ignore
+                  }
+               }
+               if (xconn1 != null)
+               {
+                  try
+                  {
+                     xconn1.close();
+                  }
+                  catch (Exception e)
+                  {
+                     // Ignore
+                  }
+               }
+            }
+         }
+
+         // examine result: A once, B MAX_DELIVERIES (5) times, then C and D
+         assertEquals(8, rList.size());
+         assertEquals("A", rList.get(0));
+         assertEquals("B", rList.get(1));
+         assertEquals("B", rList.get(2));
+         assertEquals("B", rList.get(3));
+         assertEquals("B", rList.get(4));
+         assertEquals("B", rList.get(5));
+         assertEquals("C", rList.get(6));
+         assertEquals("D", rList.get(7));
+
+         MessageConsumer cons1 = sess1.createConsumer(queue2);
+         TextMessage dMsg = (TextMessage)cons1.receive(2000);
+         assertNotNull(dMsg);
+         assertEquals("B", dMsg.getText());
+
+         checkEmpty(queue1);
+         checkEmpty(queue2);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+         // close the connection first so a failure while restoring settings cannot leak it
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultDLQ",
+                                       "jboss.messaging.destination:service=Queue,name=DLQ");
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list