[jboss-cvs] JBoss Messaging SVN: r3541 - in trunk: src/main/org/jboss/jms/server/endpoint and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 7 10:59:53 EST 2008
Author: timfox
Date: 2008-01-07 10:59:53 -0500 (Mon, 07 Jan 2008)
New Revision: 3541
Removed:
trunk/tests/src/org/jboss/test/thirdparty/remoting/
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
Log:
Merged flow control change from Branch_Stable and removed old remoting transport tests
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-07 15:59:53 UTC (rev 3541)
@@ -201,18 +201,18 @@
private int ackMode;
private boolean closed;
private Object mainLock;
- private int maxBufferSize;
- private int minBufferSize;
private QueuedExecutor sessionExecutor;
private boolean listenerRunning;
private int maxDeliveries;
private String queueName;
private long lastDeliveryId = -1;
- private volatile boolean serverSending = true;
private boolean waitingForLastDelivery;
private boolean shouldAck;
- private boolean handleFlowControl;
private long redeliveryDelay;
+ private boolean paused;
+ private int consumeCount;
+ private boolean firstTime = true;
+ private int bufferSize;
// Constructors ---------------------------------------------------------------------------------
@@ -228,8 +228,6 @@
throw new IllegalArgumentException(this + " bufferSize must be > 0");
}
- this.maxBufferSize = bufferSize;
- this.minBufferSize = bufferSize / 2;
buffer = new BasicPriorityLinkedList(10);
isConnectionConsumer = isCC;
this.ackMode = ackMode;
@@ -241,8 +239,8 @@
this.sessionExecutor = sessionExecutor;
this.maxDeliveries = maxDeliveries;
this.shouldAck = shouldAck;
- this.handleFlowControl = handleFlowControl;
this.redeliveryDelay = redeliveryDelay;
+ this.bufferSize = bufferSize;
}
// Public ---------------------------------------------------------------------------------------
@@ -281,11 +279,6 @@
if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
messageAdded();
-
- if (handleFlowControl)
- {
- checkStop();
- }
}
}
@@ -526,13 +519,6 @@
}
}
- //This needs to be outside the lock
-
- if (handleFlowControl)
- {
- checkStart();
- }
-
if (trace) { log.trace(this + " receive() returning " + m); }
return m;
@@ -564,6 +550,8 @@
{
buffer.addFirst(proxy, proxy.getJMSPriority());
+ consumeCount--;
+
messageAdded();
}
}
@@ -583,8 +571,7 @@
buffer.clear();
- // need to reset toggle state
- serverSending = true;
+ consumeCount = 0;
}
public long getRedeliveryDelay()
@@ -592,6 +579,37 @@
return redeliveryDelay;
}
+ public void pause()
+ {
+ synchronized (mainLock)
+ {
+ paused = true;
+
+ sendChangeRateMessage(0f);
+ }
+ }
+
+ public void resume()
+ {
+ synchronized (mainLock)
+ {
+ paused = false;
+
+ if (firstTime)
+ {
+ consumeCount = 0;
+
+ firstTime = false;
+ }
+ else
+ {
+ consumeCount = bufferSize / 3 - buffer.size();
+ }
+
+ sendChangeRateMessage(1f);
+ }
+ }
+
// Package protected ----------------------------------------------------------------------------
@@ -599,6 +617,18 @@
// Private --------------------------------------------------------------------------------------
+ private void checkSendChangeRate()
+ {
+ consumeCount++;
+
+ if (!paused && consumeCount == bufferSize)
+ {
+ consumeCount = 0;
+
+ sendChangeRateMessage(1.0f);
+ }
+ }
+
/*
* Wait for the last delivery to arrive
*/
@@ -647,39 +677,6 @@
}
}
- private void checkStop()
- {
- int size = buffer.size();
-
- if (serverSending && size >= maxBufferSize)
- {
- //Our buffer is full - we need to tell the server to stop sending if we haven't
- //done so already
-
- sendChangeRateMessage(0f);
-
- if (trace) { log.trace("Sent changeRate 0 message"); }
-
- serverSending = false;
- }
- }
-
- private void checkStart()
- {
- int size = buffer.size();
-
- if (!serverSending && size <= minBufferSize)
- {
- //We need more messages - we need to tell the server this if we haven't done so already
-
- sendChangeRateMessage(1.0f);
-
- if (trace) { log.trace("Sent changeRate 1.0 message"); }
-
- serverSending = true;
- }
- }
-
private void sendChangeRateMessage(float newRate)
{
try
@@ -842,6 +839,8 @@
if (!closed && !buffer.isEmpty())
{
m = (JBossMessage)buffer.removeFirst();
+
+ checkSendChangeRate();
}
return m;
@@ -898,7 +897,9 @@
// remove a message from the buffer
- msg = (JBossMessage)buffer.removeFirst();
+ msg = (JBossMessage)buffer.removeFirst();
+
+ checkSendChangeRate();
}
/*
@@ -946,11 +947,6 @@
}
}
- if (handleFlowControl)
- {
- checkStart();
- }
-
if (trace) { log.trace("Exiting run()"); }
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-07 15:59:53 UTC (rev 3541)
@@ -127,13 +127,19 @@
private ServerPeer sp;
+ private int prefetchSize;
+
+ private volatile int sendCount;
+
+ private boolean firstTime = true;
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(ServerPeer sp,String id, Queue messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint, String selector,
boolean noLocal, JBossDestination dest, Queue dlq,
Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
- boolean remote, boolean replicating) throws InvalidSelectorException
+ boolean remote, boolean replicating, int prefetchSize) throws InvalidSelectorException
{
if (trace)
{
@@ -171,6 +177,16 @@
this.replicating = replicating;
+ this.prefetchSize = prefetchSize;
+
+ boolean slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+
+ if (slow)
+ {
+ //Slow is same as setting prefetch size to 1 - can deprecate this in 2.0
+ prefetchSize = 1;
+ }
+
this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
if (dest.isTopic() && !messageQueue.isRecoverable())
@@ -307,17 +323,23 @@
}
}
- if (slow)
+ sendCount++;
+
+ int num = prefetchSize;
+
+ if (firstTime)
{
- //If this is a slow consumer, we do not want to do any message buffering, so we immediately
- //set clientAccepting to false
- //When the client has consumed the message it will send a changeRate + message which will set
- //clientAccepting to true again
- //We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
- //will be buffered at once due to the asynchronous nature of sending changeRate
- this.clientAccepting = false;
+ //We make sure we have a little extra buffer on the client side
+ num = num + num / 3 ;
}
+ if (sendCount == num)
+ {
+ clientAccepting = false;
+
+ firstTime = false;
+ }
+
try
{
sessionEndpoint.handleDelivery(delivery, this);
@@ -402,21 +424,10 @@
try
{
- // For now we just support a binary on/off.
- // The client will send newRate = 0, to say it does not want any more messages when its
- // client side buffer gets full or it will send an arbitrary non zero number to say it
- // does want more messages, when its client side buffer empties to half its full size.
- // Note the client does not wait until the client side buffer is empty before sending a
- // newRate(+ve) message since this would add extra latency.
-
- // In the future we can fine tune this by allowing the client to specify an actual rate in
- // the newRate value so this is basically a placeholder for the future so we don't have to
- // change the wire format when we support it.
-
- // No need to synchronize - clientAccepting is volatile.
-
if (newRate > 0)
{
+ sendCount = 0;
+
clientAccepting = true;
}
else
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-07 13:30:09 UTC (rev 3540)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-07 15:59:53 UTC (rev 3541)
@@ -66,8 +66,6 @@
import org.jboss.messaging.core.contract.Condition;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
-import org.jboss.messaging.newcore.Message;
-import org.jboss.messaging.newcore.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.contract.PostOffice;
@@ -81,6 +79,8 @@
import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.newcore.Message;
+import org.jboss.messaging.newcore.MessageReference;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
@@ -379,12 +379,6 @@
{
if (thisSequence != -1)
{
- //Need to make sure it is in correct order since np messages are sent
- //one way so they can arrive out of sequence
-
- //This is a workaround to allow us to use one way messages for np messages for performance
- //reasons
-
synchronized (waitLock)
{
connectionEndpoint.sendMessage(message, null, checkForDuplicates);
@@ -1776,7 +1770,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(sp, consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
- dest, null, null, 0, -1, true, false);
+ dest, null, null, 0, -1, true, false, prefetchSize);
ConsumerAdvised advised;
@@ -2078,7 +2072,7 @@
new ServerConsumerEndpoint(sp, consumerID, queue,
queue.getName(), this, selectorString, noLocal,
jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse,
- maxDeliveryAttemptsToUse, false, replicating);
+ maxDeliveryAttemptsToUse, false, replicating, prefetchSize);
if (queue.isClustered() && sp.getConfiguration().isClustered() && jmsDestination.isTopic() && subscriptionName != null)
{
More information about the jboss-cvs-commits
mailing list