[jboss-cvs] JBoss Messaging SVN: r6591 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 27 23:04:35 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-27 23:04:34 -0400 (Mon, 27 Apr 2009)
New Revision: 6591
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/ra/
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1573 - Make standard messages also use the packet size for flow control
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-28 03:04:34 UTC (rev 6591)
@@ -377,6 +377,11 @@
ClientMessageInternal messageToHandle = message;
messageToHandle.onReceipt(this);
+
+ if (trace)
+ {
+ log.trace("Adding message " + message + " into buffer");
+ }
// Add it to the buffer
buffer.addLast(messageToHandle, messageToHandle.getPriority());
@@ -501,12 +506,20 @@
{
if (clientWindowSize == 0)
{
+ if (trace)
+ {
+ log.trace("Sending full credits - 1 for slow consumer");
+ }
// Initially send (credits - 1) to kick off the slow consumer; otherwise the slow consumer would
// always be buffering one message after receiving the first one
sendCredits(creditsToSend - 1);
}
else
{
+ if (trace)
+ {
+ log.trace("Sending full credits");
+ }
sendCredits(creditsToSend);
}
creditsToSend = 0;
@@ -537,6 +550,10 @@
private void queueExecutor()
{
+ if (trace)
+ {
+ log.trace("Adding Runner on Executor for delivery");
+ }
sessionExecutor.execute(runner);
}
@@ -545,6 +562,10 @@
*/
private void sendCredits(final int credits)
{
+ if (trace)
+ {
+ log.trace("Sending " + credits + " back");
+ }
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
@@ -619,8 +640,17 @@
{
onMessageThread = Thread.currentThread();
+ if (trace)
+ {
+ log.trace("Calling handler.onMessage");
+ }
theHandler.onMessage(message);
+ if (trace)
+ {
+ log.trace("Handler.onMessage done");
+ }
+
if (message.isLargeMessage())
{
message.discardLargeBody();
@@ -634,6 +664,10 @@
// If slow consumer, we need to send 1 credit to make sure we get another message
if (clientWindowSize == 0)
{
+ if (trace)
+ {
+ log.trace("Sending 1 credit back after consuming message on slow consumer (no-buffering)");
+ }
sendCredits(1);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-04-28 03:04:34 UTC (rev 6591)
@@ -636,7 +636,7 @@
{
ClientMessageInternal clMessage = message.getClientMessage();
- clMessage.setFlowControlSize(clMessage.getEncodeSize());
+ clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(message.getClientMessage());
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-28 03:04:34 UTC (rev 6591)
@@ -77,8 +77,7 @@
// Static ---------------------------------------------------------------------------------------
- // private static final boolean trace = log.isTraceEnabled();
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
private static void trace(final String message)
{
@@ -668,13 +667,17 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
{
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
+
if (availableCredits != null)
{
- availableCredits.addAndGet(-message.getEncodeSize());
+ availableCredits.addAndGet(-packet.getRequiredBufferSize());
+ if (trace)
+ {
+ log.trace("Taking " + packet.getRequiredBufferSize() + " out of flow control");
+ }
}
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
-
if (replicatingChannel == null)
{
// Not replicated - just send now
More information about the jboss-cvs-commits
mailing list