[jboss-cvs] JBoss Messaging SVN: r5335 - in branches/Branch_1416_merge: tests/src/org/jboss/test/messaging/jms and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 11 06:24:20 EST 2008


Author: gaohoward
Date: 2008-11-11 06:24:20 -0500 (Tue, 11 Nov 2008)
New Revision: 5335

Added:
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java
Modified:
   branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
   branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1416 code cleanup


Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -46,8 +46,9 @@
 
    // Attributes ----------------------------------------------------
    private OrderingList<ReferenceHolder> sortedList = new OrderingList<ReferenceHolder>();
+
    private HashMap<Long, ReferenceHolder> refMap = new HashMap<Long, ReferenceHolder>();
-   
+
    private String groupName;
 
    // Static --------------------------------------------------------
@@ -58,7 +59,7 @@
    {
       groupName = name;
    }
-   
+
    private OrderingGroup()
    {
    }
@@ -76,8 +77,8 @@
       ReferenceHolder holder = refMap.get(mid);
       if (holder != null)
       {
-         //is there a case where the ref can be registered more than once?
-         //if not, we simply remove the addRef().
+         // is there a case where the ref can be registered more than once?
+         // if not, we simply remove the addRef().
          holder.addRef();
          return true;
       }
@@ -105,11 +106,11 @@
    public int isAvailable(MessageReference ref)
    {
       ReferenceHolder holder = sortedList.getFirst();
-      if (holder == null) 
+      if (holder == null)
       {
          return OrderingGroupMonitor.OK;
       }
-      
+
       return holder.isAvailable(ref);
    }
 
@@ -147,12 +148,12 @@
       {
          if (holder.isPending())
          {
-            //true if the sortedList has more to offer.
+            // true if the sortedList has more to offer.
             result = sortedList.size() > 1;
          }
          else
          {
-            //means we still have the first un-sent.
+            // means we still have the first un-sent.
             result = true;
          }
       }
@@ -228,7 +229,7 @@
          elem = iter.next();
          if (elem.compareTo(ref) <= 0)
          {
-            //should be before this elem
+            // should be before this elem
             break;
          }
          index++;
@@ -278,14 +279,13 @@
 
 class ReferenceHolder implements Comparable<ReferenceHolder>
 {
-   private static final Logger log = Logger.getLogger(ReferenceHolder.class);
 
    private Long seq;
 
    private MessageReference ref;
-   
+
    private long refCount;
-   
+
    private long pendingSentCount;
 
    public ReferenceHolder(MessageReference r) throws JMSException
@@ -344,7 +344,7 @@
       }
       return refCount;
    }
-   
+
    /**
     * decrease the ref count
     * here we don't care about pendingSentCount here.

Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -26,7 +26,6 @@
 import java.util.Iterator;
 
 import javax.jms.JMSException;
-import javax.jms.TextMessage;
 
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.logging.Logger;

Modified: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-11-11 11:05:11 UTC (rev 5334)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -41,6 +41,8 @@
  * ConnectionConsumer tests
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -550,41 +552,41 @@
    }
    
    
-   class MockServerSessionPool implements ServerSessionPool
+}
+
+class MockServerSession implements ServerSession
+{
+   Session session;
+   
+   MockServerSession(Session sess)
    {
-      private ServerSession serverSession;
-      
-      MockServerSessionPool(Session sess)
-      {
-         serverSession = new MockServerSession(sess);
-      }
-
-      public ServerSession getServerSession() throws JMSException
-      {
-         return serverSession;
-      }      
+      this.session = sess;
    }
    
-   class MockServerSession implements ServerSession
+
+   public Session getSession() throws JMSException
    {
-      Session session;
-      
-      MockServerSession(Session sess)
-      {
-         this.session = sess;
-      }
-      
+      return session;
+   }
 
-      public Session getSession() throws JMSException
-      {
-         return session;
-      }
-
-      public void start() throws JMSException
-      {
-         session.run();
-      }
-      
+   public void start() throws JMSException
+   {
+      session.run();
    }
    
 }
+
+class MockServerSessionPool implements ServerSessionPool
+{
+   private ServerSession serverSession;
+   
+   MockServerSessionPool(Session sess)
+   {
+      serverSession = new MockServerSession(sess);
+   }
+
+   public ServerSession getServerSession() throws JMSException
+   {
+      return serverSession;
+   }      
+}

Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	                        (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,510 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.jms.tx.ResourceManagerFactory;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
+
+/**
+ * A OrderingGroupAckTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Oct 28, 2008 2:30:18 PM
+ *
+ *
+ */
+public class OrderingGroupAckTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupAckTest(String name)
+   {
+      super(name);
+   }
+
+   // TestCase overrides -------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      // if this is not set testMockCoordinatorRecoveryWithJBossTSXids will create an invalid ObjectStore
+      ServiceContainer.setupObjectStoreDir();
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+
+      ResourceManagerFactory.instance.clear();
+   }
+
+   // Public --------------------------------------------------------
+
+   /*
+    * This test shows how ordering group delivery handles transactions.
+    * A rollback will cause the first message to be re-delivered.
+    * A commit will cause the second message to be available for delivery.
+    */
+   public void testRollbackCommit() throws Exception
+   {
+      QueueConnection conn = null;
+
+      try
+      {
+         conn = cf.createQueueConnection();
+         QueueSession sess = conn.createQueueSession(true, 0);
+         JBossMessageProducer producer = (JBossMessageProducer)sess.createProducer(queue1);
+         producer.enableOrderingGroup(null);
+
+         QueueReceiver cons = sess.createReceiver(queue1);
+
+         conn.start();
+
+         Message m1 = sess.createTextMessage("testing1");
+         Message m2 = sess.createTextMessage("testing2");
+         producer.send(m1);
+         producer.send(m2);
+
+         sess.commit();
+
+         TextMessage mr = (TextMessage)cons.receive(3000);
+         assertNotNull(mr);
+         assertEquals("testing1", mr.getText());
+
+         sess.rollback();
+
+         mr = (TextMessage)cons.receive(3000);
+         assertNotNull(mr);
+         assertEquals("testing1", mr.getText());
+
+         // second message cannot be received
+         // if the first message is not committed.
+         mr = (TextMessage)cons.receive(3000);
+         assertNull(mr);
+
+         sess.commit();
+
+         mr = (TextMessage)cons.receive(3000);
+         assertNotNull(mr);
+         assertEquals("testing2", mr.getText());
+
+         sess.commit();
+
+         checkEmpty(queue1);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * This test shows how ordering group handles client acknowledge.
+    * the second message will never be sent out unless the first message is acked.
+    */
+   public void testClientAcknowledge() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         JBossMessageProducer producer = (JBossMessageProducer)producerSess.createProducer(queue1);
+         producer.enableOrderingGroup(null);
+
+         Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSess.createConsumer(queue1);
+         conn.start();
+
+         final int NUM_MSG = 100;
+
+         // Send some messages
+         for (int i = 0; i < 100; ++i)
+         {
+            TextMessage tm = producerSess.createTextMessage("ordering" + i);
+            producer.send(tm);
+         }
+
+         assertRemainingMessages(NUM_MSG);
+
+         log.trace("Sent messages");
+
+         int count = 0;
+         while (true)
+         {
+            Message m = consumer.receive(400);
+            if (m == null)
+               break;
+            count++;
+         }
+
+         assertRemainingMessages(NUM_MSG);
+
+         log.trace("Received " + count + " messages");
+
+         // if ordering group, count should be 1.
+         assertEquals(1, count);
+
+         consumerSess.recover();
+
+         assertRemainingMessages(NUM_MSG);
+
+         log.trace("Session recover called");
+
+         TextMessage m = null;
+
+         int i = 0;
+         for (; i < 100; ++i)
+         {
+            m = (TextMessage)consumer.receive();
+            log.trace("Received message " + i);
+            m.acknowledge();
+            assertTrue(m.getText().equals("ordering" + i));
+         }
+
+         assertRemainingMessages(0);
+
+         // make sure I don't receive anything else
+
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * send 4 messages, start a XA transaction to receive, messages will be received only if
+    * the last message is committed. 
+    */
+   public void testSimpleXATransactionalReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // prepare the tx
+         res1.prepare(xid1);
+
+         res1.commit(xid1, false);
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm2.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm3.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm4.getText());
+
+         checkEmpty(queue1);
+
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+
+   /*
+    * send 4 messages, start a XA transaction to receive. 
+    * In XA recovery, the messages will be re-sent without
+    * breaking the original order
+    */
+   public void testSimpleXATransactionalRecoveryCommitReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRecoveryCommitReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      XAConnection xconn2 = null;
+
+      try
+      {
+         // First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         // non-persistent will cause message lost in server failure
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRecoveryCommitReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // prepare the tx
+
+         res1.prepare(xid1);
+
+         conn1.close();
+
+         xconn1.close();
+
+         conn1 = null;
+
+         xconn1 = null;
+
+         // Now "crash" the server
+
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         // Now recover
+
+         xconn2 = cf.createXAConnection();
+
+         XASession xsess2 = xconn2.createXASession();
+
+         XAResource res3 = xsess2.getXAResource();
+
+         Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         // Commit the tx
+
+         res3.commit(xids[0], false);
+
+         // The message should be acknowldged
+
+         xconn2.close();
+
+         conn1 = cf.createConnection();
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         conn1.start();
+
+         // tm2, tm3, tm4 should be received.
+         TextMessage tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm2.getText());
+
+         tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm3.getText());
+
+         tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm4.getText());
+
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	                        (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,466 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A OrderingGroupConnectionConsumerTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Oct 29, 2008 2:45:38 PM
+ *
+ *
+ */
+public class OrderingGroupConnectionConsumerTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupConnectionConsumerTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+      mList.clear();
+   }
+
+   /*
+    * Make sure the ordering group messages are received in order
+    * thru a ConnectionConsumer.
+    */
+   public void testSimpleReceive() throws Exception
+   {
+      if (ServerManagement.isRemote())
+         return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
+
+         sessCons1.setMessageListener(listener);
+
+         ServerSessionPool pool = new OrderingServerSessionPool(sessCons1);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+                                                                                                     null,
+                                                                                                     pool,
+                                                                                                     5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         // waiting enough time to allow delivery complete.
+         msgLatch.attempt(10000);
+
+         // check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         // allow consumer thread gracefully shutdown
+         doze(3000);
+
+         cc.close();
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null)
+            consumerConn.close();
+         if (producerConn != null)
+            producerConn.close();
+
+      }
+   }
+
+   /**
+    * @param i
+    */
+   private void doze(long nt)
+   {
+      try
+      {
+         Thread.sleep(nt);
+      }
+      catch (InterruptedException e)
+      {
+      }
+   }
+
+   /*
+    * Make sure the ordering group messages are received in order
+    * thru a ConnectionConsumer in transaction mode.
+    */
+
+   public void testTransactedReceive() throws Exception
+   {
+      if (ServerManagement.isRemote())
+         return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+         TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+
+         sessCons1.setMessageListener(listener1);
+
+         ServerSessionPool pool = new MockServerSessionPool(sessCons1);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+                                                                                                     null,
+                                                                                                     pool,
+                                                                                                     5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         // waiting enough time to allow delivery complete.
+         msgLatch.attempt(10000);
+
+         // check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         doze(3000);
+
+         cc.close();
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null)
+            consumerConn.close();
+         if (producerConn != null)
+            producerConn.close();
+
+      }
+   }
+
+   /*
+    * Make sure the ordering group messages are received in order
+    * thru a ConnectionConsumer in transaction mode, by two listeners.
+    */
+   public void testTransactedConcurrentReceive() throws Exception
+   {
+      if (ServerManagement.isRemote())
+         return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+         Session sessCons2 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+         TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+         TxOrderingGroupMessageListener listener2 = new TxOrderingGroupMessageListener(this, sessCons2);
+
+         sessCons1.setMessageListener(listener1);
+         sessCons2.setMessageListener(listener2);
+
+         OrderingServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1,
+                                                                                                     null,
+                                                                                                     pool,
+                                                                                                     5);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         // waiting enough time to allow delivery complete.
+         msgLatch.attempt(10000);
+
+         // check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         doze(3000);
+
+         cc.close();
+
+         Session nSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer nCon = nSess.createConsumer(queue1);
+
+         Message mx = nCon.receive(500);
+         assertNull(mx);
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null)
+            consumerConn.close();
+         if (producerConn != null)
+            producerConn.close();
+
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   Latch msgLatch = new Latch();
+
+   ArrayList<TextMessage> mList = new ArrayList<TextMessage>();
+
+   final int NUM_MESSAGES = 100;
+
+   // Inner classes -------------------------------------------------
+   class OrderingGroupMessageListener implements MessageListener
+   {
+
+      OrderingGroupConnectionConsumerTest owner;
+
+      OrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest)
+      {
+         owner = theTest;
+      }
+
+      public synchronized void onMessage(Message message)
+      {
+         try
+         {
+            owner.addReceived((TextMessage)message);
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
+   }
+
+   class TxOrderingGroupMessageListener implements MessageListener
+   {
+
+      OrderingGroupConnectionConsumerTest owner;
+
+      long counter = 0;
+
+      Session sessRef;
+
+      TxOrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest, Session sess)
+      {
+         owner = theTest;
+         sessRef = sess;
+      }
+
+      public synchronized void onMessage(Message message)
+      {
+         try
+         {
+            // roll back once for every 5 messages
+            if (counter % 5 == 0)
+            {
+               sessRef.rollback();
+            }
+            else
+            {
+               owner.addReceived((TextMessage)message);
+               sessRef.commit();
+            }
+            counter++;
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
+   }
+
+   class OrderingServerSessionPool implements ServerSessionPool
+   {
+      private ServerSession serverSession1;
+
+      private ServerSession serverSession2;
+
+      private long counter;
+
+      OrderingServerSessionPool(Session sess1, Session sess2)
+      {
+         serverSession1 = new MockServerSession(sess1);
+         serverSession2 = new MockServerSession(sess2);
+         counter = 0L;
+      }
+
+      /**
+       * @param sessCons1
+       */
+      public OrderingServerSessionPool(Session sessCons1)
+      {
+         serverSession1 = new MockServerSession(sessCons1);
+         serverSession2 = null;
+      }
+
+      public synchronized ServerSession getServerSession() throws JMSException
+      {
+         if (serverSession2 == null)
+         {
+            return serverSession1;
+         }
+
+         ServerSession result = serverSession2;
+         if (counter % 2 == 0)
+         {
+            result = serverSession1;
+         }
+         counter++;
+         return result;
+      }
+   }
+
+   /**
+    * here we do not synchronize because this is called from onMessage().
+    * It is guaranteed that next message won't arrive until the previous 
+    * message processing is finished (meaning onMessage() returns in auto-ack mode)
+    */
+   public void addReceived(TextMessage message)
+   {
+      mList.add(message);
+      if (mList.size() == NUM_MESSAGES)
+      {
+         msgLatch.release();
+      }
+   }
+
+}

Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	                        (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,499 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A OrderingGroupMiscTest
+ *
+ * @author HowardGao
+ * 
+ * Created Oct 31, 2008 10:44:48 AM
+ *
+ *
+ */
+public class OrderingGroupMiscTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private HashMap<String, ArrayList<TextMessage>> recvBuffer = new HashMap<String, ArrayList<TextMessage>>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupMiscTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   /*
+    * Sending 5 messages and letting the 3rd and 5th messages go to dlq and 
+    * the others (1st, 2nd and 4th) should go to the receiver
+    */
+   public void testOrderingWithDLQ() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+
+      final int NUM_MESSAGES = 5;
+
+      final int MAX_DELIVERIES = 8;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+      Connection conn = null;
+
+      try
+      {
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultMaxDeliveryAttempts",
+                                       String.valueOf(MAX_DELIVERIES));
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         conn = cf.createConnection();
+
+         {
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            JBossMessageProducer prod = (JBossMessageProducer)sess.createProducer(queue1);
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+
+               prod.send(tm);
+            }
+
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            MessageConsumer cons = sess2.createConsumer(queue1);
+
+            conn.start();
+
+            // first
+            TextMessage rm1 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm1);
+            assertEquals("Message:0", rm1.getText());
+            rm1.acknowledge();
+
+            // second
+            TextMessage rm2 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm2);
+            assertEquals("Message:1", rm2.getText());
+            rm2.acknowledge();
+
+            // third, leaving a hole
+            for (int i = 0; i < MAX_DELIVERIES; i++)
+            {
+               TextMessage rm3 = (TextMessage)cons.receive(1000);
+               assertNotNull(rm3);
+               assertEquals("Message:2", rm3.getText());
+               sess2.recover();
+            }
+
+            // fourth
+            TextMessage rm4 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm4);
+            assertEquals("Message:3", rm4.getText());
+            rm4.acknowledge();
+
+            // fifth, leaving another hole
+            for (int i = 0; i < MAX_DELIVERIES; i++)
+            {
+               TextMessage rm5 = (TextMessage)cons.receive(1000);
+               assertNotNull(rm5);
+               assertEquals("Message:4", rm5.getText());
+               sess2.recover();
+            }
+
+            TextMessage rmx = (TextMessage)cons.receive(1000);
+            assertNull(rmx);
+
+            // At this point all the messages have been delivered exactly MAX_DELIVERIES times
+
+            checkEmpty(queue1);
+
+            // Now should be in default dlq
+            MessageConsumer cons3 = sess.createConsumer(queue2);
+
+            TextMessage dm3 = (TextMessage)cons3.receive(1000);
+            assertNotNull(dm3);
+            assertEquals("Message:2", dm3.getText());
+
+            TextMessage dm5 = (TextMessage)cons3.receive(1000);
+            assertNotNull(dm5);
+            assertEquals("Message:4", dm5.getText());
+
+            conn.close();
+         }
+      }
+      finally
+      {
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultDLQ",
+                                       "jboss.messaging.destination:service=Queue,name=DLQ");
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * Sending 5 messages and letting the 3rd and 5th messages expire and 
+    * the others (1st, 2nd and 4th) should go to the receiver
+    */
+   public void testOrderingWithExpiryQueue() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Connection conn = null;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      try
+      {
+         ServerManagement.deployQueue("DefaultExpiry");
+
+         ServerManagement.deployQueue("TestOrderingQueue");
+
+         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=DefaultExpiry";
+
+         String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestOrderingQueue";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+
+         Queue testQueue = (Queue)ic.lookup("/queue/TestOrderingQueue");
+
+         Queue defaultExpiry = (Queue)ic.lookup("/queue/DefaultExpiry");
+
+         conn = cf.createConnection();
+
+         {
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            JBossMessageProducer prod = (JBossMessageProducer)sess.createProducer(testQueue);
+
+            conn.start();
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+
+               if (i == 2 || i == 4)
+               {
+                  // Send messages with time to live of 2000 enough time to get to client consumer - so
+                  // they won't be expired on the server side
+                  prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+               }
+               else
+               {
+                  prod.send(tm);
+               }
+            }
+
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            MessageConsumer cons = sess2.createConsumer(testQueue);
+
+            // The messages should now be sitting in the consumer buffer
+            // Now give them enough time to expire
+            Thread.sleep(2500);
+
+            // this moment, only first message is delivered but waiting for ack.
+            // 3rd and 5th message still in queue, but when they are delivered
+            // they will be expired and won't be received by this consumer.
+            TextMessage rm1 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm1);
+            assertEquals("Message:0", rm1.getText());
+            rm1.acknowledge();
+
+            TextMessage rm2 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm2);
+            assertEquals("Message:1", rm2.getText());
+            rm2.acknowledge();
+
+            TextMessage rm3 = (TextMessage)cons.receive(1000);
+            assertNotNull(rm3);
+            assertEquals("Message:3", rm3.getText());
+            rm3.acknowledge();
+
+            TextMessage rm4 = (TextMessage)cons.receive(1000);
+            assertNull(rm4);
+
+            // Message should all be in the default expiry queue - let's check
+
+            MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+
+            TextMessage dm1 = (TextMessage)cons3.receive(1000);
+            assertNotNull(dm1);
+            assertEquals("Message:2", dm1.getText());
+
+            TextMessage dm2 = (TextMessage)cons3.receive(1000);
+            assertNotNull(dm2);
+            assertEquals("Message:4", dm2.getText());
+
+            conn.close();
+         }
+
+      }
+      finally
+      {
+         ServerManagement.setAttribute(serverPeerObjectName,
+                                       "DefaultExpiryQueue",
+                                       "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+         ServerManagement.undeployQueue("DefaultExpiry");
+
+         ServerManagement.undeployQueue("TestOrderingQueue");
+
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * First send 2 normal messages, then send 10 ordering messages with some priority and 
+    * then disable ordering, then send 2 more normal messages with high
+    * priority. Make sure the normal messages are received first
+    * and the ordered messages are received later but ordered.
+    */
+   public void testOrderingGroupOnOff() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         JBossMessageProducer producer = (JBossMessageProducer)producerSess.createProducer(queue1);
+
+         conn.start();
+
+         TextMessage tmNormal1 = producerSess.createTextMessage("NoOrdering-1");
+         producer.send(tmNormal1, DeliveryMode.PERSISTENT, 6, Message.DEFAULT_TIME_TO_LIVE);
+         TextMessage tmNormal2 = producerSess.createTextMessage("NoOrdering-2");
+         producer.send(tmNormal2, DeliveryMode.PERSISTENT, 7, Message.DEFAULT_TIME_TO_LIVE);
+
+         producer.enableOrderingGroup(null);
+         // sending out ordering messages with priorities ranging from 0 to 5;
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = producerSess.createTextMessage("Ordering" + i);
+            producer.send(tm, DeliveryMode.PERSISTENT, i % 6, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         producer.disableOrderingGroup();
+
+         TextMessage tmNormal3 = producerSess.createTextMessage("NoOrdering-3");
+         producer.send(tmNormal3, DeliveryMode.PERSISTENT, 8, Message.DEFAULT_TIME_TO_LIVE);
+         TextMessage tmNormal4 = producerSess.createTextMessage("NoOrdering-4");
+         producer.send(tmNormal4, DeliveryMode.PERSISTENT, 9, Message.DEFAULT_TIME_TO_LIVE);
+
+         Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSess.createConsumer(queue1);
+
+         TextMessage rmNormal = (TextMessage)consumer.receive(1000);
+         assertNotNull(rmNormal);
+         assertEquals("NoOrdering-4", rmNormal.getText());
+
+         rmNormal = (TextMessage)consumer.receive(1000);
+         assertNotNull(rmNormal);
+         assertEquals("NoOrdering-3", rmNormal.getText());
+
+         rmNormal = (TextMessage)consumer.receive(1000);
+         assertNotNull(rmNormal);
+         assertEquals("NoOrdering-2", rmNormal.getText());
+
+         rmNormal = (TextMessage)consumer.receive(1000);
+         assertNotNull(rmNormal);
+         assertEquals("NoOrdering-1", rmNormal.getText());
+
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)consumer.receive(1000);
+            assertNotNull(rm);
+            assertEquals("Ordering" + i, rm.getText());
+            rm.acknowledge();
+         }
+
+         assertNull(consumer.receive(1000));
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * create 10 ordering groups, each sending 100 messages
+    * make sure the order of each group is guaranteed.
+    */
+   public void testMultipleOrderingGroups() throws Exception
+   {
+
+      final int NUM_PRODUCERS = 10;
+      final int NUM_MSG = 100;
+      JBossMessageProducer[] prods = new JBossMessageProducer[NUM_PRODUCERS];
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         for (int i = 0; i < NUM_PRODUCERS; i++)
+         {
+            prods[i] = (JBossMessageProducer)producerSess.createProducer(queue1);
+            prods[i].enableOrderingGroup(null);
+         }
+
+         Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSess.createConsumer(queue1);
+         conn.start();
+
+         // Send some messages
+         for (int i = 0; i < NUM_PRODUCERS; i++)
+         {
+            String key = prepareReceivingBuffer(i);
+            for (int j = 0; j < NUM_MSG; j++)
+            {
+               TextMessage tm = producerSess.createTextMessage(key + ":" + j);
+               prods[i].send(tm, DeliveryMode.PERSISTENT, j % 10, Message.DEFAULT_TIME_TO_LIVE);
+            }
+         }
+
+         assertEquals(NUM_PRODUCERS, recvBuffer.size());
+
+         log.trace("Sent messages");
+
+         while (true)
+         {
+            TextMessage rm = (TextMessage)consumer.receive(1000);
+            if (rm == null)
+               break;
+            putToBuffer(rm);
+         }
+
+         for (int i = 0; i < NUM_PRODUCERS; ++i)
+         {
+            String key = "ordering-" + i;
+            ArrayList<TextMessage> group = recvBuffer.get(key);
+            assertEquals(NUM_MSG, group.size());
+            for (int j = 0; j < NUM_MSG; ++j)
+            {
+               TextMessage rm = group.get(j);
+               assertEquals(key + ":" + j, rm.getText());
+            }
+         }
+
+         // make sure I don't receive anything else
+
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @param rm
+    * @throws JMSException 
+    */
+   private void putToBuffer(TextMessage rm) throws JMSException
+   {
+      String text = rm.getText();
+      String[] tokens = text.split(":");
+      String key = tokens[0];
+      ArrayList<TextMessage> group = recvBuffer.get(key);
+      group.add(rm);
+   }
+
+   /**
+    * initialize a buffer for receiving ordering group messages.
+    * @param i
+    */
+   private String prepareReceivingBuffer(int i)
+   {
+      String key = "ordering-" + i;
+      ArrayList<TextMessage> grpBuffer = recvBuffer.get(key);
+      if (grpBuffer == null)
+      {
+         grpBuffer = new ArrayList<TextMessage>();
+         recvBuffer.put(key, grpBuffer);
+      }
+      return key;
+   }
+
+   // Inner classes -------------------------------------------------
+}

Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java	                        (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,427 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * OrderingGroupBasicClusteringTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Nov 4, 2008 10:30:22 AM
+ *
+ *
+ */
+public class OrderingGroupBasicClusteringTest extends ClusteringTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private String queueName = "testDistributedQueue";
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupBasicClusteringTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   /*
+    * make sure the ordering will be observed when queue merging
+    * happens.
+    */
+   public void testOrderingGroupMergeQueue() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+
+      try
+      {
+         // Objects Server0
+         conn0 = createConnectionOnServer(cf, 0);
+
+         assertEquals(0, getServerId(conn0));
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         conn0.start();
+
+         JBossMessageProducer producer0 = (JBossMessageProducer)session0.createProducer(queue[0]);
+         producer0.enableOrderingGroup(null);
+
+         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tmm = session0.createTextMessage("message " + i);
+            producer0.send(tmm, DeliveryMode.PERSISTENT, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         session0.commit();
+
+         TextMessage msg;
+
+         for (int i = 0; i < 5; i++)
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            session0.commit();
+            assertNotNull(msg);
+            log.info("msg = " + msg.getText());
+            assertEquals("message " + i, msg.getText());
+         }
+
+         log.info("****Closing consumer");
+         consumer0.close();
+
+         // Objects Server1
+         conn1 = createConnectionOnServer(cf, 1);
+
+         assertEquals(1, getServerId(conn1));
+
+         conn1.start();
+
+         Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer1 = session1.createProducer(queue[1]);
+
+         producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i = 10; i < 20; i++)
+         {
+            producer1.send(session0.createTextMessage("message " + i));
+         }
+
+         // At this point there should be 5 messages on the node 0 queue (5-9)
+         // and 10 messages on the node 1 queue (10-19)
+
+         ServerManagement.kill(1);
+
+         consumer0 = session0.createConsumer(queue[0]);
+
+         for (int i = 5; i < 20; i++)
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            assertNotNull(msg);
+            session0.commit();
+            log.info("msg = " + msg.getText());
+            assertEquals("message " + i, msg.getText());
+         }
+
+         assertNull(consumer0.receive(5000));
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+   /*
+    * make sure the ordering group works when failover happens.
+    */
+   public void testOrderingKillFailoverNode() throws Exception
+   {
+      testKillFailoverNode(false);
+   }
+
+   /*
+    * make sure the ordering group works when failover happens.
+    */
+   public void testOrderingKillFailoverNodeTx() throws Exception
+   {
+      testKillFailoverNode(true);
+   }
+
+   /*
+    * Make sure the ordering group works when message sucking happens.
+    */
+   public void testOnSuckPersistent() throws Exception
+   {
+      testMessageReceiveOnSuck(true);
+   }
+
+   /*
+    * Make sure the ordering group works when message sucking happens.
+    */
+   public void testOnSuckNonPersistent() throws Exception
+   {
+      testMessageReceiveOnSuck(false);
+   }
+
+   private void testMessageReceiveOnSuck(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+         conn0.start();
+         conn2.start();
+
+         // Send at node 0
+         JBossMessageProducer prod0 = (JBossMessageProducer)sess0.createProducer(queue[0]);
+         prod0.enableOrderingGroup(null);
+
+         int delMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+         for (int i = 0; i < 100; i++)
+         {
+            TextMessage tms = sess0.createTextMessage("suckedmessage" + i);
+            prod0.send(tms, delMode, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         for (int j = 0; j < 100; j++)
+         {
+            TextMessage rms = (TextMessage)cons2.receive(1000);
+            assertNotNull(rms);
+            assertEquals("suckedmessage" + j, rms.getText());
+         }
+
+         TextMessage nulMsg = (TextMessage)cons2.receive(5000);
+         assertNull(nulMsg);
+
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   private void testKillFailoverNode(boolean transactional) throws Exception
+   {
+      JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      Connection conn1 = createConnectionOnServer(factory, 1);
+
+      try
+      {
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+
+         Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod1 = (JBossMessageProducer)sessSend.createProducer(queue[1]);
+         prod1.enableOrderingGroup(null);
+
+         final int numMessages = 10;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            prod1.send(tm, DeliveryMode.PERSISTENT, i % 10, Message.DEFAULT_TIME_TO_LIVE);
+
+            log.info("Sent " + tm.getJMSMessageID());
+         }
+
+         Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED
+                                                                         : Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+         conn1.start();
+
+         TextMessage tm = null;
+
+         for (int i = 0; i < numMessages / 2; i++)
+         {
+            tm = (TextMessage)cons1.receive(2000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+            if (transactional)
+            {
+               sess1.commit();
+            }
+            else
+            {
+               tm.acknowledge();
+            }
+         }
+
+         // Don't ack
+         tm = (TextMessage)cons1.receive(2000);
+         assertNotNull(tm);
+         // assertEquals("message5", tm.getText());
+         String gropname = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORDERING_GROUP_ID);
+
+         // We kill the failover node for node 1
+         int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+         int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+         Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+
+         log.info("Killing failover node:" + failoverNodeId);
+
+         ServerManagement.kill(failoverNodeId);
+
+         log.info("Killed failover node");
+
+         Thread.sleep(8000);
+
+         // Now kill node 1
+
+         failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+         recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+
+         log.info("Failover node id is now " + failoverNodeId);
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+
+         while (true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         log.info("Failover completed");
+
+         assertEquals(failoverNodeId, getServerId(conn1));
+
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+         assertEquals(0, recoveryMapSize);
+         recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+
+         Message moreMsg = cons1.receive(2000);
+         assertNull(moreMsg);
+
+         // Now ack
+         if (transactional)
+         {
+            sess1.commit();
+         }
+         else
+         {
+            tm.acknowledge();
+         }
+
+         log.info("acked");
+
+         sess1.close();
+
+         log.info("closed");
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         log.info("created new session");
+
+         cons1 = sess1.createConsumer(queue[1]);
+
+         log.info("Created consumer");
+
+         // the remaining messages should be received.
+         for (int i = numMessages / 2 + 1; i < numMessages; i++)
+         {
+            tm = (TextMessage)cons1.receive(2000);
+            assertNotNull(tm);
+            assertEquals("message" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Copied: branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java (from rev 5331, branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java)
===================================================================
--- branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java	                        (rev 0)
+++ branches/Branch_1416_merge/tests/src/org/jboss/test/messaging/jms/message/JMSOrderingGroupPropertyTest.java	2008-11-11 11:24:20 UTC (rev 5335)
@@ -0,0 +1,286 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.message;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.jms.JMSTestCase;
+
+/**
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public class JMSOrderingGroupPropertyTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public JMSOrderingGroupPropertyTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   /*
+    * Note - this test is testing the order of messages under the control of 
+    * ordering group message producers. All messages sent through a order group
+    * message producer obeys strict ordering rule -- they are delivered in the order
+    * they are sent, regardless of priorities.
+    */
+   public void testMessageOrderWithPriority() throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      conn.start();
+
+      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      JBossMessageProducer prod = (JBossMessageProducer)sessSend.createProducer(queue1);
+
+      prod.enableOrderingGroup(null);
+
+      TextMessage m0 = sessSend.createTextMessage("a");
+      TextMessage m1 = sessSend.createTextMessage("b");
+      TextMessage m2 = sessSend.createTextMessage("c");
+      TextMessage m3 = sessSend.createTextMessage("d");
+      TextMessage m4 = sessSend.createTextMessage("e");
+      TextMessage m5 = sessSend.createTextMessage("f");
+      TextMessage m6 = sessSend.createTextMessage("g");
+      TextMessage m7 = sessSend.createTextMessage("h");
+      TextMessage m8 = sessSend.createTextMessage("i");
+      TextMessage m9 = sessSend.createTextMessage("j");
+
+      Random rdm = new Random();
+      int pri = rdm.nextInt(10);
+      prod.send(m0, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m1, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m2, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m3, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m4, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m5, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m6, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m7, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m8, DeliveryMode.NON_PERSISTENT, pri, 0);
+      pri = rdm.nextInt(10);
+      prod.send(m9, DeliveryMode.NON_PERSISTENT, pri, 0);
+
+      // NP messages are sent async so we need to allow them time to all hit the server
+      Thread.sleep(2000);
+
+      Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageConsumer cons = sessReceive.createConsumer(queue1);
+
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("a", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("b", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("c", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("d", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("e", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("f", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("g", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("h", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("i", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("j", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(500);
+         assertNull(t);
+      }
+
+      cons.close();
+
+      conn.close();
+   }
+
+   /*
+    * If messages are sent to a queue with certain priorities, and a consumer is already open
+    * then it is likely that they will be immediately sent to the consumer.
+    * even in this case, ordering group should be strictly obeyed.
+    */
+   public void testMessageOrderWithConsumerBuffering() throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      conn.start();
+
+      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      JBossMessageProducer prod = (JBossMessageProducer)sessSend.createProducer(queue1);
+      prod.enableOrderingGroup(null);
+
+      TextMessage m0 = sessSend.createTextMessage("a");
+      TextMessage m1 = sessSend.createTextMessage("b");
+      TextMessage m2 = sessSend.createTextMessage("c");
+      TextMessage m3 = sessSend.createTextMessage("d");
+      TextMessage m4 = sessSend.createTextMessage("e");
+      TextMessage m5 = sessSend.createTextMessage("f");
+      TextMessage m6 = sessSend.createTextMessage("g");
+      TextMessage m7 = sessSend.createTextMessage("h");
+      TextMessage m8 = sessSend.createTextMessage("i");
+      TextMessage m9 = sessSend.createTextMessage("j");
+
+      Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageConsumer cons = sessReceive.createConsumer(queue1);
+
+      prod.send(m0, DeliveryMode.NON_PERSISTENT, 0, 0);
+      prod.send(m1, DeliveryMode.NON_PERSISTENT, 1, 0);
+      prod.send(m2, DeliveryMode.NON_PERSISTENT, 2, 0);
+      prod.send(m3, DeliveryMode.NON_PERSISTENT, 3, 0);
+      prod.send(m4, DeliveryMode.NON_PERSISTENT, 4, 0);
+      prod.send(m5, DeliveryMode.NON_PERSISTENT, 5, 0);
+      prod.send(m6, DeliveryMode.NON_PERSISTENT, 6, 0);
+      prod.send(m7, DeliveryMode.NON_PERSISTENT, 7, 0);
+      prod.send(m8, DeliveryMode.NON_PERSISTENT, 8, 0);
+      prod.send(m9, DeliveryMode.NON_PERSISTENT, 9, 0);
+
+      // Let them all get there
+
+      Thread.sleep(2000);
+
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("a", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("b", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("c", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("d", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("e", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("f", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("g", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("h", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("i", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(1000);
+         assertNotNull(t);
+         assertEquals("j", t.getText());
+      }
+      {
+         TextMessage t = (TextMessage)cons.receive(500);
+         assertNull(t);
+      }
+
+      cons.close();
+
+      conn.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list