[jboss-cvs] JBoss Messaging SVN: r4412 - trunk/src/main/org/jboss/messaging/core/client/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 9 13:22:40 EDT 2008
Author: timfox
Date: 2008-06-09 13:22:39 -0400 (Mon, 09 Jun 2008)
New Revision: 4412
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1351
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-06-09 17:22:39 UTC (rev 4412)
@@ -175,7 +175,6 @@
{
remotingConnection = remotingConnectionFactory.createRemotingConnection(location, connectionParams);
- log.info("calling start");
remotingConnection.start();
long sessionID = remotingConnection.getSessionID();
@@ -207,7 +206,6 @@
if (t instanceof MessagingException)
{
- log.info("got messaging excetption");
throw (MessagingException)t;
}
else
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-09 16:53:40 UTC (rev 4411)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-09 17:22:39 UTC (rev 4412)
@@ -235,7 +235,14 @@
this.handler = handler;
- flushBuffer(false);
+ //If there are any messages in the buffer, we need to queue up executors for them
+ synchronized (this)
+ {
+ for (int i = 0; i < buffer.size(); i++)
+ {
+ queueExecutor();
+ }
+ }
}
public void close() throws MessagingException
@@ -346,7 +353,7 @@
maxSize = Math.max(maxSize, buffer.size());
}
- flushBuffer(true);
+ queueExecutor();
}
}
else
@@ -382,9 +389,9 @@
// Private
// --------------------------------------------------------------------------------------
- private void flushBuffer(final boolean singleMessage)
+ private void queueExecutor()
{
- sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(singleMessage); } } );
+ sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
}
private void flowControl(final int messageBytes) throws MessagingException
@@ -441,39 +448,29 @@
throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
}
}
-
- // TODO (JBMESSAGING-1351): I have made this parameter singleMessage as I just wanted to keep the current behavior of handleMessages now,
- // and as I don't want to mess up with results (performance, tests... everything else)
- // But I have the impression that callOnMessage should aways run until the buffer is empty, on that case singleMessage
- // should be ignored
- private void callOnMessage(final boolean singleMessage)
+ private void callOnMessage()
{
try
{
- do
- {
- if (closed)
- {
- return;
- }
-
- //We pull the message from the buffer from inside the Runnable so we can ensure priority
- //ordering. If we just added a Runnable with the message to the executor immediately as we get it
- //we could not do that
-
- ClientMessage message;
-
- synchronized (this)
- {
- message = buffer.removeFirst();
- }
-
- if (message == null)
- {
- break;
- }
-
+ if (closed)
+ {
+ return;
+ }
+
+ //We pull the message from the buffer from inside the Runnable so we can ensure priority
+ //ordering. If we just added a Runnable with the message to the executor immediately as we get it
+ //we could not do that
+
+ ClientMessage message;
+
+ synchronized (this)
+ {
+ message = buffer.removeFirst();
+ }
+
+ if (message != null)
+ {
boolean expired = message.isExpired();
session.delivered(message.getDeliveryID(), expired);
@@ -486,8 +483,7 @@
handler.onMessage(message);
}
- }
- while (!singleMessage);
+ }
}
catch (MessagingException e)
{
More information about the jboss-cvs-commits
mailing list