[jboss-cvs] JBoss Messaging SVN: r1854 - in trunk/src/main/org/jboss: jms/client/container jms/client/remoting jms/server/endpoint messaging/core
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 23 09:48:48 EST 2006
Author: timfox
Date: 2006-12-23 09:48:44 -0500 (Sat, 23 Dec 2006)
New Revision: 1854
Modified:
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
Fixed cancellation order
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-23 14:48:44 UTC (rev 1854)
@@ -101,10 +101,10 @@
public Object handleClosing(Invocation invocation) throws Throwable
{
ConsumerState consumerState = getState(invocation);
+
// First we call close on the messagecallbackhandler which waits for onMessage invocations
- // to complete and then cancels anything in the client buffer.
- // any further messages received will be ignored
+ // to complete. Any further messages received will be ignored.
consumerState.getMessageCallbackHandler().close();
long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
@@ -128,7 +128,13 @@
ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+ //Now we need to cancel any inflight messages - this must be done before
+ //cancelling the message callback handler buffer, so that messages end up back in the channel
+ //in the right order
del.cancelInflightMessages(lastDeliveryId);
+
+ //And then we cancel any messages still in the message callback handler buffer
+ consumerState.getMessageCallbackHandler().cancelBuffer();
return res;
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-23 14:48:44 UTC (rev 1854)
@@ -269,6 +269,43 @@
}
}
+ public void cancelBuffer() throws JMSException
+ {
+ synchronized (mainLock)
+ {
+ // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
+ // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
+ // deliveries until then), which is too late - since we need to preserve the order of messages
+ // delivered in a session.
+
+ if (!buffer.isEmpty())
+ {
+ // Now we cancel any deliveries that might be waiting in our buffer. This is because
+ // otherwise the messages wouldn't get cancelled until the corresponding session died.
+ // So if another consumer in another session tried to consume from the channel before that
+ // session died it wouldn't receive those messages.
+ // We can't just cancel all the messages in the SCE since some of those messages might
+ // have actually been delivered (unlike these) and we may want to acknowledge them
+ // later, after this consumer has been closed
+
+ List cancels = new ArrayList();
+
+ for(Iterator i = buffer.iterator(); i.hasNext();)
+ {
+ MessageProxy mp = (MessageProxy)i.next();
+
+ DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+
+ cancels.add(ack);
+ }
+
+ sessionDelegate.cancelDeliveries(cancels);
+
+ buffer.clear();
+ }
+ }
+ }
+
public void close() throws JMSException
{
synchronized (mainLock)
@@ -291,39 +328,8 @@
this.listener = null;
}
- waitForOnMessageToComplete();
+ waitForOnMessageToComplete();
- // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
- // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
- // deliveries until then), which is too late - since we need to preserve the order of messages
- // delivered in a session.
-
- if (!buffer.isEmpty())
- {
- // Now we cancel any deliveries that might be waiting in our buffer. This is because
- // otherwise the messages wouldn't get cancelled until the corresponding session died.
- // So if another consumer in another session tried to consume from the channel before that
- // session died it wouldn't receive those messages.
- // We can't just cancel all the messages in the SCE since some of those messages might
- // have actually been delivered (unlike these) and we may want to acknowledge them
- // later, after this consumer has been closed
-
- List cancels = new ArrayList();
-
- for(Iterator i = buffer.iterator(); i.hasNext();)
- {
- MessageProxy mp = (MessageProxy)i.next();
-
- DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
-
- cancels.add(ack);
- }
-
- sessionDelegate.cancelDeliveries(cancels);
-
- buffer.clear();
- }
-
if (trace) { log.trace(this + " closed"); }
}
@@ -665,7 +671,7 @@
m = (MessageProxy)buffer.removeFirst();
}
- if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning " + m); }
+ if (trace) { log.trace(this + ".getMessage() returning " + m); }
return m;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 14:48:44 UTC (rev 1854)
@@ -47,8 +47,8 @@
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
/**
* Concrete implementation of ConsumerEndpoint.
@@ -102,9 +102,7 @@
// Must be volatile
private volatile boolean clientAccepting;
-
- private Executor executor; /// TEMPORARILY
-
+
// Constructors --------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -130,9 +128,6 @@
this.destination = dest;
- //TEMP
- this.executor = new QueuedExecutor();
-
//Always start as false - wait for consumer to initiate
this.clientAccepting = false;
@@ -223,43 +218,18 @@
MessagingMarshallable mm = new MessagingMarshallable(versionToUse, del);
Callback callback = new Callback(mm);
-
- //FIXME - we need to use the asynch callback API, this is the Sync one
-
- //This is temporary - to ensure deliveries happen in sequence!!!!!!!!
-
- class Runner implements Runnable
- {
- Callback cb;
-
- Runner(Callback cb)
- {
- this.cb = cb;
- }
-
- public void run()
- {
- try
- {
- callbackHandler.handleCallback(cb);
- }
- catch (HandleCallbackException e)
- {
- log.error("Failed to handle callback", e);
- }
-
- }
- }
-
+
try
{
- executor.execute(new Runner(callback));
+ callbackHandler.handleCallback(callback);
}
- catch (InterruptedException e)
+ catch (HandleCallbackException e)
{
- //Ignore
+ log.error("Failed to handle callback", e);
+
+ return null;
}
-
+
return delivery;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-23 14:48:44 UTC (rev 1854)
@@ -645,8 +645,6 @@
// state but before delivery being added (observed).
synchronized (del)
{
- if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-
// FIXME - It's actually possible the delivery could be cancelled before it reaches
// here, in which case we wouldn't get a delivery but we still need to increment the
// delivery count. TODO http://jira.jboss.com/jira/browse/JBMESSAGING-355
More information about the jboss-cvs-commits
mailing list