[jboss-cvs] JBoss Messaging SVN: r5199 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 29 05:25:57 EDT 2008
Author: gaohoward
Date: 2008-10-29 05:25:57 -0400 (Wed, 29 Oct 2008)
New Revision: 5199
Added:
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Modified:
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
Log:
JBMESSAGING-1416
Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-10-29 05:50:58 UTC (rev 5198)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-10-29 09:25:57 UTC (rev 5199)
@@ -550,41 +550,41 @@
}
- class MockServerSessionPool implements ServerSessionPool
+}
+
+class MockServerSession implements ServerSession
+{
+ Session session;
+
+ MockServerSession(Session sess)
{
- private ServerSession serverSession;
-
- MockServerSessionPool(Session sess)
- {
- serverSession = new MockServerSession(sess);
- }
-
- public ServerSession getServerSession() throws JMSException
- {
- return serverSession;
- }
+ this.session = sess;
}
- class MockServerSession implements ServerSession
+
+ public Session getSession() throws JMSException
{
- Session session;
-
- MockServerSession(Session sess)
- {
- this.session = sess;
- }
-
+ return session;
+ }
- public Session getSession() throws JMSException
- {
- return session;
- }
-
- public void start() throws JMSException
- {
- session.run();
- }
-
+ public void start() throws JMSException
+ {
+ session.run();
}
}
+
+class MockServerSessionPool implements ServerSessionPool
+{
+ private ServerSession serverSession;
+
+ MockServerSessionPool(Session sess)
+ {
+ serverSession = new MockServerSession(sess);
+ }
+
+ public ServerSession getServerSession() throws JMSException
+ {
+ return serverSession;
+ }
+}
Added: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java (rev 0)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-10-29 09:25:57 UTC (rev 5199)
@@ -0,0 +1,222 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Checks that messages sent to an ordering group reach a ConnectionConsumer's
+ * listener in send order, even when their priorities differ and the
+ * ServerSessionPool alternates between two sessions.
+ *
+ * @author howard
+ *
+ * Created Oct 29, 2008 2:45:38 PM
+ */
+public class OrderingGroupConnectionConsumerTest extends JMSTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public OrderingGroupConnectionConsumerTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+      mList.clear();
+   }
+
+   /**
+    * Sends NUM_MESSAGES text messages with varying priorities to an ordering group
+    * and asserts that the listener received all of them, in send order, in time.
+    */
+   public void testSimpleReceive() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+
+      Connection consumerConn = null;
+
+      Connection producerConn = null;
+
+      try
+      {
+         consumerConn = cf.createConnection();
+
+         consumerConn.start();
+
+         Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessCons2 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
+
+         sessCons1.setMessageListener(listener);
+         sessCons2.setMessageListener(listener);
+
+         ServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 1);
+
+         producerConn = cf.createConnection();
+
+         Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+         prod.enableOrderingGroup(null);
+
+         forceGC();
+         // the priority cycles through 0..9 so that send order and priority order differ
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = sessProd.createTextMessage("testing" + i);
+            prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+         }
+
+         // fail clearly on timeout; mList is only read once the latch has been released
+         assertTrue("not all messages were delivered within 10 seconds", msgLatch.attempt(10000));
+
+         //check the order
+         assertEquals(NUM_MESSAGES, mList.size());
+
+         for (int i = 0; i < NUM_MESSAGES; ++i)
+         {
+            TextMessage txm = mList.get(i);
+            assertEquals("testing" + i, txm.getText());
+         }
+
+         cc.close();
+
+         consumerConn.close();
+         consumerConn = null;
+         producerConn.close();
+         producerConn = null;
+      }
+      finally
+      {
+         if (consumerConn != null) consumerConn.close();
+         if (producerConn != null) producerConn.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   Latch msgLatch = new Latch(); // released by addReceived() once NUM_MESSAGES have arrived
+   ArrayList<TextMessage> mList = new ArrayList<TextMessage>(); // messages in arrival order
+   final int NUM_MESSAGES = 100;
+
+   // Inner classes -------------------------------------------------
+   class OrderingGroupMessageListener implements MessageListener
+   {
+      // Hands every delivered message to the owning test; one instance serves both sessions.
+      OrderingGroupConnectionConsumerTest owner;
+
+      OrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest)
+      {
+         owner = theTest;
+      }
+
+      public synchronized void onMessage(Message message)
+      {
+         try
+         {
+            owner.addReceived((TextMessage)message);
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
+   }
+
+   class OrderingServerSessionPool implements ServerSessionPool
+   {
+      private ServerSession serverSession1;
+      private ServerSession serverSession2;
+      private long flag;
+
+      OrderingServerSessionPool(Session sess1, Session sess2)
+      {
+         serverSession1 = new MockServerSession(sess1);
+         serverSession2 = new MockServerSession(sess2);
+         flag = 0L;
+      }
+      // Alternates between the two sessions on successive calls, starting with session 2.
+      public synchronized ServerSession getServerSession() throws JMSException
+      {
+         flag++;
+         if (flag%2 == 0)
+         {
+            return serverSession1;
+         }
+         return serverSession2;
+      }
+   }
+
+   /**
+    * Records a received message and releases msgLatch once NUM_MESSAGES have arrived.
+    * Not synchronized itself: within this test it is only called from onMessage(),
+    * which is synchronized on the single listener instance shared by both sessions.
+    */
+   public void addReceived(TextMessage message)
+   {
+      mList.add(message);
+      if (mList.size() == NUM_MESSAGES)
+      {
+         msgLatch.release();
+      }
+   }
+
+}
More information about the jboss-cvs-commits
mailing list