[jboss-cvs] JBoss Messaging SVN: r6625 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 29 17:18:47 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-29 17:18:47 -0400 (Wed, 29 Apr 2009)
New Revision: 6625
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
SlowConsumer & ClientConsumerWindowSize fix
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-29 21:18:47 UTC (rev 6625)
@@ -92,6 +92,8 @@
private volatile boolean closed;
private volatile int creditsToSend;
+
+ private volatile boolean slowConsumerInitialCreditSent = false;
private volatile Exception lastException;
@@ -154,10 +156,10 @@
throw new MessagingException(MessagingException.ILLEGAL_STATE,
"Cannot call receive(...) - a MessageHandler is set");
}
-
- if (clientWindowSize == 0 && buffer.isEmpty())
+
+ if (clientWindowSize == 0)
{
- sendCredits(1);
+ startSlowConsumer();
}
receiverThread = Thread.currentThread();
@@ -221,6 +223,11 @@
session.expire(id, m.getMessageID());
+ if (clientWindowSize == 0)
+ {
+ startSlowConsumer();
+ }
+
if (toWait > 0)
{
continue;
@@ -283,7 +290,7 @@
if (handler != theHandler && clientWindowSize == 0)
{
- sendCredits(1);
+ startSlowConsumer();
}
handler = theHandler;
@@ -409,7 +416,7 @@
}
// Flow control for the first packet, we will have others
- flowControl(packet.getPacketSize(), true);
+ flowControl(packet.getPacketSize(), false);
currentChunkMessage = new ClientMessageImpl();
@@ -482,8 +489,10 @@
* flow control is synchornized because of LargeMessage and streaming.
* LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
* So, this operation needs to be atomic.
+ *
+ * @param discountSlowConsumer When dealing with slow consumers, we need to discount the one credit that was pre-sent when the first receive was called. For large messages this is only done on the last packet
* */
- public void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
+ public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws MessagingException
{
if (clientWindowSize >= 0)
{
@@ -492,37 +501,35 @@
if (creditsToSend >= clientWindowSize)
{
- if (isLargeMessage)
+ if (clientWindowSize == 0 && discountSlowConsumer)
{
- // Flowcontrol on largeMessages continuations needs to be done in a separate thread or failover would
- // block
- final int credits = creditsToSend;
+ if (trace)
+ {
+ log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ }
+
+ slowConsumerInitialCreditSent = false;
+ // Send the accumulated credits minus the 1 credit that was initially sent to start the slow consumer;
+ // otherwise the slow consumer would always be buffering one message after receiving the first one
+ final int credits = creditsToSend - 1;
+
creditsToSend = 0;
-
+
sendCredits(credits);
}
else
{
- if (clientWindowSize == 0)
+ if (trace)
{
- if (trace)
- {
- log.trace("Sending full credits - 1 for slow consumer");
- }
- // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
- // always buffering one after received the first message
- sendCredits(creditsToSend - 1);
+ log.trace("Sending " + messageBytes + " from flow-control");
}
- else
- {
- if (trace)
- {
- log.trace("Sending full credits");
- }
- sendCredits(creditsToSend);
- }
+
+ final int credits = creditsToSend;
+
creditsToSend = 0;
+
+ sendCredits(credits);
}
}
}
@@ -540,6 +547,22 @@
// Private
// ---------------------------------------------------------------------------------------
+ /**
+ * Sends an initial credit for slow consumers
+ * */
+ private void startSlowConsumer()
+ {
+ if (!slowConsumerInitialCreditSent)
+ {
+ if (trace)
+ {
+ log.trace("Sending 1 credit to start delivering of one message to slow consumer");
+ }
+ slowConsumerInitialCreditSent = true;
+ sendCredits(1);
+ }
+ }
+
private void requeueExecutors()
{
for (int i = 0; i < buffer.size(); i++)
@@ -564,7 +587,7 @@
{
if (trace)
{
- log.trace("Sending " + credits + " back");
+ log.trace("Sending " + credits + " credits back", new Exception ("trace"));
}
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
@@ -664,11 +687,7 @@
// If slow consumer, we need to send 1 credit to make sure we get another message
if (clientWindowSize == 0)
{
- if (trace)
- {
- log.trace("Sending 1 credit back after consuming message on slow consumer (no-buffering)");
- }
- sendCredits(1);
+ startSlowConsumer();
}
}
}
@@ -683,7 +702,7 @@
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0)
{
- flowControl(message.getFlowControlSize(), false);
+ flowControl(message.getFlowControlSize(), true);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-04-29 21:18:47 UTC (rev 6625)
@@ -45,7 +45,7 @@
void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
- void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException;
+ void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws MessagingException;
void clear();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java 2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java 2009-04-29 21:18:47 UTC (rev 6625)
@@ -123,38 +123,56 @@
* Add a buff to the List, or save it to the OutputStream if set
* @param packet
*/
- public synchronized void addPacket(final SessionReceiveContinuationMessage packet)
+ public void addPacket(final SessionReceiveContinuationMessage packet)
{
- if (outStream != null)
+ int flowControlCredit = 0;
+ boolean continues = false;
+
+ synchronized (this)
{
- try
+ if (outStream != null)
{
- if (!packet.isContinues())
+ try
{
- streamEnded = true;
- }
+ if (!packet.isContinues())
+ {
+ streamEnded = true;
+ }
- outStream.write(packet.getBody());
+ outStream.write(packet.getBody());
- consumerInternal.flowControl(packet.getPacketSize(), true);
+ flowControlCredit = packet.getPacketSize();
+ continues = packet.isContinues();
- notifyAll();
+ notifyAll();
- if (streamEnded)
+ if (streamEnded)
+ {
+ outStream.close();
+ }
+ }
+ catch (Exception e)
{
- outStream.close();
+ handledException = e;
}
}
+ else
+ {
+ packets.offer(packet);
+ }
+ }
+
+ if (flowControlCredit != 0)
+ {
+ try
+ {
+ consumerInternal.flowControl(flowControlCredit, !continues);
+ }
catch (Exception e)
{
handledException = e;
-
}
}
- else
- {
- packets.offer(packet);
- }
}
public synchronized void close()
@@ -164,25 +182,35 @@
notifyAll();
}
- public synchronized void setOutputStream(final OutputStream output) throws MessagingException
+ public void setOutputStream(final OutputStream output) throws MessagingException
{
- if (currentPacket != null)
+
+ int totalFlowControl = 0;
+ boolean continues = false;
+
+ synchronized (this)
{
- sendPacketToOutput(output, currentPacket);
- currentPacket = null;
- }
- while (true)
- {
- SessionReceiveContinuationMessage packet = packets.poll();
- if (packet == null)
+ if (currentPacket != null)
{
- break;
+ sendPacketToOutput(output, currentPacket);
+ currentPacket = null;
}
- consumerInternal.flowControl(packet.getPacketSize(), true);
- sendPacketToOutput(output, packet);
+ while (true)
+ {
+ SessionReceiveContinuationMessage packet = packets.poll();
+ if (packet == null)
+ {
+ break;
+ }
+ totalFlowControl += packet.getPacketSize();
+ continues = packet.isContinues();
+ sendPacketToOutput(output, packet);
+ }
+
+ outStream = output;
}
- outStream = output;
+ consumerInternal.flowControl(totalFlowControl, !continues);
}
public synchronized void saveBuffer(final OutputStream output) throws MessagingException
@@ -1134,7 +1162,7 @@
throw new IndexOutOfBoundsException();
}
- consumerInternal.flowControl(currentPacket.getPacketSize(), true);
+ consumerInternal.flowControl(currentPacket.getPacketSize(), !currentPacket.isContinues());
packetPosition += sizeToAdd;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-29 21:18:47 UTC (rev 6625)
@@ -327,6 +327,11 @@
{
int previous = availableCredits.getAndAdd(credits);
+ if (trace)
+ {
+ log.trace("Received " + credits + " credits, previous value = " + previous + " currentValue = " + availableCredits.get());
+ }
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery();
@@ -485,6 +490,10 @@
private void promptDelivery()
{
+ if (trace)
+ {
+ log.trace("Starting prompt delivery");
+ }
lock.lock();
try
{
@@ -932,6 +941,10 @@
// Since we are not sending anything to the client during this calculation, this is unlikely to happen
if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
{
+ if (trace)
+ {
+ log.trace("Taking " + precalculatedCredits + " credits out on preCalculateFlowControl (largeMessage)");
+ }
return precalculatedCredits;
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 21:18:47 UTC (rev 6625)
@@ -240,13 +240,41 @@
ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from the server
+ // or the client will be buffering messages
+ assertNull(consNeverUsed.receive(1));
+
+ ClientMessage msg = createTextMessage(session, "This one will expire");
+ if (largeMessages)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+
+ msg.setExpiration(System.currentTimeMillis() + 100);
+ prod.send(msg);
+
+ msg = createTextMessage(session, "First-on-non-buffered");
+
+ prod.send(msg);
+
+ Thread.sleep(110);
+
+ // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already sent
+ msg = consNeverUsed.receive(TIMEOUT * 1000);
+ assertNotNull(msg);
+ assertEquals("First-on-non-buffered", getTextMessage(msg));
+ msg.acknowledge();
+
+
ClientConsumer cons1 = session.createConsumer(ADDRESS);
- ClientProducer prod = session.createProducer(ADDRESS);
+
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = createTextMessage(session, "Msg" + i);
+ msg = createTextMessage(session, "Msg" + i);
if (largeMessages)
{
@@ -258,7 +286,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = cons1.receive(1000);
+ msg = cons1.receive(1000);
assertNotNull("expected message at i = " + i, msg);
assertEquals("Msg" + i, getTextMessage(msg));
msg.acknowledge();
More information about the jboss-cvs-commits
mailing list