[jboss-cvs] JBoss Messaging SVN: r6102 - trunk/tests/src/org/jboss/messaging/tests/integration/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 17 16:33:44 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-17 16:33:44 -0400 (Tue, 17 Mar 2009)
New Revision: 6102

Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
Log:
Adding test as discussed here:

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-17 14:44:18 UTC (rev 6101)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-17 20:33:44 UTC (rev 6102)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.tests.integration.client;
 
+import java.util.concurrent.CountDownLatch;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -41,6 +43,7 @@
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class ClientConsumerTest extends UnitTestCase
 {
@@ -430,6 +433,90 @@
       session.close();
    }
 
+   public void testStopConsumer() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createMessage(session, "m" + i);
+         producer.send(message);
+      }
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+      session.start();
+      
+      final CountDownLatch latch = new CountDownLatch(10);
+
+      // Message should be in consumer
+
+      class MyHandler implements MessageHandler
+      {
+         boolean failed;
+         boolean started = true;
+         
+         public void onMessage(final ClientMessage message)
+         {
+            
+            try
+            {
+               if (!started)
+               {
+                  failed = true;
+               }
+               
+               latch.countDown();
+               
+               if (latch.getCount() == 0)
+               {
+                  session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+                  started = false;
+                  consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+               }
+
+               message.acknowledge();
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      }
+      
+      MyHandler handler = new MyHandler();
+
+      consumer.setMessageHandler(handler);
+
+      latch.await();
+      
+      Thread.sleep(100);
+
+      assertFalse(handler.failed);
+
+      // Make sure no exceptions were thrown from onMessage
+      assertNull(consumer.getLastException());
+      
+      for (int i = 0; i < 90; i++)
+      {
+         ClientMessage msg = consumer.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+      
+      assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
+
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));




More information about the jboss-cvs-commits mailing list