[jboss-cvs] JBoss Messaging SVN: r4213 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 16 06:24:44 EDT 2008
Author: timfox
Date: 2008-05-16 06:24:44 -0400 (Fri, 16 May 2008)
New Revision: 4213
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
Log:
Refactored consumer flow control to work with bytes
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/build-messaging.xml 2008-05-16 10:24:44 UTC (rev 4213)
@@ -633,6 +633,7 @@
<fileset dir="${test.classes.dir}">
<include name="**/org/jboss/messaging/tests/integration/**/*${test-mask}.class"/>
<include name="**/org/jboss/messaging/tests/unit/**/*${test-mask}.class"/>
+ <exclude name="**/org/jboss/messaging/tests/local/**/*${test-mask}.class"/>
</fileset>
</batchtest>
</junit>
@@ -669,14 +670,13 @@
<fileset dir="${test.jms.classes.dir}">
<include name="**/messaging/**/${test-mask}.class"/>
<include name="**/jms/**/${test-mask}.class"/>
- <!-- FIXME temporarily exclude the ack tests since they hang the test suite at the moment -->
- <exclude name="**/jms/AcknowledgementTest.class"/>
+ <include name="**/messaging/util/**/${test-mask}.class"/>
<!-- We exclude the recovery tests for now, until we get recovery up and running again -->
<exclude name="**/jms/XARecoveryTest.class"/>
<exclude name="**/jms/XAResourceRecoveryTest.class"/>
<exclude name="**/jms/XATest.class"/>
<exclude name="**/jms/ConnectionConsumerTest.class"/>
- <include name="**/messaging/util/**/${test-mask}.class"/>
+
<exclude name="**/*NativeTest.class"/>
<exclude name="**/jms/MemLeakTest.class"/>
<exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -100,28 +100,26 @@
public ClientConnectionFactoryImpl(final Location location)
{
- this.location = location;
- this.strictTck = false;
- this.defaultConsumerWindowSize = 1000;
- this.defaultConsumerMaxRate = -1;
- this.defaultProducerWindowSize = 1000;
- this.defaultProducerMaxRate = -1;
- this.dispatcher = new PacketDispatcherImpl(null);
- connectionParams = new ConnectionParamsImpl();
+ this(location, new ConnectionParamsImpl(), false);
}
public ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams)
{
- this.location = location;
+ this(location, connectionParams, false);
+ }
+
+ protected ClientConnectionFactoryImpl(final Location location, final ConnectionParams connectionParams, final boolean dummy)
+ {
this.strictTck = false;
- this.defaultConsumerWindowSize = 1000;
+ this.defaultConsumerWindowSize = 1024 * 1024;
this.defaultConsumerMaxRate = -1;
this.defaultProducerWindowSize = 1000;
this.defaultProducerMaxRate = -1;
this.dispatcher = new PacketDispatcherImpl(null);
+ this.location = location;
this.connectionParams = connectionParams;
}
-
+
public ClientConnection createConnection() throws MessagingException
{
return createConnection(null, null);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -68,7 +68,7 @@
private final RemotingConnection remotingConnection;
- private final int tokenBatchSize;
+ private final int clientWindowSize;
private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
@@ -104,7 +104,7 @@
final long clientTargetID,
final ExecutorService sessionExecutor,
final RemotingConnection remotingConnection,
- final int tokenBatchSize,
+ final int clientWindowSize,
final boolean direct)
{
this.targetID = targetID;
@@ -117,7 +117,7 @@
this.remotingConnection = remotingConnection;
- this.tokenBatchSize = tokenBatchSize;
+ this.clientWindowSize = clientWindowSize;
this.direct = direct;
}
@@ -175,7 +175,7 @@
session.delivered(m.getDeliveryID(), expired);
- flowControl();
+ flowControl(m.encodeSize());
if (expired)
{
@@ -240,7 +240,9 @@
{
return;
}
-
+
+ log.info("** max size is " + this.maxSize);
+
try
{
// Now we wait for any current handler runners to run.
@@ -324,7 +326,7 @@
session.delivered(message.getDeliveryID(), expired);
- flowControl();
+ flowControl(message.encodeSize());
if (!expired)
{
@@ -338,6 +340,8 @@
synchronized (this)
{
buffer.addLast(message, message.getPriority());
+
+ maxSize = Math.max(maxSize, buffer.size());
}
sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
@@ -355,6 +359,8 @@
}
}
}
+
+ int maxSize = 0;
public void recover(final long lastDeliveryID)
{
@@ -375,17 +381,17 @@
// Private
// --------------------------------------------------------------------------------------
- private void flowControl() throws MessagingException
+ private void flowControl(final int messageBytes) throws MessagingException
{
- if (tokenBatchSize > 0)
+ if (clientWindowSize > 0)
{
- tokensToSend++;
+ tokensToSend += messageBytes;
- if (tokensToSend == tokenBatchSize)
- {
- tokensToSend = 0;
+ if (tokensToSend >= clientWindowSize)
+ {
+ remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokensToSend));
- remotingConnection.sendOneWay(targetID, session.getServerTargetID(), new ConsumerFlowTokenMessage(tokenBatchSize));
+ tokensToSend = 0;
}
}
}
@@ -456,7 +462,7 @@
session.delivered(message.getDeliveryID(), expired);
- flowControl();
+ flowControl(message.encodeSize());
if (!expired)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -275,10 +275,31 @@
SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
- int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
+ int windowSize = response.getWindowSize();
+ int clientWindowSize;
+ if (windowSize == -1)
+ {
+ //No flow control - buffer can increase without bound! Only use with caution for very fast consumers
+ clientWindowSize = 0;
+ }
+ else if (windowSize == 1)
+ {
+ //Slow consumer - no buffering
+ clientWindowSize = 1;
+ }
+ else if (windowSize > 1)
+ {
+ //Client window size is half server window size
+ clientWindowSize = windowSize >> 1;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid window size " + windowSize);
+ }
+
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize, direct);
+ new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, clientWindowSize, direct);
consumers.put(response.getConsumerTargetID(), consumer);
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -40,6 +40,4 @@
void setStarted(boolean started) throws Exception;
void receiveTokens(int tokens) throws Exception;
-
- void promptDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -224,7 +224,7 @@
// if (count == 500000)
// {
messageReferences.removeFirst();
- // }
+ // }
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -93,7 +93,7 @@
private final AtomicInteger availableTokens;
private boolean started;
-
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerImpl(final long id, final long clientTargetID, final Queue messageQueue, final boolean noLocal, final Filter filter,
@@ -162,12 +162,12 @@
}
public HandleStatus handle(MessageReference ref) throws Exception
- {
- if (availableTokens != null && availableTokens.get() == 0)
+ {
+ if (availableTokens != null && availableTokens.get() <= 0)
{
return HandleStatus.BUSY;
}
-
+
if (ref.getMessage().isExpired())
{
ref.expire(persistenceManager, postOffice, queueSettingsRepository);
@@ -209,7 +209,7 @@
if (availableTokens != null)
{
- availableTokens.decrementAndGet();
+ availableTokens.addAndGet(-message.encodeSize());
}
try
@@ -274,18 +274,16 @@
public void receiveTokens(final int tokens) throws Exception
{
- int previous = availableTokens != null ? availableTokens.getAndAdd(tokens) : 0;
-
- if (previous == 0)
+ if (availableTokens != null)
{
- promptDelivery();
- }
- }
-
- public void promptDelivery()
- {
- sessionEndpoint.promptDelivery(messageQueue);
- }
+ int previous = availableTokens.getAndAdd(tokens);
+
+ if (previous <= 0)
+ {
+ promptDelivery();
+ }
+ }
+ }
// Public -----------------------------------------------------------------------------
@@ -296,4 +294,8 @@
// Private --------------------------------------------------------------------------------------
+ private void promptDelivery()
+ {
+ sessionEndpoint.promptDelivery(messageQueue);
+ }
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-05-16 09:47:57 UTC (rev 4212)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-05-16 10:24:44 UTC (rev 4213)
@@ -126,7 +126,7 @@
String clientID = null;
int dupsOKBatchSize = 1000;
- int consumerWindowSize = 1000;
+ int consumerWindowSize = 1024 * 1024;
int consumerMaxRate = -1;
int producerWindowSize = 1000;
int producerMaxRate = -1;
More information about the jboss-cvs-commits mailing list