[jboss-cvs] JBoss Messaging SVN: r6168 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 26 05:02:26 EDT 2009
Author: ataylor
Date: 2009-03-26 05:02:25 -0400 (Thu, 26 Mar 2009)
New Revision: 6168
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
Log:
tests and fixes
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-26 09:02:25 UTC (rev 6168)
@@ -156,7 +156,7 @@
synchronized (this)
{
while ((stopped || (m = buffer.poll()) == null) &&
- !closed && toWait > 0 )
+ !closed && toWait > 0)
{
if (start == -1)
{
@@ -247,22 +247,21 @@
"Cannot set MessageHandler - consumer is in receive(...)");
}
- // If no handler before then need to queue them up
- boolean queueUp = handler == null;
+ boolean noPreviousHandler = handler == null;
handler = theHandler;
- if (queueUp)
+ //if no previous handler existed queue up messages for delivery
+ if (handler != null && noPreviousHandler)
{
- stopped = false;
for (int i = 0; i < buffer.size(); i++)
{
queueExecutor();
}
}
- else
+ //if unsetting a previous handler may be in onMessage so wait for completion
+ else if (handler == null && !noPreviousHandler)
{
- stopped = true;
waitForOnMessageToComplete();
}
}
@@ -299,7 +298,7 @@
public synchronized void stop() throws MessagingException
{
- if(stopped)
+ if (stopped)
{
return;
}
@@ -336,7 +335,7 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessageInternal messageToHandle = message;
if (isFileConsumer())
@@ -350,11 +349,11 @@
{
// Execute using executor
- buffer.add(messageToHandle);
- if(!stopped)
- {
- queueExecutor();
- }
+ buffer.add(messageToHandle);
+ if (!stopped)
+ {
+ queueExecutor();
+ }
}
else
{
@@ -393,7 +392,7 @@
if (isFileConsumer())
{
- ClientFileMessageInternal fileMessage = (ClientFileMessageInternal)currentChunkMessage;
+ ClientFileMessageInternal fileMessage = (ClientFileMessageInternal) currentChunkMessage;
addBytesBody(fileMessage, chunk.getBody());
}
else
@@ -413,7 +412,7 @@
// Close the file that was being generated
if (isFileConsumer())
{
- ((ClientFileMessageInternal)currentChunkMessage).closeChannel();
+ ((ClientFileMessageInternal) currentChunkMessage).closeChannel();
}
currentChunkMessage.setFlowControlSize(chunk.getPacketSize());
@@ -432,7 +431,7 @@
{
buffer.clear();
}
-
+
waitForOnMessageToComplete();
}
@@ -574,7 +573,7 @@
MessageHandler theHandler = handler;
if (theHandler != null)
- {
+ {
synchronized (this)
{
message = buffer.poll();
@@ -665,7 +664,7 @@
{
if (message instanceof ClientFileMessage)
{
- ((ClientFileMessage)message).getFile().delete();
+ ((ClientFileMessage) message).getFile().delete();
}
}
}
@@ -692,7 +691,7 @@
{
int propertiesSize = message.getPropertiesEncodeSize();
- MessagingBuffer bufferProperties = session.createBuffer(propertiesSize);
+ MessagingBuffer bufferProperties = session.createBuffer(propertiesSize);
// FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
// MessagingBuffer.
@@ -729,7 +728,7 @@
private ClientMessageInternal createFileMessage(final byte[] header) throws Exception
{
- MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+ MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
if (isFileConsumer())
{
@@ -771,7 +770,7 @@
{
public void run()
{
-
+
try
{
callOnMessage();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-26 09:02:25 UTC (rev 6168)
@@ -26,16 +26,12 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -69,165 +65,7 @@
}
- public void testSetMessageHandlerWithMessagesPending() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
-
- session.start();
-
- Thread.sleep(100);
-
- // Message should be in consumer
-
- class MyHandler implements MessageHandler
- {
- public void onMessage(final ClientMessage message)
- {
- try
- {
- Thread.sleep(10);
-
- message.acknowledge();
- }
- catch (Exception e)
- {
- }
- }
- }
-
- consumer.setMessageHandler(new MyHandler());
-
- // Let a few messages get processed
- Thread.sleep(100);
-
- // Now set null
-
- consumer.setMessageHandler(null);
-
- // Give a bit of time for some queued executors to run
-
- Thread.sleep(500);
-
- // Make sure no exceptions were thrown from onMessage
- assertNull(consumer.getLastException());
-
- session.close();
- }
-
-
-
- public void testSetUnsetMessageHandler() throws Exception
- {
- ClientSessionFactory sf = createInVMFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(QUEUE, QUEUE, null, false, false);
-
- ClientProducer producer = session.createProducer(QUEUE);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("i"), i);
- producer.send(message);
- }
-
- final ClientConsumer consumer = session.createConsumer(QUEUE);
-
- Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
- int ccount = q.getConsumerCount();
-
- session.start();
-
- CountDownLatch latch = new CountDownLatch(50);
-
- // Message should be in consumer
-
- class MyHandler implements MessageHandler
- {
- int messageReceived = 0;
- boolean failed;
-
- boolean started = true;
-
- private final CountDownLatch latch;
-
- public MyHandler(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void onMessage(final ClientMessage message)
- {
-
- try
- {
- if (!started)
- {
- failed = true;
- }
- messageReceived++;
- latch.countDown();
-
- if (latch.getCount() == 0)
- {
-
- message.acknowledge();
- started = false;
- consumer.setMessageHandler(null);
- }
-
- }
- catch (Exception e)
- {
- }
- }
- }
-
- MyHandler handler = new MyHandler(latch);
-
- consumer.setMessageHandler(handler);
-
- latch.await();
-
- Thread.sleep(100);
-
- assertFalse(handler.failed);
-
- // Make sure no exceptions were thrown from onMessage
- assertNull(consumer.getLastException());
- latch = new CountDownLatch(50);
- handler = new MyHandler(latch);
- consumer.setMessageHandler(handler);
- session.start();
- assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
-
- Thread.sleep(100);
-
- assertFalse(handler.failed);
- assertNull(consumer.getLastException());
- session.close();
- }
-
+
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientMessageHandlerTest.java 2009-03-26 09:02:25 UTC (rev 6168)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientMessageHandlerTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
+
+ private MessagingService messagingService;
+
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ messagingService = createService(false);
+
+ messagingService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+
+ messagingService = null;
+
+ super.tearDown();
+ }
+
+ public void testSetMessageHandlerWithMessagesPending() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+ session.start();
+
+ Thread.sleep(100);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ public void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ Thread.sleep(10);
+
+ message.acknowledge();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ consumer.setMessageHandler(new MyHandler());
+
+ // Let a few messages get processed
+ Thread.sleep(100);
+
+ // Now set null
+
+ consumer.setMessageHandler(null);
+
+ // Give a bit of time for some queued executors to run
+
+ Thread.sleep(500);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+
+ session.close();
+ }
+
+
+ public void testSetResetMessageHandler() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(50);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ started = false;
+ consumer.setMessageHandler(null);
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ latch = new CountDownLatch(50);
+ handler = new MyHandler(latch);
+ consumer.setMessageHandler(handler);
+ session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+
+ public void testSetUnsetMessageHandler() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(50);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ started = false;
+ consumer.setMessageHandler(null);
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ consumer.setMessageHandler(null);
+ ClientMessage cm = consumer.receiveImmediate();
+ assertNotNull(cm);
+
+ session.close();
+ }
+
+ public void testSetUnsetResetMessageHandler() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(50);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ started = false;
+ consumer.setMessageHandler(null);
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ consumer.setMessageHandler(null);
+ ClientMessage cm = consumer.receiveImmediate();
+ assertNotNull(cm);
+ latch = new CountDownLatch(49);
+ handler = new MyHandler(latch);
+ consumer.setMessageHandler(handler);
+ session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java 2009-03-26 02:47:40 UTC (rev 6167)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java 2009-03-26 09:02:25 UTC (rev 6168)
@@ -29,7 +29,6 @@
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
@@ -43,31 +42,31 @@
{
private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
- private MessagingService messagingService;
+ private MessagingService messagingService;
- private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+ private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
- messagingService = createService(false);
+ messagingService = createService(false);
- messagingService.start();
- }
+ messagingService.start();
+ }
- @Override
- protected void tearDown() throws Exception
- {
- messagingService.stop();
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
- messagingService = null;
+ messagingService = null;
- super.tearDown();
- }
+ super.tearDown();
+ }
- public void testStopStartConsumerSyncReceiveImmediate() throws Exception
+ public void testStopStartConsumerSyncReceiveImmediate() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -91,7 +90,7 @@
session.start();
- for(int i = 0; i < numMessages/2; i++)
+ for (int i = 0; i < numMessages / 2; i++)
{
ClientMessage cm = consumer.receive(5000);
assertNotNull(cm);
@@ -101,6 +100,14 @@
ClientMessage cm = consumer.receiveImmediate();
assertNull(cm);
+ session.start();
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+
session.close();
}
@@ -128,7 +135,7 @@
session.start();
- for(int i = 0; i < numMessages/2; i++)
+ for (int i = 0; i < numMessages / 2; i++)
{
ClientMessage cm = consumer.receive(5000);
assertNotNull(cm);
@@ -141,10 +148,18 @@
assertTrue(taken >= 1000);
assertNull(cm);
+ session.start();
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+
session.close();
}
- public void testStopStartConsumerAsyncSync() throws Exception
+ public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -165,11 +180,97 @@
final ClientConsumer consumer = session.createConsumer(QUEUE);
- Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
- int ccount = q.getConsumerCount();
+ session.start();
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ boolean failed;
+
+ boolean started = true;
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ session.stop();
+ started = false;
+ }
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ consumer.setMessageHandler(null);
session.start();
+ for (int i = 0; i < 90; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ if (msg == null)
+ {
+ System.out.println("ClientConsumerTest.testStopConsumer");
+ }
+ assertNotNull("message " + i, msg);
+ msg.acknowledge();
+ }
+ assertNull(consumer.receiveImmediate());
+
+ session.close();
+ }
+
+ public void testStopStartConsumerAsyncSync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
final CountDownLatch latch = new CountDownLatch(10);
// Message should be in consumer
@@ -196,9 +297,8 @@
{
message.acknowledge();
- session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
started = false;
- //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+ consumer.setMessageHandler(null);
}
}
@@ -214,7 +314,7 @@
latch.await();
- Thread.sleep(100);
+ session.stop();
assertFalse(handler.failed);
@@ -225,8 +325,7 @@
for (int i = 0; i < 90; i++)
{
ClientMessage msg = consumer.receive(1000);
- ccount = q.getConsumerCount();
- if(msg == null)
+ if (msg == null)
{
System.out.println("ClientConsumerTest.testStopConsumer");
}
@@ -239,7 +338,7 @@
session.close();
}
- public void testStopStartConsumerAsyncASync() throws Exception
+ public void testStopStartConsumerAsyncASyncStoppeeByHandler() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -260,11 +359,110 @@
final ClientConsumer consumer = session.createConsumer(QUEUE);
- Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
- int ccount = q.getConsumerCount();
+ session.start();
+ CountDownLatch latch = new CountDownLatch(10);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ private boolean stop = true;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public MyHandler(CountDownLatch latch, boolean stop)
+ {
+ this(latch);
+ this.stop = stop;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (stop && latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ session.stop();
+ started = false;
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ latch = new CountDownLatch(90);
+ handler = new MyHandler(latch, false);
+ consumer.setMessageHandler(handler);
session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+
+ public void testStopStartConsumerAsyncASync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
CountDownLatch latch = new CountDownLatch(10);
// Message should be in consumer
@@ -272,6 +470,7 @@
class MyHandler implements MessageHandler
{
int messageReceived = 0;
+
boolean failed;
boolean started = true;
@@ -307,9 +506,8 @@
{
message.acknowledge();
- session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+ consumer.setMessageHandler(null);
started = false;
- //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
}
}
@@ -344,4 +542,149 @@
session.close();
}
+ public void testStopStartMultipleConsumers() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setConsumerWindowSize(10);
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE);
+
+ session.start();
+
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ cm = consumer2.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ cm = consumer3.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+
+ session.stop();
+ cm = consumer.receiveImmediate();
+ assertNull(cm);
+ cm = consumer2.receiveImmediate();
+ assertNull(cm);
+ cm = consumer3.receiveImmediate();
+ assertNull(cm);
+
+ session.start();
+ cm = consumer.receiveImmediate();
+ assertNotNull(cm);
+ cm = consumer2.receiveImmediate();
+ assertNotNull(cm);
+ cm = consumer3.receiveImmediate();
+ assertNotNull(cm);
+ session.close();
+ }
+
+
+ public void testStopStartAlreadyStartedSession() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+
+ session.start();
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+
+ session.close();
+ }
+
+ public void testStopAlreadyStoppedSession() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ session.stop();
+ ClientMessage cm = consumer.receiveImmediate();
+ assertNull(cm);
+
+ session.stop();
+ cm = consumer.receiveImmediate();
+ assertNull(cm);
+
+ session.start();
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+
+ session.close();
+ }
+
}
More information about the jboss-cvs-commits
mailing list