Author: timfox
Date: 2010-09-27 05:37:14 -0400 (Mon, 27 Sep 2010)
New Revision: 9723
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -113,7 +113,7 @@
private boolean stopped = false;
- private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+ private long forceDeliveryCount;
private final SessionQueueQueryResponseMessage queueInfo;
@@ -226,7 +226,7 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
- session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+ session.forceDelivery(id, forceDeliveryCount++);
deliveryForced = true;
}
@@ -260,18 +260,17 @@
if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
{
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
- if (seq >= forceDeliveryCount.longValue())
+
+ if (forcingDelivery && seq == forceDeliveryCount - 1)
{
// forced delivery messages are discarded, nothing has been delivered by the queue
- if (forcingDelivery)
- {
- resetIfSlowConsumer();
- return null;
- }
+ resetIfSlowConsumer();
+
+ return null;
}
else
{
- // ignore any previous forced delivery message
+ // Ignore the message
continue;
}
}
@@ -425,7 +424,7 @@
lastAckedMessage = null;
creditsToSend = 0;
-
+
ackIndividually = false;
}
@@ -468,7 +467,7 @@
{
return browseOnly;
}
-
+
public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
{
if (closing)
@@ -571,7 +570,7 @@
while (iter.hasNext())
{
ClientMessageInternal message = iter.next();
-
+
flowControlBeforeConsumption(message);
}
@@ -603,7 +602,7 @@
{
flushAcks();
}
-
+
session.individualAcknowledge(id, message.getMessageID());
}
else
@@ -708,7 +707,7 @@
private void resetIfSlowConsumer()
{
- if(clientWindowSize == 0)
+ if (clientWindowSize == 0)
{
slowConsumerInitialCreditSent = false;
sendCredits(0);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -73,7 +73,7 @@
}
/*
- * Construct a MessageImpl from storage, or notification
+ * Construct a MessageImpl from storage, or notification, or before routing
*/
public ServerMessageImpl(final long messageID, final int initialMessageBufferSize)
{
@@ -82,11 +82,6 @@
this.messageID = messageID;
}
- protected ServerMessageImpl(final int initialMessageBufferSize)
- {
- super(initialMessageBufferSize);
- }
-
/*
* Copy constructor
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -16,7 +16,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
@@ -113,7 +118,42 @@
sf.close();
}
+
+ // https://jira.jboss.org/browse/HORNETQ-450
+ public void testReceivedImmediateFollowedByReceive() throws Exception
+ {
+ sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonDurableSend(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(false);
+
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ session.start();
+
+ ClientMessage received = consumer.receiveImmediate();
+
+ assertNotNull(received);
+
+ received.acknowledge();
+
+ received = consumer.receive(1);
+
+ assertNull(received);
+
+ session.close();
+
+ sf.close();
+ }
+
private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws Exception
{
sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));