Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 12:54:56 -0400 (Fri, 14 Oct 2011)
New Revision: 11541
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-7389 - flow control on large messages
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -572,7 +572,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -594,7 +594,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -607,12 +618,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -678,6 +716,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -685,7 +728,7 @@
{
if (ClientConsumerImpl.isTrace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or
the slow consumer would be
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws
HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -77,6 +77,21 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -61,7 +61,7 @@
// Constants
------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static
---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control
(assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want
to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -163,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -195,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -210,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits =
" +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
{
return HandleStatus.BUSY;
}
-
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the
lock
if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
@@ -268,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&& !ref.getQueue().isInternalQueue() && !ref.isPaged())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -309,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null
? null
- :
filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null :
filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -406,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final
boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new
LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since
there's a possibility that refs were actually delivered but we just didn't get any
acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since
there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for
them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -492,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control
message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control
message");
availableCredits.set(0);
}
else
@@ -512,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving
credits");
}
@@ -541,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
@@ -573,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx,
final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " +
messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -701,10 +738,17 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
-
// Inner classes
// ------------------------------------------------------------------------
@@ -766,6 +810,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage
interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -774,7 +824,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -787,6 +837,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not
to tie up the delivery thread
@@ -802,7 +861,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send
LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving
loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -825,16 +885,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver
continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();