Author: jmesnil
Date: 2009-10-30 11:12:21 -0400 (Fri, 30 Oct 2009)
New Revision: 8170
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
fixed ClientConsumerImpl.receiveImmediate()
* do not force delivery if the session is stopped
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-30 10:56:29 UTC (rev 8169)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-30 15:12:21 UTC (rev 8170)
@@ -208,7 +208,11 @@
}
if (m == null && forcingDelivery)
- {
+ {
+ if (stopped)
+ {
+ break;
+ }
// we only force delivery once per call to receive
if (!deliveryForced)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2009-10-30 10:56:29 UTC (rev 8169)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2009-10-30 15:12:21 UTC (rev 8170)
@@ -83,7 +83,40 @@
{
doConsumerReceiveImmediate(true);
}
+
+ public void testConsumerReceiveImmediateWithSessionStop() throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession(false, true, true);
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+ session.start();
+
+ session.stop();
+ assertNull(consumer.receiveImmediate());
+
+ session.start();
+ long start = System.currentTimeMillis();
+ ClientMessage msg = consumer.receive(2000);
+ long end = System.currentTimeMillis();
+ assertNull(msg);
+ // we waited for at least 2000ms
+ assertTrue("waited only " + (end - start), end - start >= 2000);
+
+ consumer.close();
+
+ session.close();
+
+ sf.close();
+
+ }
+
private void doConsumerReceiveImmediateWithNoMessages(boolean browser) throws Exception
{
sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
Show replies by date