[jboss-cvs] JBoss Messaging SVN: r3477 - in branches/Branch_Stable/src/main/org/jboss: jms/server/connectionfactory and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 11 18:40:56 EST 2007
Author: timfox
Date: 2007-12-11 18:40:55 -0500 (Tue, 11 Dec 2007)
New Revision: 3477
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1199
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -205,27 +205,32 @@
private int ackMode;
private boolean closed;
private Object mainLock;
- private int maxBufferSize;
- private int minBufferSize;
private QueuedExecutor sessionExecutor;
private boolean listenerRunning;
private int maxDeliveries;
private String queueName;
private long lastDeliveryId = -1;
- private volatile boolean serverSending = true;
+ private int bufferSize;
private boolean waitingForLastDelivery;
private boolean shouldAck;
- private boolean handleFlowControl;
private long redeliveryDelay;
- private volatile int currentToken;
-
+ private volatile int currentToken;
+ private boolean paused;
+ private int consumeCount;
+ private boolean firstTime = true;
+
+ public int getBufferSize()
+ {
+ return buffer.size();
+ }
+
// Constructors ---------------------------------------------------------------------------------
public ClientConsumer(boolean isCC, int ackMode,
SessionDelegate sess, ConsumerDelegate cons, String consumerID,
String queueName,
int bufferSize, QueuedExecutor sessionExecutor,
- int maxDeliveries, boolean shouldAck, boolean handleFlowControl,
+ int maxDeliveries, boolean shouldAck,
long redeliveryDelay)
{
if (bufferSize < 1)
@@ -233,8 +238,7 @@
throw new IllegalArgumentException(this + " bufferSize must be > 0");
}
- this.maxBufferSize = bufferSize;
- this.minBufferSize = bufferSize / 2;
+ this.bufferSize = bufferSize;
buffer = new BasicPriorityLinkedList(10);
isConnectionConsumer = isCC;
this.ackMode = ackMode;
@@ -246,7 +250,6 @@
this.sessionExecutor = sessionExecutor;
this.maxDeliveries = maxDeliveries;
this.shouldAck = shouldAck;
- this.handleFlowControl = handleFlowControl;
this.redeliveryDelay = redeliveryDelay;
}
@@ -515,14 +518,7 @@
receiverThread = null;
}
}
-
- //This needs to be outside the lock
-
- if (handleFlowControl)
- {
- checkStart();
- }
-
+
if (trace) { log.trace(this + " receive() returning " + m); }
return m;
@@ -554,6 +550,8 @@
{
buffer.addFirst(proxy, proxy.getJMSPriority());
+ consumeCount--;
+
messageAdded();
}
}
@@ -575,16 +573,59 @@
buffer.clear();
- // need to reset toggle state
- serverSending = true;
+ consumeCount = 0;
}
public long getRedeliveryDelay()
{
return redeliveryDelay;
}
+
+ public void pause()
+ {
+ synchronized (mainLock)
+ {
+ paused = true;
+
+ sendChangeRateMessage(0f);
+ }
+ }
+ public void resume()
+ {
+ synchronized (mainLock)
+ {
+ paused = false;
+
+ if (firstTime)
+ {
+ consumeCount = 0;
+
+ firstTime = false;
+ }
+ else
+ {
+ consumeCount = bufferSize / 3 - buffer.size();
+ }
+
+ sendChangeRateMessage(1f);
+ }
+ }
+ /*
+ * When I unpause I have x messages in the buffer, and I will then receive another bufferSize messages.
+ *
+ * That will give x + bufferSize messages.
+ *
+ * If x = bufferSize / 3 then consumeCount = 0.
+ *
+ * In general (matching the assignment in resume()): consumeCount = bufferSize / 3 - x
+ *
+ *
+ *
+ *
+ */
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -639,39 +680,6 @@
}
}
- private void checkStop()
- {
- int size = buffer.size();
-
- if (serverSending && size >= maxBufferSize)
- {
- //Our buffer is full - we need to tell the server to stop sending if we haven't
- //done so already
-
- sendChangeRateMessage(0f);
-
- if (trace) { log.trace("Sent changeRate 0 message"); }
-
- serverSending = false;
- }
- }
-
- private void checkStart()
- {
- int size = buffer.size();
-
- if (!serverSending && size <= minBufferSize)
- {
- //We need more messages - we need to tell the server this if we haven't done so already
-
- sendChangeRateMessage(1.0f);
-
- if (trace) { log.trace("Sent changeRate 1.0 message"); }
-
- serverSending = true;
- }
- }
-
private void sendChangeRateMessage(float newRate)
{
try
@@ -834,11 +842,25 @@
if (!closed && !buffer.isEmpty())
{
m = (MessageProxy)buffer.removeFirst();
+
+ checkSendChangeRate();
}
return m;
}
+ private void checkSendChangeRate()
+ {
+ consumeCount++;
+
+ if (!paused && consumeCount == bufferSize)
+ {
+ consumeCount = 0;
+
+ sendChangeRateMessage(1.0f);
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
/*
@@ -911,11 +933,6 @@
if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
messageAdded();
-
- if (handleFlowControl)
- {
- checkStop();
- }
}
}
catch (Exception e)
@@ -951,7 +968,9 @@
// remove a message from the buffer
- mp = (MessageProxy)buffer.removeFirst();
+ mp = (MessageProxy)buffer.removeFirst();
+
+ checkSendChangeRate();
}
/*
@@ -999,11 +1018,6 @@
}
}
- if (handleFlowControl)
- {
- checkStart();
- }
-
if (trace) { log.trace("Exiting run()"); }
}
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -110,7 +110,7 @@
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
- autoFlowControl, redeliveryDelay);
+ redeliveryDelay);
sessionState.addCallbackHandler(messageHandler);
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -190,10 +190,6 @@
if (refCount == 1 && enablePing)
{
- // TODO Something is not quite right here, we can detect failure even if pinging is not
- // enabled, for example if we try to send a callback to the client and sending the
- // calback fails
-
// install the connection listener that listens for failed connections
server.invoke(connectorObjectName, "addConnectionListener",
new Object[] {connectionManager},
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -107,17 +107,17 @@
private boolean replicating;
- private boolean slow;
-
private volatile boolean dead;
+ private int prefetchSize;
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(String id, Queue messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint, String selector,
boolean noLocal, JBossDestination dest, Queue dlq,
Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
- boolean remote, boolean replicating) throws InvalidSelectorException
+ boolean remote, boolean replicating, int prefetchSize) throws InvalidSelectorException
{
if (trace)
{
@@ -154,9 +154,17 @@
this.preserveOrdering = sessionEndpoint.getConnectionEndpoint().getServerPeer().isDefaultPreserveOrdering();
this.replicating = replicating;
+
+ this.prefetchSize = prefetchSize;
- this.slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+ boolean slow = sessionEndpoint.getConnectionEndpoint().getConnectionFactoryEndpoint().isSlowConsumers();
+ if (slow)
+ {
+ //Slow is same as setting prefetch size to 1 - can deprecate this in 2.0
+ prefetchSize = 1;
+ }
+
if (dest.isTopic() && !messageQueue.isRecoverable())
{
// This is a consumer of a non durable topic subscription. We don't need to store
@@ -291,17 +299,24 @@
}
}
- if (slow)
+
+ sendCount++;
+
+ int num = prefetchSize;
+
+ if (firstTime)
{
- //If this is a slow consumer, we do not want to do any message buffering, so we immediately
- //set clientAccepting to false
- //When the client has consumed the message it will send a changeRate + message which will set
- //clientAccepting to true again
- //We cannot just rely on setting the prefetchSize to 1, since this is not a hard guarantee that only one message
- //will be buffered at once due to the asynchronous nature of sending changeRate
- this.clientAccepting = false;
+ //We make sure we have a little extra buffer on the client side
+ num = num + num / 3 ;
}
+ if (sendCount == num)
+ {
+ clientAccepting = false;
+
+ firstTime = false;
+ }
+
try
{
sessionEndpoint.handleDelivery(delivery, this);
@@ -312,11 +327,15 @@
this.started = false; // DO NOT return null or the message might get delivered more than once
}
-
+
return delivery;
}
}
+ private volatile int sendCount;
+
+ private boolean firstTime = true;
+
// Filter implementation ------------------------------------------------------------------------
public boolean accept(Message msg)
@@ -386,21 +405,9 @@
try
{
- // For now we just support a binary on/off.
- // The client will send newRate = 0, to say it does not want any more messages when its
- // client side buffer gets full or it will send an arbitrary non zero number to say it
- // does want more messages, when its client side buffer empties to half its full size.
- // Note the client does not wait until the client side buffer is empty before sending a
- // newRate(+ve) message since this would add extra latency.
-
- // In the future we can fine tune this by allowing the client to specify an actual rate in
- // the newRate value so this is basically a placeholder for the future so we don't have to
- // change the wire format when we support it.
-
- // No need to synchronize - clientAccepting is volatile.
-
if (newRate > 0)
{
+ sendCount = 0;
clientAccepting = true;
}
else
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -1781,7 +1781,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
- dest, null, null, 0, -1, true, false);
+ dest, null, null, 0, -1, true, false, prefetchSize);
ConsumerAdvised advised;
@@ -2085,7 +2085,7 @@
new ServerConsumerEndpoint(consumerID, queue,
queue.getName(), this, selectorString, noLocal,
jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse,
- maxDeliveryAttemptsToUse, false, replicating);
+ maxDeliveryAttemptsToUse, false, replicating, prefetchSize);
if (queue.isClustered() && postOffice.isClustered() && jmsDestination.isTopic() && subscriptionName != null)
{
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-12-11 17:56:59 UTC (rev 3476)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-12-11 23:40:55 UTC (rev 3477)
@@ -22,17 +22,17 @@
package org.jboss.messaging.core.impl.clusterconnection;
-import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.client.container.ClientConsumer;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
@@ -76,8 +76,10 @@
private boolean consuming;
- private ConsumerDelegate consumer;
+ private ClientConsumerDelegate consumer;
+ private ClientConsumer clientConsumer;
+
private boolean preserveOrdering;
private long sourceChannelID;
@@ -158,8 +160,10 @@
//manually using changeRate() methods
//The local queue itself will manually send these messages depending on its state -
//So effectively the message buffering is handled by the local queue, not the ClientConsumer
- consumer = sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
-
+ consumer = (ClientConsumerDelegate)sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
+
+ clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+
consumer.setMessageListener(this);
//Register ourselves with the local queue - this queue will handle flow control for us
@@ -218,22 +222,18 @@
{
if (consume && !consuming)
{
- //Send a changeRate(1) message - to start consumption
+ if (trace) { log.trace(this + " resuming client consumer"); }
+
+ clientConsumer.resume();
- consumer.changeRate(1f);
-
- if (trace) { log.trace(this + " sent changeRate(1) message"); }
-
consuming = true;
}
else if (!consume && consuming)
{
- //Send a changeRate(0) message to stop consumption
+ if (trace) { log.trace(this + " pausing client consumer"); }
+
+ clientConsumer.pause();
- consumer.changeRate(0f);
-
- if (trace) { log.trace(this + " sent changeRate(0) message"); }
-
consuming = false;
}
}
More information about the jboss-cvs-commits
mailing list