Author: timfox
Date: 2010-09-27 08:27:02 -0400 (Mon, 27 Sep 2010)
New Revision: 9724
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27
09:37:14 UTC (rev 9723)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27
12:27:02 UTC (rev 9724)
@@ -16,7 +16,6 @@
import java.io.File;
import java.util.Iterator;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
@@ -728,6 +727,7 @@
{
ClientConsumerImpl.log.trace("Adding Runner on Executor for
delivery");
}
+
sessionExecutor.execute(runner);
}
@@ -805,6 +805,12 @@
if (message != null)
{
+ if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ //Ignore, this could be a relic from a previous receiveImmediate();
+ return;
+ }
+
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
@@ -932,7 +938,6 @@
{
public void run()
{
-
try
{
callOnMessage();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27
09:37:14 UTC (rev 9723)
+++
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27
12:27:02 UTC (rev 9724)
@@ -12,6 +12,8 @@
*/
package org.hornetq.tests.integration.client;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -22,6 +24,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
@@ -153,7 +156,52 @@
sf.close();
}
+
+ //
https://jira.jboss.org/browse/HORNETQ-450
+ public void testReceivedImmediateFollowedByAsyncConsume() throws Exception
+ {
+ sf = HornetQClient.createClientSessionFactory(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonDurableSend(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(false);
+
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ session.start();
+
+ ClientMessage received = consumer.receiveImmediate();
+
+ assertNotNull(received);
+
+ received.acknowledge();
+
+ final AtomicBoolean receivedAsync = new AtomicBoolean(false);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ receivedAsync.set(true);
+ }
+ });
+
+ Thread.sleep(1000);
+
+ assertFalse(receivedAsync.get());
+
+ session.close();
+
+ sf.close();
+ }
+
private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws
Exception
{
sf = HornetQClient.createClientSessionFactory(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));