[jboss-cvs] JBoss Messaging SVN: r2346 - in trunk/src/main/org/jboss/jms: server/endpoint and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 19 07:30:36 EST 2007
Author: timfox
Date: 2007-02-19 07:30:36 -0500 (Mon, 19 Feb 2007)
New Revision: 2346
Modified:
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
Tweaks to flow control
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-19 11:59:00 UTC (rev 2345)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-19 12:30:36 UTC (rev 2346)
@@ -198,8 +198,9 @@
private boolean listenerRunning;
private int maxDeliveries;
private long channelID;
- private boolean startSendingMessageSent;
private long lastDeliveryId = -1;
+ //private volatile boolean sentFull;
+ //private volatile boolean sentEmpty;
// Constructors ---------------------------------------------------------------------------------
@@ -226,7 +227,6 @@
mainLock = new Object();
this.sessionExecutor = sessionExecutor;
this.maxDeliveries = maxDeliveries;
- this.startSendingMessageSent = true;
}
// Public ---------------------------------------------------------------------------------------
@@ -260,18 +260,9 @@
if (trace) { log.trace(this + " added message(s) to the buffer"); }
- messageAdded();
+ messageAdded();
- if (buffer.size() >= maxBufferSize)
- {
- if (trace) { log.trace(this + " is full"); }
-
- //We are full. Send message to server to tell it to stop sending
-
- startSendingMessageSent = false;
-
- sendChangeRateMessage(0);
- }
+ checkBufferSize();
}
}
@@ -480,22 +471,60 @@
}
//This needs to be outside the lock
- if (!startSendingMessageSent && buffer.size() <= minBufferSize)
- {
- //Tell the server we need more messages - but we don't want to keep sending the message
- //if we've already sent it - hence the check
- startSendingMessageSent = true;
-
- if (trace) { log.trace("telling server to start resume sending messages, buffer size is " + buffer.size()); }
-
- sendChangeRateMessage(1);
- }
+ checkBufferSize();
if (trace) { log.trace(this + " receive() returning " + m); }
return m;
- }
+ }
+ //We can optimise so it just uses one int to store both flags
+
+ private volatile boolean sentStop;
+
+ private volatile boolean sentStart = true;
+
+ private void checkBufferSize()
+ {
+ int size = buffer.size();
+
+ if (!sentStart && size <= minBufferSize)
+ {
+ //We need more messages - we need to tell the server this if we haven't done so already
+
+ sendChangeRateMessage(1.0f);
+
+ sentStart = true;
+
+ sentStop = false;
+ }
+ else if (!sentStop && size >= maxBufferSize)
+ {
+ //Our buffer is full - we need to tell the server to stop sending if we haven't
+ //done so already
+
+ sendChangeRateMessage(0f);
+
+ sentStop = true;
+
+ sentStart = false;
+ }
+ }
+
+ private void sendChangeRateMessage(float newRate)
+ {
+ try
+ {
+ // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
+ // job to detect it and turn it into a remoting one way invocation.
+ consumerDelegate.changeRate(newRate);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to send changeRate message", e);
+ }
+ }
+
public MessageListener getMessageListener()
{
return listener;
@@ -584,19 +613,7 @@
}
}
- private void sendChangeRateMessage(float newRate)
- {
- try
- {
- // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
- // job to detect it and turn it into a remoting one way invocation.
- consumerDelegate.changeRate(newRate);
- }
- catch (JMSException e)
- {
- log.error("Failed to send changeRate message", e);
- }
- }
+
private void queueRunner(ListenerRunner runner)
{
@@ -794,19 +811,9 @@
log.error("Failed to deliver message", e);
}
}
+
+ checkBufferSize();
-
- // Tell the server we need more messages - but we don't want to keep sending the message
- // if we've already sent it - hence the check
- if (!startSendingMessageSent && buffer.size() <= minBufferSize)
- {
- startSendingMessageSent = true;
-
- if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
-
- sendChangeRateMessage(1);
- }
-
if (again)
{
// Queue it up again
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-19 11:59:00 UTC (rev 2345)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-19 12:30:36 UTC (rev 2346)
@@ -384,8 +384,13 @@
if (clientAccepting)
{
+ log.info(this + "Toggle on");
promptDelivery();
}
+ else
+ {
+ log.info(this + "Toggle off");
+ }
}
catch (Throwable t)
{
More information about the jboss-cvs-commits
mailing list