[jboss-cvs] JBoss Messaging SVN: r4367 - trunk/src/main/org/jboss/messaging/core/client/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 30 22:14:51 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-05-30 22:14:51 -0400 (Fri, 30 May 2008)
New Revision: 4367
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
JBMESSAGING-1351
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-30 22:24:56 UTC (rev 4366)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-31 02:14:51 UTC (rev 4367)
@@ -232,7 +232,9 @@
waitForOnMessageToComplete();
- this.handler = handler;
+ this.handler = handler;
+
+ flushBuffer(false);
}
public void close() throws MessagingException
@@ -343,13 +345,12 @@
maxSize = Math.max(maxSize, buffer.size());
}
- sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+ flushBuffer(true);
}
}
else
{
// Add it to the buffer
-
synchronized (this)
{
buffer.addLast(message, message.getPriority());
@@ -358,7 +359,7 @@
}
}
}
-
+
int maxSize = 0;
public void recover(final long lastDeliveryID)
@@ -380,6 +381,11 @@
// Private
// --------------------------------------------------------------------------------------
+ private void flushBuffer(final boolean singleMessage)
+ {
+ sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(singleMessage); } } );
+ }
+
private void flowControl(final int messageBytes) throws MessagingException
{
if (clientWindowSize > 0)
@@ -434,29 +440,39 @@
throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
}
}
-
- private void callOnMessage()
+
+
+ // TODO (JBMESSAGING-1351): I added the singleMessage parameter only to keep the current behavior of handleMessages for now,
+ // as I don't want to disturb existing results (performance, tests... everything else).
+ // But I have the impression that callOnMessage should always run until the buffer is empty; in that case singleMessage
+ // should be ignored.
+ private void callOnMessage(final boolean singleMessage)
{
try
{
- if (closed)
- {
- return;
- }
-
- //We pull the message from the buffer from inside the Runnable so we can ensure priority
- //ordering. If we just added a Runnable with the message to the executor immediately as we get it
- //we could not do that
-
- ClientMessage message;
-
- synchronized (this)
- {
- message = buffer.removeFirst();
- }
-
- if (message != null)
- {
+ do
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ //We pull the message from the buffer from inside the Runnable so we can ensure priority
+ //ordering. If we just added a Runnable with the message to the executor immediately as we get it
+ //we could not do that
+
+ ClientMessage message;
+
+ synchronized (this)
+ {
+ message = buffer.removeFirst();
+ }
+
+ if (message == null)
+ {
+ break;
+ }
+
boolean expired = message.isExpired();
session.delivered(message.getDeliveryID(), expired);
@@ -469,7 +485,8 @@
handler.onMessage(message);
}
- }
+ }
+ while (!singleMessage);
}
catch (MessagingException e)
{
More information about the jboss-cvs-commits
mailing list