[jboss-cvs] JBoss Messaging SVN: r5218 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 30 03:58:15 EDT 2008


Author: gaohoward
Date: 2008-10-30 03:58:15 -0400 (Thu, 30 Oct 2008)
New Revision: 5218

Modified:
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Log:
JBMESSAGING-1416


Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-10-30 07:36:33 UTC (rev 5217)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-10-30 07:58:15 UTC (rev 5218)
@@ -28,6 +28,7 @@
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.ServerSession;
@@ -238,6 +239,87 @@
       }
    }
 
+   /*
+    * Make sure the ordering group messages are received in order
+    * thru a ConnectionConsumer in transaction mode, by two listeners.
+    */
+   public void testTransactedConcurrentReceive() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+         Session sessCons2 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+         TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+         TxOrderingGroupMessageListener listener2 = new TxOrderingGroupMessageListener(this, sessCons2);
+
+         sessCons1.setMessageListener(listener1);
+         sessCons2.setMessageListener(listener2);
+
+         OrderingServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         //waiting enough time to allow delivery complete.
+         msgLatch.attempt(10000);
+         
+         //check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+         
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         doze(3000);
+         
+         cc.close();
+
+         Session nSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer nCon = nSess.createConsumer(queue1);
+         
+         Message mx = nCon.receive(500);
+         assertNull(mx);
+         
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null) consumerConn.close();
+         if (producerConn != null) producerConn.close();
+         
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -308,16 +390,40 @@
 
    class OrderingServerSessionPool implements ServerSessionPool
    {
-      private ServerSession serverSession;
+      private ServerSession serverSession1;
+      private ServerSession serverSession2;
+      private long counter;
       
-      OrderingServerSessionPool(Session sess)
+      OrderingServerSessionPool(Session sess1, Session sess2)
       {
-         serverSession = new MockServerSession(sess);
+         serverSession1 = new MockServerSession(sess1);
+         serverSession2 = new MockServerSession(sess2);
+         counter = 0L;
       }
 
+      /**
+       * @param sessCons1
+       */
+      public OrderingServerSessionPool(Session sessCons1)
+      {
+         serverSession1 = new MockServerSession(sessCons1);
+         serverSession2 = null;
+      }
+
       public synchronized ServerSession getServerSession() throws JMSException
       {
-         return serverSession;
+         if (serverSession2 == null)
+         {
+            return serverSession1;
+         }
+         
+         ServerSession result = serverSession2;
+         if (counter%2 == 0)
+         {
+            result = serverSession1;
+         }
+         counter++;
+         return result;
       }      
    }
 




More information about the jboss-cvs-commits mailing list