[hornetq-commits] JBoss hornetq SVN: r8170 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 30 11:12:21 EDT 2009


Author: jmesnil
Date: 2009-10-30 11:12:21 -0400 (Fri, 30 Oct 2009)
New Revision: 8170

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
fixed ClientConsumerImpl.receiveImmediate()

* do not force delivery if the session is stopped

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-10-30 10:56:29 UTC (rev 8169)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-10-30 15:12:21 UTC (rev 8170)
@@ -208,7 +208,11 @@
                   }
 
                   if (m == null && forcingDelivery)
-                  {                     
+                  {
+                     if (stopped)
+                     {
+                        break;
+                     }
                      // we only force delivery once per call to receive
                      if (!deliveryForced)
                      {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2009-10-30 10:56:29 UTC (rev 8169)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2009-10-30 15:12:21 UTC (rev 8170)
@@ -83,7 +83,40 @@
    {
       doConsumerReceiveImmediate(true);
    }
+   
+   public void testConsumerReceiveImmediateWithSessionStop() throws Exception
+   {
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+      sf.setAckBatchSize(0);
+      
+      ClientSession session = sf.createSession(false, true, true);
 
+      session.createQueue(ADDRESS, QUEUE, null, false);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+      session.start();
+            
+      session.stop();
+      assertNull(consumer.receiveImmediate());
+      
+      session.start();
+      long start = System.currentTimeMillis();
+      ClientMessage msg = consumer.receive(2000);
+      long end = System.currentTimeMillis();
+      assertNull(msg);
+      // we waited for at least 2000ms
+      assertTrue("waited only " + (end - start), end - start >= 2000);
+
+      consumer.close();
+
+      session.close();
+
+      sf.close();
+
+   }
+
    private void doConsumerReceiveImmediateWithNoMessages(boolean browser) throws Exception
    {
       sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));



More information about the hornetq-commits mailing list