[jboss-cvs] JBoss Messaging SVN: r5335 - in branches/Branch_1416_merge: tests/src/org/jboss/test/messaging/jms and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 11 06:24:20 EST 2008
Author: gaohoward
Date: 2008-11-11 06:24:20 -0500 (Tue, 11 Nov 2008)
New Revision: 5335
Added:
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java
Modified:
branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1416 code cleanup
Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -46,8 +46,9 @@
// Attributes ----------------------------------------------------
private OrderingList<ReferenceHolder> sortedList = new OrderingList<ReferenceHolder>();
+
private HashMap<Long, ReferenceHolder> refMap = new HashMap<Long, ReferenceHolder>();
-
+
private String groupName;
// Static --------------------------------------------------------
@@ -58,7 +59,7 @@
{
groupName = name;
}
-
+
private OrderingGroup()
{
}
@@ -76,8 +77,8 @@
ReferenceHolder holder = refMap.get(mid);
if (holder != null)
{
- //is there a case where the ref can be registered more than once?
- //if not, we simply remove the addRef().
+ // is there a case where the ref can be registered more than once?
+ // if not, we simply remove the addRef().
holder.addRef();
return true;
}
@@ -105,11 +106,11 @@
public int isAvailable(MessageReference ref)
{
ReferenceHolder holder = sortedList.getFirst();
- if (holder == null)
+ if (holder == null)
{
return OrderingGroupMonitor.OK;
}
-
+
return holder.isAvailable(ref);
}
@@ -147,12 +148,12 @@
{
if (holder.isPending())
{
- //true if the sortedList has more to offer.
+ // true if the sortedList has more to offer.
result = sortedList.size() > 1;
}
else
{
- //means we still have the first un-sent.
+ // means we still have the first un-sent.
result = true;
}
}
@@ -228,7 +229,7 @@
elem = iter.next();
if (elem.compareTo(ref) <= 0)
{
- //should be before this elem
+ // should be before this elem
break;
}
index++;
@@ -278,14 +279,13 @@
class ReferenceHolder implements Comparable<ReferenceHolder>
{
- private static final Logger log = Logger.getLogger(ReferenceHolder.class);
private Long seq;
private MessageReference ref;
-
+
private long refCount;
-
+
private long pendingSentCount;
public ReferenceHolder(MessageReference r) throws JMSException
@@ -344,7 +344,7 @@
}
return refCount;
}
-
+
/**
* decrease the ref count
* here we don't care about pendingSentCount here.
Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -26,7 +26,6 @@
import java.util.Iterator;
import javax.jms.JMSException;
-import javax.jms.TextMessage;
import org.jboss.jms.message.JBossMessage;
import org.jboss.logging.Logger;
Modified: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -41,6 +41,8 @@
* ConnectionConsumer tests
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -550,41 +552,41 @@
}
- class MockServerSessionPool implements ServerSessionPool
+}
+
+class MockServerSession implements ServerSession
+{
+ Session session;
+
+ MockServerSession(Session sess)
{
- private ServerSession serverSession;
-
- MockServerSessionPool(Session sess)
- {
- serverSession = new MockServerSession(sess);
- }
-
- public ServerSession getServerSession() throws JMSException
- {
- return serverSession;
- }
+ this.session = sess;
}
- class MockServerSession implements ServerSession
+
+ public Session getSession() throws JMSException
{
- Session session;
-
- MockServerSession(Session sess)
- {
- this.session = sess;
- }
-
+ return session;
+ }
- public Session getSession() throws JMSException
- {
- return session;
- }
-
- public void start() throws JMSException
- {
- session.run();
- }
-
+ public void start() throws JMSException
+ {
+ session.run();
}
}
+
+class MockServerSessionPool implements ServerSessionPool
+{
+ private ServerSession serverSession;
+
+ MockServerSessionPool(Session sess)
+ {
+ serverSession = new MockServerSession(sess);
+ }
+
+ public ServerSession getServerSession() throws JMSException
+ {
+ return serverSession;
+ }
+}
Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,510 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.jms.tx.ResourceManagerFactory;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
+
+/**
+ * An OrderingGroupAckTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Oct 28, 2008 2:30:18 PM
+ *
+ *
+ */
+public class OrderingGroupAckTest extends JMSTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public OrderingGroupAckTest(String name)
+ {
+ super(name);
+ }
+
+ // TestCase overrides -------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ // set up the ObjectStore dir before super.setUp(); the original note named testMockCoordinatorRecoveryWithJBossTSXids (not in this class), which would otherwise create an invalid ObjectStore
+ ServiceContainer.setupObjectStoreDir();
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ResourceManagerFactory.instance.clear();
+ }
+
+ // Public --------------------------------------------------------
+
+ /*
+ * This test shows how ordering group delivery handles transactions.
+ * A rollback will cause the first message to be re-delivered.
+ * A commit will cause the second message to be available for delivery.
+ */
+ public void testRollbackCommit() throws Exception
+ {
+ QueueConnection conn = null;
+
+ try
+ {
+ conn = cf.createQueueConnection();
+ QueueSession sess = conn.createQueueSession(true, 0);
+ JBossMessageProducer producer = (JBossMessageProducer)sess.createProducer(queue1);
+ producer.enableOrderingGroup(null);
+
+ QueueReceiver cons = sess.createReceiver(queue1);
+
+ conn.start();
+
+ Message m1 = sess.createTextMessage("testing1");
+ Message m2 = sess.createTextMessage("testing2");
+ producer.send(m1);
+ producer.send(m2);
+
+ sess.commit();
+
+ TextMessage mr = (TextMessage)cons.receive(3000);
+ assertNotNull(mr);
+ assertEquals("testing1", mr.getText());
+
+ sess.rollback();
+
+ mr = (TextMessage)cons.receive(3000);
+ assertNotNull(mr);
+ assertEquals("testing1", mr.getText());
+
+ // second message cannot be received
+ // if the first message is not committed.
+ mr = (TextMessage)cons.receive(3000);
+ assertNull(mr);
+
+ sess.commit();
+
+ mr = (TextMessage)cons.receive(3000);
+ assertNotNull(mr);
+ assertEquals("testing2", mr.getText());
+
+ sess.commit();
+
+ checkEmpty(queue1);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ /*
+ * This test shows how an ordering group handles client acknowledge.
+ * The second message will never be sent out unless the first message is acked.
+ */
+ public void testClientAcknowledge() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ JBossMessageProducer producer = (JBossMessageProducer)producerSess.createProducer(queue1);
+ producer.enableOrderingGroup(null);
+
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
+
+ final int NUM_MSG = 100;
+
+ // Send some messages
+ for (int i = 0; i < 100; ++i)
+ {
+ TextMessage tm = producerSess.createTextMessage("ordering" + i);
+ producer.send(tm);
+ }
+
+ assertRemainingMessages(NUM_MSG);
+
+ log.trace("Sent messages");
+
+ int count = 0;
+ while (true)
+ {
+ Message m = consumer.receive(400);
+ if (m == null)
+ break;
+ count++;
+ }
+
+ assertRemainingMessages(NUM_MSG);
+
+ log.trace("Received " + count + " messages");
+
+ // if ordering group, count should be 1.
+ assertEquals(1, count);
+
+ consumerSess.recover();
+
+ assertRemainingMessages(NUM_MSG);
+
+ log.trace("Session recover called");
+
+ TextMessage m = null;
+
+ int i = 0;
+ for (; i < 100; ++i)
+ {
+ m = (TextMessage)consumer.receive();
+ log.trace("Received message " + i);
+ m.acknowledge();
+ assertTrue(m.getText().equals("ordering" + i));
+ }
+
+ assertRemainingMessages(0);
+
+ // make sure I don't receive anything else
+
+ checkEmpty(queue1);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ /*
+ * Send 4 messages, then start an XA transaction to receive them. The next message is received
+ * only after the transaction that received the previous message has been committed.
+ */
+ public void testSimpleXATransactionalReceive() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalReceive");
+
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send the messages to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid1);
+
+ res1.commit(xid1, false);
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm2.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm3.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm4.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /*
+ * Send 4 messages, then start an XA transaction to receive them.
+ * After XA recovery, the remaining messages are delivered without
+ * breaking the original order.
+ */
+ public void testSimpleXATransactionalRecoveryCommitReceive() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalRecoveryCommitReceive");
+
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ XAConnection xconn2 = null;
+
+ try
+ {
+ // First send the messages to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ // non-persistent messages would be lost in a server failure
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRecoveryCommitReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // prepare the tx
+
+ res1.prepare(xid1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ conn1 = null;
+
+ xconn1 = null;
+
+ // Now "crash" the server
+
+ ServerManagement.stopServerPeer();
+
+ ServerManagement.startServerPeer();
+
+ deployAndLookupAdministeredObjects();
+
+ // Now recover
+
+ xconn2 = cf.createXAConnection();
+
+ XASession xsess2 = xconn2.createXASession();
+
+ XAResource res3 = xsess2.getXAResource();
+
+ Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ assertEquals(xid1, xids[0]);
+
+ // Commit the tx
+
+ res3.commit(xids[0], false);
+
+ // The message should be acknowledged
+
+ xconn2.close();
+
+ conn1 = cf.createConnection();
+
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ conn1.start();
+
+ // tm2, tm3, tm4 should be received.
+ TextMessage tmr = (TextMessage)cons1.receive(1000);
+ assertNotNull(tmr);
+ assertEquals(tmr.getText(), tm2.getText());
+
+ tmr = (TextMessage)cons1.receive(1000);
+ assertNotNull(tmr);
+ assertEquals(tmr.getText(), tm3.getText());
+
+ tmr = (TextMessage)cons1.receive(1000);
+ assertNotNull(tmr);
+ assertEquals(tmr.getText(), tm4.getText());
+
+ checkEmpty(queue1);
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,466 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * An OrderingGroupConnectionConsumerTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Oct 29, 2008 2:45:38 PM
+ *
+ *
+ */
+public class OrderingGroupConnectionConsumerTest extends JMSTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public OrderingGroupConnectionConsumerTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ mList.clear();
+ }
+
+ /*
+ * Make sure the ordering group messages are received in order
+ * thru a ConnectionConsumer.
+ */
+ public void testSimpleReceive() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ return;
+
+ Connection consumerConn = null;
+
+ Connection producerConn = null;
+
+ try
+ {
+ consumerConn = cf.createConnection();
+
+ consumerConn.start();
+
+ Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
+
+ sessCons1.setMessageListener(listener);
+
+ ServerSessionPool pool = new OrderingServerSessionPool(sessCons1);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+ null,
+ pool,
+ 5);
+
+ producerConn = cf.createConnection();
+
+ Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+ prod.enableOrderingGroup(null);
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = sessProd.createTextMessage("testing" + i);
+ prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ // wait long enough to allow delivery to complete.
+ msgLatch.attempt(10000);
+
+ // check the order
+ assertEquals(NUM_MESSAGES, mList.size());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i)
+ {
+ TextMessage txm = mList.get(i);
+ assertEquals(txm.getText(), "testing" + i);
+ }
+
+ // allow the consumer thread to shut down gracefully
+ doze(3000);
+
+ cc.close();
+
+ consumerConn.close();
+ consumerConn = null;
+ producerConn.close();
+ producerConn = null;
+ }
+ finally
+ {
+ if (consumerConn != null)
+ consumerConn.close();
+ if (producerConn != null)
+ producerConn.close();
+
+ }
+ }
+
+ /**
+ * @param nt time to sleep, in milliseconds; an interruption is silently ignored
+ */
+ private void doze(long nt)
+ {
+ try
+ {
+ Thread.sleep(nt);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ /*
+ * Make sure the ordering group messages are received in order
+ * thru a ConnectionConsumer in transaction mode.
+ */
+
+ public void testTransactedReceive() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ return;
+
+ Connection consumerConn = null;
+
+ Connection producerConn = null;
+
+ try
+ {
+ consumerConn = cf.createConnection();
+
+ consumerConn.start();
+
+ Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+
+ sessCons1.setMessageListener(listener1);
+
+ ServerSessionPool pool = new MockServerSessionPool(sessCons1);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+ null,
+ pool,
+ 5);
+
+ producerConn = cf.createConnection();
+
+ Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+ prod.enableOrderingGroup(null);
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = sessProd.createTextMessage("testing" + i);
+ prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ // wait long enough to allow delivery to complete.
+ msgLatch.attempt(10000);
+
+ // check the order
+ assertEquals(NUM_MESSAGES, mList.size());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i)
+ {
+ TextMessage txm = mList.get(i);
+ assertEquals(txm.getText(), "testing" + i);
+ }
+
+ doze(3000);
+
+ cc.close();
+
+ consumerConn.close();
+ consumerConn = null;
+ producerConn.close();
+ producerConn = null;
+ }
+ finally
+ {
+ if (consumerConn != null)
+ consumerConn.close();
+ if (producerConn != null)
+ producerConn.close();
+
+ }
+ }
+
+ /*
+ * Make sure the ordering group messages are received in order
+ * thru a ConnectionConsumer in transaction mode, by two listeners.
+ */
+ public void testTransactedConcurrentReceive() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ return;
+
+ Connection consumerConn = null;
+
+ Connection producerConn = null;
+
+ try
+ {
+ consumerConn = cf.createConnection();
+
+ consumerConn.start();
+
+ Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+ Session sessCons2 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+ TxOrderingGroupMessageListener listener2 = new TxOrderingGroupMessageListener(this, sessCons2);
+
+ sessCons1.setMessageListener(listener1);
+ sessCons2.setMessageListener(listener2);
+
+ OrderingServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+ null,
+ pool,
+ 5);
+
+ producerConn = cf.createConnection();
+
+ Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+ prod.enableOrderingGroup(null);
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = sessProd.createTextMessage("testing" + i);
+ prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ // wait long enough to allow delivery to complete.
+ msgLatch.attempt(10000);
+
+ // check the order
+ assertEquals(NUM_MESSAGES, mList.size());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i)
+ {
+ TextMessage txm = mList.get(i);
+ assertEquals(txm.getText(), "testing" + i);
+ }
+
+ doze(3000);
+
+ cc.close();
+
+ Session nSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer nCon = nSess.createConsumer(queue1);
+
+ Message mx = nCon.receive(500);
+ assertNull(mx);
+
+ consumerConn.close();
+ consumerConn = null;
+ producerConn.close();
+ producerConn = null;
+ }
+ finally
+ {
+ if (consumerConn != null)
+ consumerConn.close();
+ if (producerConn != null)
+ producerConn.close();
+
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+ Latch msgLatch = new Latch();
+
+ ArrayList<TextMessage> mList = new ArrayList<TextMessage>();
+
+ final int NUM_MESSAGES = 100;
+
+ // Inner classes -------------------------------------------------
+ class OrderingGroupMessageListener implements MessageListener
+ {
+
+ OrderingGroupConnectionConsumerTest owner;
+
+ OrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest)
+ {
+ owner = theTest;
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ try
+ {
+ owner.addReceived((TextMessage)message);
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ }
+ }
+
+ class TxOrderingGroupMessageListener implements MessageListener
+ {
+
+ OrderingGroupConnectionConsumerTest owner;
+
+ long counter = 0;
+
+ Session sessRef;
+
+ TxOrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest, Session sess)
+ {
+ owner = theTest;
+ sessRef = sess;
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ try
+ {
+ // roll back once for every 5 messages
+ if (counter % 5 == 0)
+ {
+ sessRef.rollback();
+ }
+ else
+ {
+ owner.addReceived((TextMessage)message);
+ sessRef.commit();
+ }
+ counter++;
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ }
+ }
+
+ class OrderingServerSessionPool implements ServerSessionPool
+ {
+ private ServerSession serverSession1;
+
+ private ServerSession serverSession2;
+
+ private long counter;
+
+ OrderingServerSessionPool(Session sess1, Session sess2)
+ {
+ serverSession1 = new MockServerSession(sess1);
+ serverSession2 = new MockServerSession(sess2);
+ counter = 0L;
+ }
+
+ /**
+ * @param sessCons1 the single session to serve; no second session is used
+ */
+ public OrderingServerSessionPool(Session sessCons1)
+ {
+ serverSession1 = new MockServerSession(sessCons1);
+ serverSession2 = null;
+ }
+
+ public synchronized ServerSession getServerSession() throws JMSException
+ {
+ if (serverSession2 == null)
+ {
+ return serverSession1;
+ }
+
+ ServerSession result = serverSession2;
+ if (counter % 2 == 0)
+ {
+ result = serverSession1;
+ }
+ counter++;
+ return result;
+ }
+ }
+
+ /**
+ * Not synchronized because this is called from onMessage(): the next message is not
+ * expected to arrive until processing of the previous one has finished (meaning onMessage()
+ * has returned, in auto-ack mode). NOTE(review): confirm this also holds for the two transacted sessions.
+ */
+ public void addReceived(TextMessage message)
+ {
+ mList.add(message);
+ if (mList.size() == NUM_MESSAGES)
+ {
+ msgLatch.release();
+ }
+ }
+
+}
Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,499 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
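+ * Miscellaneous ordering group tests: interaction with the DLQ and the expiry queue,
+ * switching ordering on and off on a producer, and several ordering groups on one queue.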
+ *
+ * @author HowardGao
+ *
+ * Created Oct 31, 2008 10:44:48 AM
+ *
+ *
+ */
+public class OrderingGroupMiscTest extends JMSTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private HashMap<String, ArrayList<TextMessage>> recvBuffer = new HashMap<String, ArrayList<TextMessage>>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ /**
+ * Creates the test case.
+ *
+ * @param name passed unchanged to the JMSTestCase constructor
+ */
+ public OrderingGroupMiscTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+   /*
+    * Sends 5 messages. The 3rd and 5th are received and recovered MAX_DELIVERIES
+    * times so that they end up in the default DLQ (Queue2); the others (1st, 2nd
+    * and 4th) are received and acknowledged normally.
+    *
+    * The test does nothing when the server is remote.
+    */
+   public void testOrderingWithDLQ() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+
+      final int NUM_MESSAGES = 5;
+
+      final int MAX_DELIVERIES = 8;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+      Connection conn = null;
+
+      try
+      {
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         // NOTE(review): DefaultMaxDeliveryAttempts is changed here but never restored
+         // in the finally block - confirm whether other tests depend on the default.
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultMaxDeliveryAttempts",
+                                       String.valueOf(MAX_DELIVERIES));
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // NOTE(review): the producer is cast to JBossMessageProducer but
+         // enableOrderingGroup() is never called in this test - confirm that is intended.
+         JBossMessageProducer prod = (JBossMessageProducer)sess.createProducer(queue1);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons = sess2.createConsumer(queue1);
+
+         conn.start();
+
+         // first
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals("Message:0", rm1.getText());
+         rm1.acknowledge();
+
+         // second
+         TextMessage rm2 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm2);
+         assertEquals("Message:1", rm2.getText());
+         rm2.acknowledge();
+
+         // third, leaving a hole: never acknowledged, recovered after every delivery
+         for (int i = 0; i < MAX_DELIVERIES; i++)
+         {
+            TextMessage rm3 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm3);
+            assertEquals("Message:2", rm3.getText());
+            sess2.recover();
+         }
+
+         // fourth
+         TextMessage rm4 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm4);
+         assertEquals("Message:3", rm4.getText());
+         rm4.acknowledge();
+
+         // fifth, leaving another hole
+         for (int i = 0; i < MAX_DELIVERIES; i++)
+         {
+            TextMessage rm5 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm5);
+            assertEquals("Message:4", rm5.getText());
+            sess2.recover();
+         }
+
+         TextMessage rmx = (TextMessage)cons.receive(1000);
+         assertNull(rmx);
+
+         // At this point the third and fifth messages have each been delivered
+         // MAX_DELIVERIES times; the other three were acknowledged.
+
+         checkEmpty(queue1);
+
+         // Now should be in default dlq
+         MessageConsumer cons3 = sess.createConsumer(queue2);
+
+         TextMessage dm3 = (TextMessage)cons3.receive(1000);
+         assertNotNull(dm3);
+         assertEquals("Message:2", dm3.getText());
+
+         TextMessage dm5 = (TextMessage)cons3.receive(1000);
+         assertNotNull(dm5);
+         assertEquals("Message:4", dm5.getText());
+      }
+      finally
+      {
+         // Close the connection first and unconditionally, so that a failure while
+         // restoring the server attributes can no longer leak it.
+         try
+         {
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            ServerManagement.setAttribute(serverPeerObjectName,
+                                          "DefaultDLQ",
+                                          "jboss.messaging.destination:service=Queue,name=DLQ");
+
+            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+         }
+      }
+   }
+
+   /*
+    * Sends 5 messages, giving the 3rd and 5th a time to live of 2000 ms, waits
+    * until those two have expired and then checks that the others (1st, 2nd and
+    * 4th) reach the receiver while the expired ones show up in the default
+    * expiry queue.
+    */
+   public void testOrderingWithExpiryQueue() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Connection conn = null;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      try
+      {
+         ServerManagement.deployQueue("DefaultExpiry");
+
+         ServerManagement.deployQueue("TestOrderingQueue");
+
+         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=DefaultExpiry";
+
+         String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestOrderingQueue";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+
+         Queue testQueue = (Queue)ic.lookup("/queue/TestOrderingQueue");
+
+         Queue defaultExpiry = (Queue)ic.lookup("/queue/DefaultExpiry");
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // NOTE(review): the producer is cast to JBossMessageProducer but
+         // enableOrderingGroup() is never called in this test - confirm that is intended.
+         JBossMessageProducer prod = (JBossMessageProducer)sess.createProducer(testQueue);
+
+         conn.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            if (i == 2 || i == 4)
+            {
+               // A time to live of 2000 ms is long enough for the message to reach the
+               // client consumer, so it is not expired on the server side.
+               prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+            }
+            else
+            {
+               prod.send(tm);
+            }
+         }
+
+         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons = sess2.createConsumer(testQueue);
+
+         // The messages should now be sitting in the consumer buffer
+         // Now give them enough time to expire
+         Thread.sleep(2500);
+
+         // this moment, only first message is delivered but waiting for ack.
+         // 3rd and 5th message still in queue, but when they are delivered
+         // they will be expired and won't be received by this consumer.
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm1);
+         assertEquals("Message:0", rm1.getText());
+         rm1.acknowledge();
+
+         TextMessage rm2 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm2);
+         assertEquals("Message:1", rm2.getText());
+         rm2.acknowledge();
+
+         TextMessage rm3 = (TextMessage)cons.receive(1000);
+         assertNotNull(rm3);
+         assertEquals("Message:3", rm3.getText());
+         rm3.acknowledge();
+
+         TextMessage rm4 = (TextMessage)cons.receive(1000);
+         assertNull(rm4);
+
+         // The two expired messages should be in the default expiry queue - let's check
+
+         MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+
+         TextMessage dm1 = (TextMessage)cons3.receive(1000);
+         assertNotNull(dm1);
+         assertEquals("Message:2", dm1.getText());
+
+         TextMessage dm2 = (TextMessage)cons3.receive(1000);
+         assertNotNull(dm2);
+         assertEquals("Message:4", dm2.getText());
+      }
+      finally
+      {
+         // Close the connection first and unconditionally: a failure while restoring
+         // the server state can no longer leak it, and the queues are not undeployed
+         // while this test still has consumers open on them.
+         try
+         {
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            ServerManagement.setAttribute(serverPeerObjectName,
+                                          "DefaultExpiryQueue",
+                                          "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+            ServerManagement.undeployQueue("DefaultExpiry");
+
+            ServerManagement.undeployQueue("TestOrderingQueue");
+         }
+      }
+   }
+
+   /*
+    * Sends 2 plain messages (priorities 6 and 7), then 10 ordering group messages
+    * with priorities cycling through 0..5, then disables ordering and sends 2 more
+    * plain messages with higher priorities (8 and 9). Expects the plain messages
+    * first, highest priority first, followed by the ordering group messages in
+    * the order they were sent.
+    */
+   public void testOrderingGroupOnOff() throws Exception
+   {
+      Connection connection = null;
+
+      try
+      {
+         connection = cf.createConnection();
+
+         Session sendSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         JBossMessageProducer sender = (JBossMessageProducer)sendSession.createProducer(queue1);
+
+         connection.start();
+
+         // plain messages sent before ordering is switched on
+         sender.send(sendSession.createTextMessage("NoOrdering-1"),
+                     DeliveryMode.PERSISTENT,
+                     6,
+                     Message.DEFAULT_TIME_TO_LIVE);
+         sender.send(sendSession.createTextMessage("NoOrdering-2"),
+                     DeliveryMode.PERSISTENT,
+                     7,
+                     Message.DEFAULT_TIME_TO_LIVE);
+
+         sender.enableOrderingGroup(null);
+         // ordering group messages, priorities cycling through 0..5
+         for (int seq = 0; seq < 10; seq++)
+         {
+            TextMessage ordered = sendSession.createTextMessage("Ordering" + seq);
+            sender.send(ordered, DeliveryMode.PERSISTENT, seq % 6, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         sender.disableOrderingGroup();
+
+         // plain messages sent after ordering is switched off again
+         sender.send(sendSession.createTextMessage("NoOrdering-3"),
+                     DeliveryMode.PERSISTENT,
+                     8,
+                     Message.DEFAULT_TIME_TO_LIVE);
+         sender.send(sendSession.createTextMessage("NoOrdering-4"),
+                     DeliveryMode.PERSISTENT,
+                     9,
+                     Message.DEFAULT_TIME_TO_LIVE);
+
+         Session receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer receiver = receiveSession.createConsumer(queue1);
+
+         // the plain messages come first, highest priority first
+         final String[] expectedPlain = {"NoOrdering-4", "NoOrdering-3", "NoOrdering-2", "NoOrdering-1"};
+         for (int k = 0; k < expectedPlain.length; k++)
+         {
+            TextMessage plain = (TextMessage)receiver.receive(1000);
+            assertNotNull(plain);
+            assertEquals(expectedPlain[k], plain.getText());
+         }
+
+         // then the ordering group, strictly in send order
+         for (int seq = 0; seq < 10; seq++)
+         {
+            TextMessage ordered = (TextMessage)receiver.receive(1000);
+            assertNotNull(ordered);
+            assertEquals("Ordering" + seq, ordered.getText());
+            ordered.acknowledge();
+         }
+
+         assertNull(receiver.receive(1000));
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            connection.close();
+         }
+      }
+   }
+
+   /*
+    * Creates 10 producers with ordering enabled, each sending 100 messages with
+    * priorities cycling through 0..9, and makes sure the messages of every
+    * producer are received in the order they were sent.
+    */
+   public void testMultipleOrderingGroups() throws Exception
+   {
+
+      final int NUM_PRODUCERS = 10;
+      final int NUM_MSG = 100;
+      JBossMessageProducer[] groupProducers = new JBossMessageProducer[NUM_PRODUCERS];
+      Connection connection = null;
+
+      try
+      {
+         connection = cf.createConnection();
+
+         Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         for (int p = 0; p < NUM_PRODUCERS; p++)
+         {
+            JBossMessageProducer producer = (JBossMessageProducer)sendSession.createProducer(queue1);
+            groupProducers[p] = producer;
+            producer.enableOrderingGroup(null);
+         }
+
+         Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer receiver = receiveSession.createConsumer(queue1);
+         connection.start();
+
+         // Send some messages; the text is "<group key>:<sequence number>"
+         for (int p = 0; p < NUM_PRODUCERS; p++)
+         {
+            final String groupKey = prepareReceivingBuffer(p);
+            for (int seq = 0; seq < NUM_MSG; seq++)
+            {
+               TextMessage outgoing = sendSession.createTextMessage(groupKey + ":" + seq);
+               groupProducers[p].send(outgoing, DeliveryMode.PERSISTENT, seq % 10, Message.DEFAULT_TIME_TO_LIVE);
+            }
+         }
+
+         assertEquals(NUM_PRODUCERS, recvBuffer.size());
+
+         log.trace("Sent messages");
+
+         // drain the queue until a receive times out, sorting messages by group
+         for (TextMessage incoming = (TextMessage)receiver.receive(1000);
+              incoming != null;
+              incoming = (TextMessage)receiver.receive(1000))
+         {
+            putToBuffer(incoming);
+         }
+
+         // every group must be complete and in send order
+         for (int p = 0; p < NUM_PRODUCERS; p++)
+         {
+            final String groupKey = "ordering-" + p;
+            ArrayList<TextMessage> received = recvBuffer.get(groupKey);
+            assertEquals(NUM_MSG, received.size());
+            for (int seq = 0; seq < NUM_MSG; seq++)
+            {
+               assertEquals(groupKey + ":" + seq, received.get(seq).getText());
+            }
+         }
+
+         // make sure I don't receive anything else
+
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            connection.close();
+         }
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+   /**
+    * Files a received message under its ordering group in recvBuffer. The group
+    * key is the part of the message text before the first ':'.
+    * <p>
+    * Fails the test with a descriptive message, rather than with a bare
+    * NullPointerException or ArrayIndexOutOfBoundsException, when the message has
+    * no text or belongs to a group that was never registered through
+    * prepareReceivingBuffer().
+    *
+    * @param rm the received message, whose text is expected to look like "key:index"
+    * @throws JMSException if the message text cannot be read
+    */
+   private void putToBuffer(TextMessage rm) throws JMSException
+   {
+      String text = rm.getText();
+      assertNotNull("received a TextMessage without text", text);
+
+      // indexOf instead of split(":")[0]: split returns an empty array for ":"
+      int separator = text.indexOf(':');
+      String key = separator < 0 ? text : text.substring(0, separator);
+
+      ArrayList<TextMessage> group = recvBuffer.get(key);
+      assertNotNull("received message '" + text + "' for unknown ordering group '" + key + "'", group);
+      group.add(rm);
+   }
+
+ /**
+ * initialize a buffer for receiving ordering group messages.
+ * @param i
+ */
+ private String prepareReceivingBuffer(int i)
+ {
+ String key = "ordering-" + i;
+ ArrayList<TextMessage> grpBuffer = recvBuffer.get(key);
+ if (grpBuffer == null)
+ {
+ grpBuffer = new ArrayList<TextMessage>();
+ recvBuffer.put(key, grpBuffer);
+ }
+ return key;
+ }
+
+ // Inner classes -------------------------------------------------
+}
Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,427 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
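+ * Ordering group tests in a cluster: queue merging after a node is killed,
+ * client-side failover, and messages consumed on a node other than the one they were sent to.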
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Nov 4, 2008 10:30:22 AM
+ *
+ *
+ */
+public class OrderingGroupBasicClusteringTest extends ClusteringTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private String queueName = "testDistributedQueue";
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ /**
+ * Creates the test case.
+ *
+ * @param name passed unchanged to the ClusteringTestBase constructor
+ */
+ public OrderingGroupBasicClusteringTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+   /*
+    * Make sure the ordering is observed when queue merging happens: messages
+    * 0-9 are sent as an ordering group on node 0 and the first five consumed,
+    * messages 10-19 are sent on node 1, node 1 is killed, and a new consumer on
+    * node 0 must then see messages 5-19 in order.
+    */
+   public void testOrderingGroupMergeQueue() throws Exception
+   {
+      Connection node0Conn = null;
+      Connection node1Conn = null;
+
+      try
+      {
+         // Objects Server0
+         node0Conn = createConnectionOnServer(cf, 0);
+
+         assertEquals(0, getServerId(node0Conn));
+
+         Session txSession = node0Conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         node0Conn.start();
+
+         JBossMessageProducer orderedProducer = (JBossMessageProducer)txSession.createProducer(queue[0]);
+         orderedProducer.enableOrderingGroup(null);
+
+         MessageConsumer node0Consumer = txSession.createConsumer(queue[0]);
+
+         for (int n = 0; n < 10; n++)
+         {
+            TextMessage outgoing = txSession.createTextMessage("message " + n);
+            orderedProducer.send(outgoing, DeliveryMode.PERSISTENT, n % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         txSession.commit();
+
+         // consume the first half, committing after every receive
+         for (int n = 0; n < 5; n++)
+         {
+            TextMessage received = (TextMessage)node0Consumer.receive(5000);
+            txSession.commit();
+            assertNotNull(received);
+            log.info("msg = " + received.getText());
+            assertEquals("message " + n, received.getText());
+         }
+
+         log.info("****Closing consumer");
+         node0Consumer.close();
+
+         // Objects Server1
+         node1Conn = createConnectionOnServer(cf, 1);
+
+         assertEquals(1, getServerId(node1Conn));
+
+         node1Conn.start();
+
+         Session node1Session = node1Conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer node1Producer = node1Session.createProducer(queue[1]);
+
+         node1Producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         // the message objects are created by the node 0 session but sent through node 1
+         for (int n = 10; n < 20; n++)
+         {
+            node1Producer.send(txSession.createTextMessage("message " + n));
+         }
+
+         // At this point there should be 5 messages on the node 0 queue (5-9)
+         // and 10 messages on the node 1 queue (10-19)
+
+         ServerManagement.kill(1);
+
+         node0Consumer = txSession.createConsumer(queue[0]);
+
+         for (int n = 5; n < 20; n++)
+         {
+            TextMessage received = (TextMessage)node0Consumer.receive(5000);
+            assertNotNull(received);
+            txSession.commit();
+            log.info("msg = " + received.getText());
+            assertEquals("message " + n, received.getText());
+         }
+
+         assertNull(node0Consumer.receive(5000));
+      }
+      finally
+      {
+         if (node0Conn != null)
+         {
+            node0Conn.close();
+         }
+
+         if (node1Conn != null)
+         {
+            node1Conn.close();
+         }
+      }
+   }
+
+ /*
+ * Make sure the ordering group works when failover happens, consuming with a
+ * non-transacted session: runs testKillFailoverNode(false).
+ */
+ public void testOrderingKillFailoverNode() throws Exception
+ {
+ testKillFailoverNode(false);
+ }
+
+ /*
+ * Make sure the ordering group works when failover happens, consuming with a
+ * transacted session: runs testKillFailoverNode(true).
+ */
+ public void testOrderingKillFailoverNodeTx() throws Exception
+ {
+ testKillFailoverNode(true);
+ }
+
+ /*
+ * Make sure the ordering group works when message sucking happens, using
+ * persistent messages: runs testMessageReceiveOnSuck(true).
+ */
+ public void testOnSuckPersistent() throws Exception
+ {
+ testMessageReceiveOnSuck(true);
+ }
+
+ /*
+ * Make sure the ordering group works when message sucking happens, using
+ * non-persistent messages: runs testMessageReceiveOnSuck(false).
+ */
+ public void testOnSuckNonPersistent() throws Exception
+ {
+ testMessageReceiveOnSuck(false);
+ }
+
+ private void testMessageReceiveOnSuck(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn2.start();
+
+ // Send at node 0
+ JBossMessageProducer prod0 = (JBossMessageProducer)sess0.createProducer(queue[0]);
+ prod0.enableOrderingGroup(null);
+
+ int delMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ for (int i = 0; i < 100; i++)
+ {
+ TextMessage tms = sess0.createTextMessage("suckedmessage" + i);
+ prod0.send(tms, delMode, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ for (int j = 0; j < 100; j++)
+ {
+ TextMessage rms = (TextMessage)cons2.receive(1000);
+ assertNotNull(rms);
+ assertEquals("suckedmessage" + j, rms.getText());
+ }
+
+ TextMessage nulMsg = (TextMessage)cons2.receive(5000);
+ assertNull(nulMsg);
+
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+   /**
+    * Sends 10 ordering group messages to queue[1] on node 1, consumes and
+    * acknowledges the first five, receives one more without acknowledging it,
+    * then kills node 1's failover node followed by node 1 itself. After the
+    * client has failed over, the pending message is acknowledged and the
+    * remaining messages must still be received in order.
+    *
+    * @param transactional true to consume with a transacted session (commit after each
+    *                      message), false to use CLIENT_ACKNOWLEDGE (acknowledge each message)
+    */
+   private void testKillFailoverNode(boolean transactional) throws Exception
+   {
+      JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      Connection conn1 = createConnectionOnServer(factory, 1);
+
+      try
+      {
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+
+         Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod1 = (JBossMessageProducer)sessSend.createProducer(queue[1]);
+         prod1.enableOrderingGroup(null);
+
+         final int numMessages = 10;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            prod1.send(tm, DeliveryMode.PERSISTENT, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+
+            log.info("Sent " + tm.getJMSMessageID());
+         }
+
+         Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED
+                                                                          : Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+         conn1.start();
+
+         TextMessage tm = null;
+
+         // consume and acknowledge the first half
+         for (int i = 0; i < numMessages / 2; i++)
+         {
+            tm = (TextMessage)cons1.receive(2000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+            if (transactional)
+            {
+               sess1.commit();
+            }
+            else
+            {
+               tm.acknowledge();
+            }
+         }
+
+         // Don't ack
+         tm = (TextMessage)cons1.receive(2000);
+         assertNotNull(tm);
+         // assertEquals("message5", tm.getText());
+
+         // We kill the failover node for node 1
+         int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+         int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+
+         log.info("Killing failover node:" + failoverNodeId);
+
+         ServerManagement.kill(failoverNodeId);
+
+         log.info("Killed failover node");
+
+         Thread.sleep(8000);
+
+         // Now kill node 1
+
+         failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+
+         log.info("Failover node id is now " + failoverNodeId);
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+
+         // keep polling until FAILOVER_COMPLETED arrives; a 30 s wait without any event fails the test
+         FailoverEvent event;
+         do
+         {
+            event = failoverListener.getEvent(30000);
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         while (FailoverEvent.FAILOVER_COMPLETED != event.getType());
+
+         log.info("Failover completed");
+
+         assertEquals(failoverNodeId, getServerId(conn1));
+
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+
+         // nothing more is expected while the sixth message is still unacknowledged
+         assertNull(cons1.receive(2000));
+
+         // Now ack
+         if (transactional)
+         {
+            sess1.commit();
+         }
+         else
+         {
+            tm.acknowledge();
+         }
+
+         log.info("acked");
+
+         sess1.close();
+
+         log.info("closed");
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         log.info("created new session");
+
+         cons1 = sess1.createConsumer(queue[1]);
+
+         log.info("Created consumer");
+
+         // the remaining messages should be received.
+         for (int i = numMessages / 2 + 1; i < numMessages; i++)
+         {
+            tm = (TextMessage)cons1.receive(2000);
+            assertNotNull(tm);
+            assertEquals("message" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java 2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,286 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.message;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.jms.JMSTestCase;
+
+/**
+ * Tests that messages sent through a producer with ordering group enabled are
+ * delivered strictly in the order they were sent, regardless of the JMS
+ * priority each individual message was sent with.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public class JMSOrderingGroupPropertyTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   /**
+    * Bodies of the test messages, listed in the order they are sent and
+    * therefore the order in which they must be received.
+    * NOTE: testMessageOrderWithConsumerBuffering uses the array index as the
+    * JMS priority, so this array must not grow beyond 10 entries (priorities 0-9).
+    */
+   private static final String[] BODIES = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
+
+   /** Timeout in milliseconds when a message is expected to arrive. */
+   private static final long RECEIVE_TIMEOUT = 1000;
+
+   /** Timeout in milliseconds when verifying that no further message arrives. */
+   private static final long NO_MESSAGE_TIMEOUT = 500;
+
+   /** Time in milliseconds allowed for all sent messages to reach the server. */
+   private static final long SETTLE_TIME = 2000;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param name the test name, passed through to the base test case
+    */
+   public JMSOrderingGroupPropertyTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Tests the order of messages under the control of an ordering group
+    * message producer. All messages sent through an ordering group message
+    * producer obey a strict ordering rule -- they are delivered in the order
+    * they are sent, regardless of their priorities.
+    *
+    * Every message is sent with a random priority in the range 0-9 and the
+    * consumer is only created after all messages have been sent.
+    */
+   public void testMessageOrderWithPriority() throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         conn.start();
+
+         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessSend.createProducer(queue1);
+
+         // NOTE(review): null presumably asks for a default/generated group name -- confirm
+         prod.enableOrderingGroup(null);
+
+         TextMessage[] messages = createMessages(sessSend);
+
+         // Remember the random priorities so that a failure can be reproduced
+         StringBuilder priorities = new StringBuilder(" (priorities used:");
+
+         Random rdm = new Random();
+         for (int i = 0; i < messages.length; i++)
+         {
+            int pri = rdm.nextInt(10);
+            priorities.append(' ').append(pri);
+            prod.send(messages[i], DeliveryMode.NON_PERSISTENT, pri, 0);
+         }
+         priorities.append(')');
+
+         // NP messages are sent async so we need to allow them time to all hit the server
+         Thread.sleep(SETTLE_TIME);
+
+         Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessReceive.createConsumer(queue1);
+
+         assertReceivedInSendOrder(cons, priorities.toString());
+
+         cons.close();
+      }
+      finally
+      {
+         // Always release the connection, even when an assertion above fails
+         conn.close();
+      }
+   }
+
+   /**
+    * If messages are sent to a queue with certain priorities and a consumer
+    * is already open, then it is likely that they will be immediately sent to
+    * the consumer. Even in this case the ordering group must be strictly obeyed.
+    *
+    * The messages are sent with ascending priorities 0-9, so a purely
+    * priority-driven delivery would hand them over in reverse order.
+    */
+   public void testMessageOrderWithConsumerBuffering() throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         conn.start();
+
+         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessSend.createProducer(queue1);
+
+         // NOTE(review): null presumably asks for a default/generated group name -- confirm
+         prod.enableOrderingGroup(null);
+
+         TextMessage[] messages = createMessages(sessSend);
+
+         // The consumer is opened before anything is sent
+         Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessReceive.createConsumer(queue1);
+
+         // The i-th message is sent with priority i
+         for (int i = 0; i < messages.length; i++)
+         {
+            prod.send(messages[i], DeliveryMode.NON_PERSISTENT, i, 0);
+         }
+
+         // Let them all get there
+         Thread.sleep(SETTLE_TIME);
+
+         assertReceivedInSendOrder(cons, "");
+
+         cons.close();
+      }
+      finally
+      {
+         // Always release the connection, even when an assertion above fails
+         conn.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Creates one text message per entry of BODIES, in the same order.
+    *
+    * @param session the session used to create the messages
+    * @return the created messages; element i carries the body BODIES[i]
+    */
+   private TextMessage[] createMessages(Session session) throws Exception
+   {
+      TextMessage[] messages = new TextMessage[BODIES.length];
+      for (int i = 0; i < BODIES.length; i++)
+      {
+         messages[i] = session.createTextMessage(BODIES[i]);
+      }
+      return messages;
+   }
+
+   /**
+    * Asserts that the consumer receives exactly the messages of BODIES, in
+    * that order, and that no further message follows.
+    *
+    * @param cons the consumer to receive from
+    * @param context extra text appended to every failure message, may be empty
+    */
+   private void assertReceivedInSendOrder(MessageConsumer cons, String context) throws Exception
+   {
+      for (int i = 0; i < BODIES.length; i++)
+      {
+         TextMessage t = (TextMessage)cons.receive(RECEIVE_TIMEOUT);
+         assertNotNull("message " + i + " was not received" + context, t);
+         assertEquals("wrong message at position " + i + context, BODIES[i], t.getText());
+      }
+
+      assertNull("unexpected extra message" + context, cons.receive(NO_MESSAGE_TIMEOUT));
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list