[jboss-cvs] JBoss Messaging SVN: r6063 - trunk/tests/src/org/jboss/messaging/tests/integration/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 11 12:21:32 EDT 2009
Author: ataylor
Date: 2009-03-11 12:21:32 -0400 (Wed, 11 Mar 2009)
New Revision: 6063
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/client/AutoGroupClientTest.java
Log:
some end to end tests - more to follow
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/AutoGroupClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/AutoGroupClientTest.java 2009-03-11 16:09:16 UTC (rev 6062)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/AutoGroupClientTest.java 2009-03-11 16:21:32 UTC (rev 6063)
@@ -1,262 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class AutoGroupClientTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(AutoGroupClientTest.class);
-
- public void testGroupIdAutomaticallySet() throws Exception
- {
- final SimpleString QUEUE = new SimpleString("testGroupQueue");
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-
- Configuration conf = new ConfigurationImpl();
-
- conf.setSecurityEnabled(false);
-
- conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-
- MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
-
- messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
- messagingService.start();
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
- sf.setAutoGroup(true);
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final CountDownLatch latch = new CountDownLatch(100);
-
- MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
- MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
-
- log.info("creating consuimer");
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
- log.info("created consumer");
- consumer.setMessageHandler(myMessageHandler);
- ClientConsumer consumer2 = session.createConsumer(QUEUE);
- consumer2.setMessageHandler(myMessageHandler2);
-
- session.start();
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().writeString("testINVMCoreClient");
- message.setDurable(false);
- producer.send(message);
- }
- latch.await();
-
- session.close();
-
- messagingService.stop();
-
- assertEquals(myMessageHandler.messagesReceived, 100);
- assertEquals(myMessageHandler2.messagesReceived, 0);
- }
-
- public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
- {
- final SimpleString QUEUE = new SimpleString("testGroupQueue");
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-
- Configuration conf = new ConfigurationImpl();
-
- conf.setSecurityEnabled(false);
-
- conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-
- MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
-
- messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
- messagingService.start();
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
- sf.setAutoGroup(true);
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
- ClientProducer producer2 = session.createProducer(QUEUE);
-
- final CountDownLatch latch = new CountDownLatch(200);
-
- MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
- MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
- MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
- consumer.setMessageHandler(myMessageHandler);
- ClientConsumer consumer2 = session.createConsumer(QUEUE);
- consumer2.setMessageHandler(myMessageHandler2);
- ClientConsumer consumer3 = session.createConsumer(QUEUE);
- consumer3.setMessageHandler(myMessageHandler3);
-
- session.start();
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().writeString("testINVMCoreClient");
- producer.send(message);
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().writeString("testINVMCoreClient");
- producer2.send(message);
- }
- latch.await();
-
- session.close();
-
- messagingService.stop();
-
- assertEquals(myMessageHandler.messagesReceived, 100);
- assertEquals(myMessageHandler2.messagesReceived, 100);
- assertEquals(myMessageHandler3.messagesReceived, 0);
- }
-
- public void testGroupIdAutomaticallyNotSet() throws Exception
- {
- final SimpleString QUEUE = new SimpleString("testGroupQueue");
- AddressSettings qs = new AddressSettings();
- qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- Configuration conf = new ConfigurationImpl();
-
- conf.setSecurityEnabled(false);
-
- conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-
- MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
- messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
- messagingService.start();
-
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final CountDownLatch latch = new CountDownLatch(100);
-
- MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
- MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
- consumer.setMessageHandler(myMessageHandler);
- ClientConsumer consumer2 = session.createConsumer(QUEUE);
- consumer2.setMessageHandler(myMessageHandler2);
-
- session.start();
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.getBody().writeString("testINVMCoreClient");
- message.setDurable(false);
- producer.send(message);
- }
- latch.await();
-
- session.close();
-
- messagingService.stop();
-
- assertEquals(myMessageHandler.messagesReceived, 50);
- assertEquals(myMessageHandler2.messagesReceived, 50);
- }
-
-
- private static class MyMessageHandler implements MessageHandler
- {
- volatile int messagesReceived = 0;
-
- private final CountDownLatch latch;
-
- public MyMessageHandler(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void onMessage(ClientMessage message)
- {
- messagesReceived++;
- try
- {
- message.acknowledge();
- }
- catch (MessagingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- latch.countDown();
- }
- }
-}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-11 16:21:32 UTC (rev 6063)
@@ -0,0 +1,359 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientEndToEndTest extends ServiceTestBase
+{
+ public final SimpleString addressA = new SimpleString("addressA");
+ public final SimpleString queueA = new SimpleString("queueA");
+ private final SimpleString groupTestQ = new SimpleString("testGroupQueue");;
+
+ /*ackbatchSize tests*/
+
+ /*
+ * tests that wed don't acknowledge until the correct ackBatchSize is reached
+ * */
+ public void testAckBatchSize() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientMessage message = sendSession.createClientMessage(false);
+ //we need to set the destination so we can calculate the encodesize correctly
+ message.setDestination(addressA);
+ int encodeSize = message.getEncodeSize();
+ int numMessages = 100;
+ cf.setAckBatchSize(numMessages * encodeSize);
+ cf.setBlockOnAcknowledge(true);
+ ClientSession session = cf.createSession(false, true, true);
+ session.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ for(int i = 0 ; i < numMessages; i ++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+
+ ClientConsumer consumer = session.createConsumer(queueA);
+ session.start();
+ for(int i = 0; i < numMessages - 1; i++)
+ {
+ ClientMessage m = consumer.receive(5000);
+ m.acknowledge();
+ }
+
+ ClientMessage m = consumer.receive(5000);
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ m.acknowledge();
+ assertEquals(0, q.getDeliveringCount());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ /*
+ * tests that when the ackBatchSize is 0 we ack every message directly
+ * */
+ public void testAckBatchSizeZero() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientMessage message = sendSession.createClientMessage(false);
+ message.setDestination(addressA);
+ int numMessages = 100;
+ cf.setAckBatchSize(0);
+ cf.setBlockOnAcknowledge(true);
+ ClientSession session = cf.createSession(false, true, true);
+ session.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ for(int i = 0 ; i < numMessages; i ++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+
+ ClientConsumer consumer = session.createConsumer(queueA);
+ session.start();
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ ClientMessage[] messages = new ClientMessage[numMessages];
+ for(int i = 0; i < numMessages; i++)
+ {
+ messages[i] = consumer.receive(5000);
+ assertNotNull(messages[i]);
+ }
+ for(int i = 0; i < numMessages; i++)
+ {
+ messages[i].acknowledge();
+ assertEquals(numMessages - i - 1, q.getDeliveringCount());
+ }
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ /* auto group id tests*/
+
+ /*
+ * tests when the autogroupid is set only 1 consumer (out of 2) gets all the messages from a single producer
+ * */
+ public void testGroupIdAutomaticallySet() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ AddressSettings qs = new AddressSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+ messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setAutoGroup(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+ ClientProducer producer = session.createProducer(groupTestQ);
+
+ final CountDownLatch latch = new CountDownLatch(100);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(groupTestQ);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+ consumer2.setMessageHandler(myMessageHandler2);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(session.createClientMessage(false));
+ }
+ latch.await();
+
+ session.close();
+
+ assertEquals(myMessageHandler.messagesReceived, 100);
+ assertEquals(myMessageHandler2.messagesReceived, 0);
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+
+ }
+
+ /*
+ * tests when the autogroupid is set only 2 consumers (out of 3) gets all the messages from 2 producers
+ * */
+ public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ AddressSettings qs = new AddressSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+ messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setAutoGroup(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+ ClientProducer producer = session.createProducer(groupTestQ);
+ ClientProducer producer2 = session.createProducer(groupTestQ);
+
+ final CountDownLatch latch = new CountDownLatch(200);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(groupTestQ);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+ consumer2.setMessageHandler(myMessageHandler2);
+ ClientConsumer consumer3 = session.createConsumer(groupTestQ);
+ consumer3.setMessageHandler(myMessageHandler3);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(session.createClientMessage(false));
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer2.send(session.createClientMessage(false));
+ }
+ latch.await();
+
+ session.close();
+
+ assertEquals(myMessageHandler.messagesReceived, 100);
+ assertEquals(myMessageHandler2.messagesReceived, 100);
+ assertEquals(myMessageHandler3.messagesReceived, 0);
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+
+ }
+
+ /*
+ * tests that even tho we have an grouping round robin distributor we don't pin the consumer as autogroup is false
+ * */
+ public void testGroupIdAutomaticallyNotSet() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ AddressSettings qs = new AddressSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+
+ messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+ ClientProducer producer = session.createProducer(groupTestQ);
+
+ final CountDownLatch latch = new CountDownLatch(100);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(groupTestQ);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+ consumer2.setMessageHandler(myMessageHandler2);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(session.createClientMessage(false));
+ }
+ latch.await();
+
+ session.close();
+
+ assertEquals(myMessageHandler.messagesReceived, 50);
+ assertEquals(myMessageHandler2.messagesReceived, 50);
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+
+ }
+
+ private static class MyMessageHandler implements MessageHandler
+ {
+ volatile int messagesReceived = 0;
+
+ private final CountDownLatch latch;
+
+ public MyMessageHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ messagesReceived++;
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }
+ }
+}
More information about the jboss-cvs-commits
mailing list