Author: borges
Date: 2012-01-12 07:10:02 -0500 (Thu, 12 Jan 2012)
New Revision: 12015
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
Make synchronization consistent.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-01-12 12:09:47 UTC (rev 12014)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-01-12 12:10:02 UTC (rev 12015)
@@ -267,7 +267,7 @@
{
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
-
+
if (isTrace)
{
log.trace("There was nothing on the queue, leaving it now::
returning null");
@@ -315,7 +315,7 @@
{
largeMessageReceived = m;
}
-
+
if (isTrace)
{
log.trace("Returning " + m);
@@ -367,7 +367,7 @@
return receive(0, true);
}
- public MessageHandler getMessageHandler() throws HornetQException
+ public synchronized MessageHandler getMessageHandler() throws HornetQException
{
checkClosed();
@@ -479,7 +479,7 @@
{
return session;
}
-
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -621,7 +621,7 @@
try
{
ClientMessageInternal message = iter.next();
-
+
if (message.isLargeMessage())
{
ClientLargeMessageInternal largeMessage =
(ClientLargeMessageInternal)message;
@@ -637,7 +637,7 @@
}
clearBuffer();
-
+
try
{
if (currentLargeMessageController != null)
@@ -704,11 +704,11 @@
}
}
- /**
- *
+ /**
+ *
* LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
* So, this operation needs to be atomic.
- *
+ *
* @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
*/
public void flowControl(final int messageBytes, final boolean discountSlowConsumer)
throws HornetQException
@@ -716,7 +716,7 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
-
+
if (log.isTraceEnabled())
{
log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend +
", clientWindowSize = " + clientWindowSize + " messageBytes = " +
messageBytes);
@@ -774,7 +774,7 @@
// Private
// ---------------------------------------------------------------------------------------
- /**
+ /**
* Sending a initial credit for slow consumers
* */
private void startSlowConsumer()
@@ -808,7 +808,7 @@
{
ClientConsumerImpl.log.trace("Adding Runner on Executor for
delivery");
}
-
+
sessionExecutor.execute(runner);
}
@@ -891,7 +891,7 @@
//Ignore, this could be a relic from a previous receiveImmediate();
return;
}
-
+
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);