[jboss-cvs] JBoss Messaging SVN: r6155 - trunk/tests/src/org/jboss/messaging/tests/integration/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 25 05:55:13 EDT 2009
Author: ataylor
Date: 2009-03-25 05:55:12 -0400 (Wed, 25 Mar 2009)
New Revision: 6155
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageCounterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
Log:
repackaged tests
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-25 01:00:01 UTC (rev 6154)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-25 09:55:12 UTC (rev 6155)
@@ -27,8 +27,6 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.Queue;
@@ -70,343 +68,7 @@
super.tearDown();
}
- public void testSimpleConsumerBrowser() throws Exception
- {
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- sf.setBlockOnNonPersistentSend(true);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- consumer.close();
-
- consumer = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- consumer.close();
-
- session.close();
-
- }
-
- public void testMessageCounter() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
-
- ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- session.commit();
- session.start();
-
- assertEquals(100, getMessageCount(messagingService.getServer().getPostOffice(), QUEUE.toString()));
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- assertNotNull(message);
- message.acknowledge();
-
- session.commit();
-
- assertEquals("m" + i, message.getBody().readString());
- }
-
- session.close();
-
- assertEquals(0, getMessageCount(messagingService.getServer().getPostOffice(), QUEUE.toString()));
-
- }
-
- public void testConsumerBrowserWithSelector() throws Exception
- {
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("x"), i);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
-
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- consumer.close();
-
- consumer = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- consumer.close();
-
- session.close();
-
- }
-
- public void testConsumerBrowserWithStringSelector() throws Exception
- {
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- if (i % 2 == 0)
- {
- message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
- }
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("color = 'RED'"), true);
-
- for (int i = 0; i < numMessages; i += 2)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- session.close();
-
- }
-
- public void testConsumerMultipleBrowser() throws Exception
- {
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
- ClientConsumer consumer2 = session.createConsumer(QUEUE, null, true);
- ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- message2 = consumer2.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- message2 = consumer3.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- session.close();
-
- }
-
- public void testConsumerMultipleBrowserWithSelector() throws Exception
- {
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("x"), i);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x < 50"), true);
- ClientConsumer consumer2 = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
- ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < 50; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage message2 = consumer2.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer3.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- session.close();
-
- }
-
- public void testConsumerBrowserMessages() throws Exception
- {
- testConsumerBrowserMessagesArentAcked(false);
- }
-
- public void testConsumerBrowserMessagesPreACK() throws Exception
- {
- testConsumerBrowserMessagesArentAcked(false);
- }
-
- private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- assertEquals("m" + i, message2.getBody().readString());
- }
- // assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(100,
- ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
-
- session.close();
- }
-
- public void testConsumerBrowserMessageAckDoesNothing() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
-
- message2.acknowledge();
-
- assertEquals("m" + i, message2.getBody().readString());
- }
- // assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(100,
- ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
-
- session.close();
- }
-
+
public void testSetMessageHandlerWithMessagesPending() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -467,284 +129,9 @@
session.close();
}
- public void testStopStartConsumerSyncReceiveImmediate() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
- final ClientSession session = sf.createSession(false, true, true);
- session.createQueue(QUEUE, QUEUE, null, false, false);
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("i"), i);
- producer.send(message);
- }
-
- final ClientConsumer consumer = session.createConsumer(QUEUE);
-
- session.start();
-
-
- for(int i = 0; i < numMessages/2; i++)
- {
- ClientMessage cm = consumer.receive(5000);
- assertNotNull(cm);
- cm.acknowledge();
- }
- session.stop();
- ClientMessage cm = consumer.receiveImmediate();
- assertNull(cm);
-
- session.close();
- }
-
- public void testStopStartConsumerSyncReceive() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("i"), i);
- producer.send(message);
- }
-
- final ClientConsumer consumer = session.createConsumer(QUEUE);
-
- session.start();
-
-
- for(int i = 0; i < numMessages/2; i++)
- {
- ClientMessage cm = consumer.receive(5000);
- assertNotNull(cm);
- cm.acknowledge();
- }
- session.stop();
- long time = System.currentTimeMillis();
- ClientMessage cm = consumer.receive(1000);
- long taken = System.currentTimeMillis() - time;
- assertTrue(taken >= 1000);
- assertNull(cm);
-
- session.close();
- }
-
- public void testStopStartConsumerAsyncSync() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("i"), i);
- producer.send(message);
- }
-
- final ClientConsumer consumer = session.createConsumer(QUEUE);
-
- Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
- int ccount = q.getConsumerCount();
-
- session.start();
-
- final CountDownLatch latch = new CountDownLatch(10);
-
- // Message should be in consumer
-
- class MyHandler implements MessageHandler
- {
- boolean failed;
-
- boolean started = true;
-
- public void onMessage(final ClientMessage message)
- {
-
- try
- {
- if (!started)
- {
- failed = true;
- }
-
- latch.countDown();
-
- if (latch.getCount() == 0)
- {
-
- message.acknowledge();
- session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
- started = false;
- //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
- }
-
- }
- catch (Exception e)
- {
- }
- }
- }
-
- MyHandler handler = new MyHandler();
-
- consumer.setMessageHandler(handler);
-
- latch.await();
-
- Thread.sleep(100);
-
- assertFalse(handler.failed);
-
- // Make sure no exceptions were thrown from onMessage
- assertNull(consumer.getLastException());
- consumer.setMessageHandler(null);
- session.start();
- for (int i = 0; i < 90; i++)
- {
- ClientMessage msg = consumer.receive(1000);
- ccount = q.getConsumerCount();
- if(msg == null)
- {
- System.out.println("ClientConsumerTest.testStopConsumer");
- }
- assertNotNull("message " + i, msg);
- msg.acknowledge();
- }
-
- assertNull(consumer.receiveImmediate());
-
- session.close();
- }
-
- public void testStopStartConsumerAsyncASync() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("i"), i);
- producer.send(message);
- }
-
- final ClientConsumer consumer = session.createConsumer(QUEUE);
-
- Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
- int ccount = q.getConsumerCount();
-
- session.start();
-
- CountDownLatch latch = new CountDownLatch(10);
-
- // Message should be in consumer
-
- class MyHandler implements MessageHandler
- {
- int messageReceived = 0;
- boolean failed;
-
- boolean started = true;
-
- private final CountDownLatch latch;
-
- private boolean stop = true;
-
- public MyHandler(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public MyHandler(CountDownLatch latch, boolean stop)
- {
- this(latch);
- this.stop = stop;
- }
-
- public void onMessage(final ClientMessage message)
- {
-
- try
- {
- if (!started)
- {
- failed = true;
- }
- messageReceived++;
- latch.countDown();
-
- if (stop && latch.getCount() == 0)
- {
-
- message.acknowledge();
- session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
- started = false;
- //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
- }
-
- }
- catch (Exception e)
- {
- }
- }
- }
-
- MyHandler handler = new MyHandler(latch);
-
- consumer.setMessageHandler(handler);
-
- latch.await();
-
- Thread.sleep(100);
-
- assertFalse(handler.failed);
-
- // Make sure no exceptions were thrown from onMessage
- assertNull(consumer.getLastException());
- latch = new CountDownLatch(90);
- handler = new MyHandler(latch, false);
- consumer.setMessageHandler(handler);
- session.start();
- assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
-
- Thread.sleep(100);
-
- assertFalse(handler.failed);
- assertNull(consumer.getLastException());
- session.close();
- }
-
-
public void testSetUnsetMessageHandler() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageCounterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageCounterTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageCounterTest.java 2009-03-25 09:55:12 UTC (rev 6155)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ClientMessageCounterTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ClientMessageCounterTest.class);
+
+ private MessagingService messagingService;
+
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+ /** Creates and starts the messaging service used by the test. */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ messagingService = createService(false);
+
+ messagingService.start();
+ }
+
+ /** Stops the messaging service and clears the reference, then runs the superclass tear-down. */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+ messagingService = null;
+
+ super.tearDown();
+ }
+
+ /** Checks the queue's message count after a committed send and again once every message has been consumed. */
+ public void testMessageCounter() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ session.commit();
+ session.start();
+
+ assertEquals(numMessages, getMessageCount(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+ message.acknowledge();
+
+ session.commit();
+
+ assertEquals("m" + i, message.getBody().readString());
+ }
+
+ session.close();
+
+ assertEquals(0, getMessageCount(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java 2009-03-25 09:55:12 UTC (rev 6155)
@@ -0,0 +1,361 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ClientQueueBrowserTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ClientQueueBrowserTest.class);
+
+ private MessagingService messagingService;
+
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Create and start the messaging service that the tests in this class run against.
+ messagingService = createService(false);
+
+ messagingService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+ // Clear the reference to the stopped service before the superclass tear-down runs.
+ messagingService = null;
+
+ super.tearDown();
+ }
+
+ /** Reads the whole queue twice through consumers created with the third createConsumer argument true (presumably browse-only) and expects m0..m99 in order both times. */
+ public void testSimpleConsumerBrowser() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setBlockOnNonPersistentSend(true);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ consumer.close();
+
+ consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ consumer.close();
+
+ session.close();
+ }
+
+
+ /** Reads the queue with the selector "x >= 50", expecting m50..m99, then reads it again without a selector, expecting m0..m99. */
+ public void testConsumerBrowserWithSelector() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("x"), i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
+
+ for (int i = 50; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ consumer.close();
+
+ // a consumer without a selector must still be handed every message afterwards
+ consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ consumer.close();
+
+ session.close();
+ }
+
+ /** Reads the queue with the selector "color = 'RED'" and expects exactly the even-numbered messages, in order. */
+ public void testConsumerBrowserWithStringSelector() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ // only even-numbered messages carry the color property
+ if (i % 2 == 0)
+ {
+ message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
+ }
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("color = 'RED'"), true);
+
+ for (int i = 0; i < numMessages; i += 2)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ session.close();
+ }
+
+ /** Opens three consumers on the same queue and expects each of them to receive m0..m99 in order. */
+ public void testConsumerMultipleBrowser() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE, null, true);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+ ClientConsumer[] browsers = { consumer, consumer2, consumer3 };
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer browser : browsers)
+ {
+ ClientMessage message2 = browser.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ }
+
+ session.close();
+ }
+
+ /** Opens three consumers ("x < 50", "x >= 50", no selector) on one queue and expects m0..m49, m50..m99 and m0..m99 respectively. */
+ public void testConsumerMultipleBrowserWithSelector() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("x"), i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x < 50"), true);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ for (int i = 50; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer2.receive(1000);
+ assertNotNull("message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer3.receive(1000);
+ assertNotNull("message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+
+ session.close();
+ }
+
+ public void testConsumerBrowserMessages() throws Exception
+ {
+ testConsumerBrowserMessagesArentAcked(false); // shared scenario with preACK = false
+ }
+
+ public void testConsumerBrowserMessagesPreACK() throws Exception
+ {
+ testConsumerBrowserMessagesArentAcked(true); // pre-ack variant; passing false only repeated testConsumerBrowserMessages
+ }
+
+ /** Reads every message through a consumer, then checks the queue still holds them all with none in delivery; preACK is forwarded as the sixth createSession argument. */
+ private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ // assert that all the messages are there and none have been acked
+ Queue queue = (Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ assertEquals(0, queue.getDeliveringCount());
+ assertEquals(numMessages, queue.getMessageCount());
+
+ session.close();
+ }
+
+ /** Acknowledges every message received through the consumer, then checks the queue still holds them all with none in delivery. */
+ public void testConsumerBrowserMessageAckDoesNothing() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+ assertNotNull("message " + i, message2); // a receive timeout should fail the test, not raise an NPE
+ message2.acknowledge();
+
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ // assert that all the messages are there and none have been acked
+ Queue queue = (Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ assertEquals(0, queue.getDeliveringCount());
+ assertEquals(numMessages, queue.getMessageCount());
+
+ session.close();
+ }
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java 2009-03-25 09:55:12 UTC (rev 6155)
@@ -0,0 +1,347 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientSessionStopStartTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
+
+ private MessagingService messagingService;
+
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ messagingService = createService(false);
+
+ messagingService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+
+ messagingService = null;
+
+ super.tearDown();
+ }
+
+ public void testStopStartConsumerSyncReceiveImmediate() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+
+ for(int i = 0; i < numMessages/2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ session.stop();
+ ClientMessage cm = consumer.receiveImmediate();
+ assertNull(cm);
+
+ session.close();
+ }
+
+ public void testStopStartConsumerSyncReceive() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+
+ for(int i = 0; i < numMessages/2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ session.stop();
+ long time = System.currentTimeMillis();
+ ClientMessage cm = consumer.receive(1000);
+ long taken = System.currentTimeMillis() - time;
+ assertTrue(taken >= 1000);
+ assertNull(cm);
+
+ session.close();
+ }
+
+ public void testStopStartConsumerAsyncSync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ int ccount = q.getConsumerCount();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ boolean failed;
+
+ boolean started = true;
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+ started = false;
+ //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ consumer.setMessageHandler(null);
+ session.start();
+ for (int i = 0; i < 90; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ ccount = q.getConsumerCount();
+ if(msg == null)
+ {
+ System.out.println("ClientConsumerTest.testStopConsumer");
+ }
+ assertNotNull("message " + i, msg);
+ msg.acknowledge();
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ session.close();
+ }
+
+ public void testStopStartConsumerAsyncASync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ int ccount = q.getConsumerCount();
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(10);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ private boolean stop = true;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public MyHandler(CountDownLatch latch, boolean stop)
+ {
+ this(latch);
+ this.stop = stop;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (stop && latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+ started = false;
+ //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ latch = new CountDownLatch(90);
+ handler = new MyHandler(latch, false);
+ consumer.setMessageHandler(handler);
+ session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+
+}
More information about the jboss-cvs-commits
mailing list