[jboss-cvs] JBoss Messaging SVN: r6102 - trunk/tests/src/org/jboss/messaging/tests/integration/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 17 16:33:44 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-17 16:33:44 -0400 (Tue, 17 Mar 2009)
New Revision: 6102
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
Log:
Adding test as discussed here:
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-17 14:44:18 UTC (rev 6101)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-17 20:33:44 UTC (rev 6102)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -41,6 +43,7 @@
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*/
public class ClientConsumerTest extends UnitTestCase
{
@@ -430,6 +433,90 @@
session.close();
}
+ /**
+  * Sends {@code numMessages} messages to QUEUE, lets a MessageHandler consume the first
+  * {@code messagesBeforeStop} of them and then, from inside onMessage, stops the session and
+  * removes the handler. The test asserts that the handler is never invoked again after the stop,
+  * that nothing went wrong inside onMessage, and that the remaining messages can still be
+  * received synchronously until the consumer has nothing left.
+  *
+  * @throws Exception if any messaging operation fails or the test thread is interrupted
+  */
+ public void testStopConsumer() throws Exception
+ {
+    ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+    final ClientSession session = sf.createSession(false, true, true);
+
+    try
+    {
+       session.createQueue(QUEUE, QUEUE, null, false, false);
+
+       ClientProducer producer = session.createProducer(QUEUE);
+
+       final int numMessages = 100;
+
+       // Number of messages the handler consumes before it stops the session
+       final int messagesBeforeStop = 10;
+
+       for (int i = 0; i < numMessages; i++)
+       {
+          ClientMessage message = createMessage(session, "m" + i);
+          producer.send(message);
+       }
+
+       final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+       session.start();
+
+       final CountDownLatch latch = new CountDownLatch(messagesBeforeStop);
+
+       // Message should be in consumer
+
+       class MyHandler implements MessageHandler
+       {
+          // volatile: these are written inside onMessage and read by the test method,
+          // which may be running on a different thread
+          volatile boolean failed;
+          volatile boolean started = true;
+          // First exception caught inside onMessage, if any
+          volatile Exception error;
+
+          public void onMessage(final ClientMessage message)
+          {
+
+             try
+             {
+                // A delivery after the session was stopped is exactly what this test must catch
+                if (!started)
+                {
+                   failed = true;
+                }
+
+                latch.countDown();
+
+                if (latch.getCount() == 0)
+                {
+                   session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+                   started = false;
+                   consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+                }
+
+                message.acknowledge();
+             }
+             catch (Exception e)
+             {
+                // Do not swallow: remember the failure so the test method can report it
+                if (error == null)
+                {
+                   error = e;
+                }
+             }
+          }
+       }
+
+       MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       // Bounded wait so that a delivery problem fails the test instead of hanging the run
+       boolean timedOut = !latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
+       assertFalse(timedOut);
+
+       // Leave a window in which a wrongly delivered message would reach the handler
+       Thread.sleep(100);
+
+       assertFalse(handler.failed);
+
+       // Nothing inside onMessage (stop / setMessageHandler / acknowledge) may have failed
+       assertNull(handler.error);
+
+       // Make sure no exceptions were thrown from onMessage
+       assertNull(consumer.getLastException());
+
+       // Everything the handler did not consume must still be receivable synchronously
+       for (int i = 0; i < numMessages - messagesBeforeStop; i++)
+       {
+          ClientMessage msg = consumer.receive(1000);
+          assertNotNull(msg);
+          msg.acknowledge();
+       }
+
+       assertNull(consumer.receiveImmediate());
+    }
+    finally
+    {
+       // Close the session even when an assertion above fails
+       session.close();
+    }
+ }
+
+
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
More information about the jboss-cvs-commits
mailing list