[jboss-cvs] JBoss Messaging SVN: r5218 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 30 03:58:15 EDT 2008
Author: gaohoward
Date: 2008-10-30 03:58:15 -0400 (Thu, 30 Oct 2008)
New Revision: 5218
Modified:
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Log:
JBMESSAGING-1416
Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-10-30 07:36:33 UTC (rev 5217)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-10-30 07:58:15 UTC (rev 5218)
@@ -28,6 +28,7 @@
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
@@ -238,6 +239,87 @@
}
}
+   /*
+    * Make sure ordering group messages, sent with priorities cycling through 0-9, are received in
+    * send order through a ConnectionConsumer in transaction mode, by two listeners.
+    */
+   public void testTransactedConcurrentReceive() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+         Session sessCons2 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+         TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+         TxOrderingGroupMessageListener listener2 = new TxOrderingGroupMessageListener(this, sessCons2);
+
+         sessCons1.setMessageListener(listener1);
+         sessCons2.setMessageListener(listener2);
+
+         OrderingServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         // Wait long enough to allow delivery to complete.
+         msgLatch.attempt(10000);
+
+         // Check the order: entry i of mList must carry the text sent as message i (expected value first).
+         assertEquals(NUM_MESSAGES, mList.size());
+
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals("testing" + i, txm.getText());
+         }
+
+         doze(3000);
+
+         cc.close();
+
+         // With the connection consumer closed, a plain consumer must find queue1 empty.
+         Session nSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer nCon = nSess.createConsumer(queue1);
+
+         Message mx = nCon.receive(500);
+         assertNull(mx);
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         // Close producerConn even if closing consumerConn throws.
+         try { if (consumerConn != null) consumerConn.close(); }
+         finally { if (producerConn != null) producerConn.close(); }
+      }
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -308,16 +390,40 @@
class OrderingServerSessionPool implements ServerSessionPool
{
- private ServerSession serverSession;
+ private ServerSession serverSession1;
+ private ServerSession serverSession2; // null when the pool was built with a single session
+ private long counter; // number of getServerSession() calls served while two sessions are present
- OrderingServerSessionPool(Session sess)
+ OrderingServerSessionPool(Session sess1, Session sess2)
{
- serverSession = new MockServerSession(sess);
+ serverSession1 = new MockServerSession(sess1);
+ serverSession2 = new MockServerSession(sess2);
+ counter = 0L;
}
+ /** Creates a pool backed by a single server session, which getServerSession() always returns.
+ * @param sessCons1 the session wrapped in the pool's only MockServerSession
+ */
+ public OrderingServerSessionPool(Session sessCons1)
+ {
+ serverSession1 = new MockServerSession(sessCons1);
+ serverSession2 = null;
+ }
+
public synchronized ServerSession getServerSession() throws JMSException
{
- return serverSession;
+ if (serverSession2 == null)
+ {
+ return serverSession1;
+ }
+
+ ServerSession result = serverSession2;
+ if (counter%2 == 0) // alternate between the two sessions: even counts get serverSession1, odd counts serverSession2
+ {
+ result = serverSession1;
+ }
+ counter++;
+ return result;
}
}
More information about the jboss-cvs-commits
mailing list