[jboss-cvs] JBoss Messaging SVN: r1854 - in trunk/src/main/org/jboss: jms/client/container jms/client/remoting jms/server/endpoint messaging/core

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 23 09:48:48 EST 2006


Author: timfox
Date: 2006-12-23 09:48:44 -0500 (Sat, 23 Dec 2006)
New Revision: 1854

Modified:
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
Log:
Fixed cancellation order



Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-23 14:48:44 UTC (rev 1854)
@@ -101,10 +101,10 @@
    public Object handleClosing(Invocation invocation) throws Throwable
    {      
       ConsumerState consumerState = getState(invocation);
+        
       
       // First we call close on the messagecallbackhandler which waits for onMessage invocations      
-      // to complete and then cancels anything in the client buffer.
-      // any further messages received will be ignored
+      // to complete any further messages received will be ignored
       consumerState.getMessageCallbackHandler().close();
       
       long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
@@ -128,7 +128,13 @@
       
       ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
          
+      //Now we need to cancel any inflight messages - this must be done before
+      //cancelling the message callback handler buffer, so that messages end up back in the channel
+      //in the right order
       del.cancelInflightMessages(lastDeliveryId);
+      
+      //And then we cancel any messages still in the message callback handler buffer
+      consumerState.getMessageCallbackHandler().cancelBuffer();
                                    
       return res;
    }      

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-23 14:48:44 UTC (rev 1854)
@@ -269,6 +269,43 @@
       }   
    }
    
+   public void cancelBuffer() throws JMSException
+   {
+      synchronized (mainLock)
+      {      
+         // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
+         // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
+         // deliveries until then), which is too late - since we need to preserve the order of messages
+         // delivered in a session.
+         
+         if (!buffer.isEmpty())
+         {            
+            // Now we cancel any deliveries that might be waiting in our buffer. This is because
+            // otherwise the messages wouldn't get cancelled until the corresponding session died.
+            // So if another consumer in another session tried to consume from the channel before that
+            // session died it wouldn't receive those messages.
+            // We can't just cancel all the messages in the SCE since some of those messages might
+            // have actually been delivered (unlike these) and we may want to acknowledge them
+            // later, after this consumer has been closed
+   
+            List cancels = new ArrayList();
+   
+            for(Iterator i = buffer.iterator(); i.hasNext();)
+            {
+               MessageProxy mp = (MessageProxy)i.next();
+               
+               DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+               
+               cancels.add(ack);
+            }
+                  
+            sessionDelegate.cancelDeliveries(cancels);
+            
+            buffer.clear();
+         }    
+      }
+   }
+   
    public void close() throws JMSException
    {   
       synchronized (mainLock)
@@ -291,39 +328,8 @@
          this.listener = null;
       }
 
-      waitForOnMessageToComplete();
+      waitForOnMessageToComplete();                  
       
-      // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
-      // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
-      // deliveries until then), which is too late - since we need to preserve the order of messages
-      // delivered in a session.
-      
-      if (!buffer.isEmpty())
-      {            
-         // Now we cancel any deliveries that might be waiting in our buffer. This is because
-         // otherwise the messages wouldn't get cancelled until the corresponding session died.
-         // So if another consumer in another session tried to consume from the channel before that
-         // session died it wouldn't receive those messages.
-         // We can't just cancel all the messages in the SCE since some of those messages might
-         // have actually been delivered (unlike these) and we may want to acknowledge them
-         // later, after this consumer has been closed
-
-         List cancels = new ArrayList();
-
-         for(Iterator i = buffer.iterator(); i.hasNext();)
-         {
-            MessageProxy mp = (MessageProxy)i.next();
-            
-            DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
-            
-            cancels.add(ack);
-         }
-               
-         sessionDelegate.cancelDeliveries(cancels);
-         
-         buffer.clear();
-      }                
-      
       if (trace) { log.trace(this + " closed"); }
    }
      
@@ -665,7 +671,7 @@
          m = (MessageProxy)buffer.removeFirst();
       }
 
-      if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning " + m); }
+      if (trace) { log.trace(this + ".getMessage() returning " + m); }
       return m;
    }
    

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 14:48:44 UTC (rev 1854)
@@ -47,8 +47,8 @@
 import org.jboss.remoting.callback.HandleCallbackException;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
 
 /**
  * Concrete implementation of ConsumerEndpoint.
@@ -102,9 +102,7 @@
 
    // Must be volatile
    private volatile boolean clientAccepting;
-   
-   private Executor executor; /// TEMPORARILY
-   
+
    // Constructors --------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -130,9 +128,6 @@
       
       this.destination = dest;
       
-      //TEMP
-      this.executor = new QueuedExecutor();
-      
       //Always start as false - wait for consumer to initiate
       this.clientAccepting = false;
       
@@ -223,43 +218,18 @@
          MessagingMarshallable mm = new MessagingMarshallable(versionToUse, del);
          
          Callback callback = new Callback(mm);
-   
-         //FIXME - we need to use the asynch callback API, this is the Sync one
-
-         //This is temporary - to ensure deliveries happen in sequence!!!!!!!!
-         
-         class Runner implements Runnable
-         {
-            Callback cb;
-            
-            Runner(Callback cb)
-            {
-               this.cb = cb;
-            }
-         
-            public void run()
-            {
-               try
-               {            
-                  callbackHandler.handleCallback(cb);               
-               }
-               catch (HandleCallbackException e)
-               {
-                  log.error("Failed to handle callback", e);
-               }
-               
-            }  
-         }
-         
+           
          try
          {
-            executor.execute(new Runner(callback));
+            callbackHandler.handleCallback(callback);  
          }
-         catch (InterruptedException e)
+         catch (HandleCallbackException e)
          {
-            //Ignore
+            log.error("Failed to handle callback", e);
+            
+            return null;
          }
-           
+              
          return delivery;      
       }
    }      

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-23 08:43:42 UTC (rev 1853)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-23 14:48:44 UTC (rev 1854)
@@ -645,8 +645,6 @@
                      // state but before delivery being added (observed).
                      synchronized (del)
                      {
-                        if (trace) { log.trace(this + " incrementing delivery count for " + del); }
-
                         // FIXME - It's actually possible the delivery could be cancelled before it reaches
                         // here, in which case we wouldn't get a delivery but we still need to increment the
                         // delivery count. TODO http://jira.jboss.com/jira/browse/JBMESSAGING-355




More information about the jboss-cvs-commits mailing list