[jboss-cvs] JBoss Messaging SVN: r5199 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 29 05:25:57 EDT 2008


Author: gaohoward
Date: 2008-10-29 05:25:57 -0400 (Wed, 29 Oct 2008)
New Revision: 5199

Added:
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Modified:
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1416


Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-10-29 05:50:58 UTC (rev 5198)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2008-10-29 09:25:57 UTC (rev 5199)
@@ -550,41 +550,41 @@
    }
    
    
-   class MockServerSessionPool implements ServerSessionPool
+}
+
+class MockServerSession implements ServerSession
+{
+   Session session;
+   
+   MockServerSession(Session sess)
    {
-      private ServerSession serverSession;
-      
-      MockServerSessionPool(Session sess)
-      {
-         serverSession = new MockServerSession(sess);
-      }
-
-      public ServerSession getServerSession() throws JMSException
-      {
-         return serverSession;
-      }      
+      this.session = sess;
    }
    
-   class MockServerSession implements ServerSession
+
+   public Session getSession() throws JMSException
    {
-      Session session;
-      
-      MockServerSession(Session sess)
-      {
-         this.session = sess;
-      }
-      
+      return session;
+   }
 
-      public Session getSession() throws JMSException
-      {
-         return session;
-      }
-
-      public void start() throws JMSException
-      {
-         session.run();
-      }
-      
+   public void start() throws JMSException
+   {
+      session.run();
    }
    
 }
+
+class MockServerSessionPool implements ServerSessionPool
+{
+   private ServerSession serverSession;
+   
+   MockServerSessionPool(Session sess)
+   {
+      serverSession = new MockServerSession(sess);
+   }
+
+   public ServerSession getServerSession() throws JMSException
+   {
+      return serverSession;
+   }      
+}

Added: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java	2008-10-29 09:25:57 UTC (rev 5199)
@@ -0,0 +1,222 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Tests that ordering group messages are received in order through a ConnectionConsumer.
+ *
+ * @author howard
+ * 
+ * Created Oct 29, 2008 2:45:38 PM
+ *
+ *
+ */
+public class OrderingGroupConnectionConsumerTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupConnectionConsumerTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+   
+   public void tearDown() throws Exception
+   {     
+      super.tearDown();
+      mList.clear();
+   }
+
+   /*
+    * Make sure the ordering group messages are received in order
+    * through a ConnectionConsumer.
+    */
+   public void testSimpleReceive() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessCons2 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
+
+         sessCons1.setMessageListener(listener);
+         sessCons2.setMessageListener(listener);
+
+         ServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 1);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         //wait long enough to allow delivery to complete.
+         msgLatch.attempt(10000);
+         
+         //check the order
+         assertEquals(mList.size(), NUM_MESSAGES);
+         
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals(txm.getText(), "testing" + i);
+         }
+
+         cc.close();
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null) consumerConn.close();
+         if (producerConn != null) producerConn.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   Latch msgLatch = new Latch();
+   ArrayList<TextMessage> mList = new ArrayList<TextMessage>();
+   final int NUM_MESSAGES = 100;
+
+   // Inner classes -------------------------------------------------
+   class OrderingGroupMessageListener implements MessageListener
+   {
+      
+      OrderingGroupConnectionConsumerTest owner;
+       
+      OrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest)
+      {
+         owner = theTest;
+      }
+      
+      public synchronized void onMessage(Message message)
+      {
+         try
+         {
+            owner.addReceived((TextMessage)message);
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
+   }
+
+   class OrderingServerSessionPool implements ServerSessionPool
+   {
+      private ServerSession serverSession1;
+      private ServerSession serverSession2;
+      private long flag;
+      
+      OrderingServerSessionPool(Session sess1, Session sess2)
+      {
+         serverSession1 = new MockServerSession(sess1);
+         serverSession2 = new MockServerSession(sess2);
+         flag = 0L;
+      }
+
+      public synchronized ServerSession getServerSession() throws JMSException
+      {
+         flag++;
+         if (flag%2 == 0)
+         {
+            return serverSession1;
+         }
+         return serverSession2;
+      }      
+   }
+
+   /**
+    * Here we do not synchronize because this is called from onMessage().
+    * It is guaranteed that the next message won't arrive until processing of the
+    * previous message has finished (meaning onMessage() returns, in auto-ack mode).
+    */
+   public void addReceived(TextMessage message)
+   {
+      mList.add(message);
+      if (mList.size() == NUM_MESSAGES)
+      {
+         msgLatch.release();
+      }
+   }
+
+}




More information about the jboss-cvs-commits mailing list