[jboss-cvs] JBoss Messaging SVN: r2507 - in trunk: src/main/org/jboss/jms/client/container and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 28 18:46:51 EST 2007


Author: timfox
Date: 2007-02-28 18:46:51 -0500 (Wed, 28 Feb 2007)
New Revision: 2507

Added:
   trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java
Removed:
   trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java
Modified:
   trunk/src/main/org/jboss/jms/client/Closeable.java
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java
   trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java
   trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
   trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-899


Modified: trunk/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/Closeable.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/Closeable.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -46,5 +46,5 @@
     * 
     * @throws JMSException
     */
-   void closing() throws JMSException;
+   long closing() throws JMSException;
 }

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -135,7 +135,7 @@
       {         
          if (checkClosingAlreadyDone())
          {
-            return null;
+            return new Long(-1);
          }
       }
       else if (isClose)

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -115,24 +115,18 @@
    public Object handleClosing(Invocation invocation) throws Throwable
    {      
       ConsumerState consumerState = getState(invocation);
-        
-      // First we call close on the messagecallbackhandler which waits for onMessage invocations      
-      // to complete any further messages received will be ignored
-      consumerState.getMessageCallbackHandler().close();
-                        
-      // Then we make sure closing is called on the ServerConsumerEndpoint.
-
-      Object res = invocation.invokeNext();
+                       
+      // We make sure closing is called on the ServerConsumerEndpoint.
+      // This returns us the last delivery id sent
       
-      //Now we send a message to the server consumer with the last delivery id so
-      //it can cancel any inflight messages after that
-      //This needs to be done *after* the call to closing has been executed on the server
-      //maybe it can be combined with closing
+      Long l  = (Long)invocation.invokeNext();
       
-      ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+      long lastDeliveryId = l.longValue();
       
-      long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
-      
+      // First we call close on the messagecallbackhandler which waits for onMessage invocations      
+      // to complete and the last delivery to arrive
+      consumerState.getMessageCallbackHandler().close(lastDeliveryId);
+                
       SessionState sessionState = (SessionState)consumerState.getParent();
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
                  
@@ -141,15 +135,10 @@
       CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
       cm.unregisterHandler(consumerState.getConsumerID());
          
-      //Now we need to cancel any inflight messages - this must be done before
-      //cancelling the message callback handler buffer, so that messages end up back in the channel
-      //in the right order
-      del.cancelInflightMessages(lastDeliveryId);
-      
       //And then we cancel any messages still in the message callback handler buffer     
       consumerState.getMessageCallbackHandler().cancelBuffer();
                                    
-      return res;
+      return l;
    }      
    
    public Object handleReceive(Invocation invocation) throws Throwable

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -134,8 +134,7 @@
          // already be cancelled after failover.
 
          if (methodName.equals("cancelDelivery") ||
-            methodName.equals("cancelDeliveries") ||
-            methodName.equals("cancelInflightMessages"))
+            methodName.equals("cancelDeliveries"))
          {
             log.debug(this + " NOT resuming " + methodName + "(), let it wither and die");
             

Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -331,7 +331,7 @@
    
    public Object handleClosing(Invocation invocation) throws Throwable
    {
-      return null;
+      return new Long(-1);
    }
    
    public Object handleClose(Invocation invocation) throws Throwable

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -103,11 +103,11 @@
       doInvoke(client, req);
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       RequestSupport req = new ClosingRequest(id, version);
 
-      doInvoke(client, req);
+      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // BrowserDelegate implementation ---------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -151,11 +151,11 @@
       doInvoke(client, req);
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       RequestSupport req = new ClosingRequest(id, version);
 
-      doInvoke(client, req);
+      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // ConnectionDelegate implementation ------------------------------------------------------------
@@ -239,7 +239,7 @@
    {
       RequestSupport req = new ConnectionStartRequest(id, version);
 
-      doInvokeOneway(client, req);
+      doInvoke(client, req);
    }
 
    public void stop() throws JMSException

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -34,7 +34,6 @@
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.wireformat.CloseRequest;
 import org.jboss.jms.wireformat.ClosingRequest;
-import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
 import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
 import org.jboss.jms.wireformat.RequestSupport;
 import org.jboss.logging.Logger;
@@ -117,22 +116,15 @@
       doInvoke(client, req);
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       RequestSupport req = new ClosingRequest(id, version);
 
-      doInvoke(client, req);
+      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // ConsumerDelegate implementation --------------------------------------------------------------
 
-   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
-   {
-      RequestSupport req = new ConsumerCancelInflightMessagesRequest(id, version, lastDeliveryId);
-
-      doInvoke(client, req);
-   }
-
    public void changeRate(float newRate) throws JMSException
    {
       RequestSupport req = new ConsumerChangeRateRequest(id, version, newRate);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -85,7 +85,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -142,11 +142,11 @@
       doInvoke(client, req);
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       RequestSupport req = new ClosingRequest(id, version);
 
-      doInvoke(client, req);
+      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // SessionDelegate implementation ---------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -96,11 +96,10 @@
 
          if (handler == null)
          {
-            // This is OK and can happen if the callback handler is deregistered on consumer close,
-            // but there are messages still in transit which arrive later. In this case it is just
-            // safe to ignore the message.
+            // This should never happen since we wait for all deliveries to arrive before closing
+            // the consumer
 
-            if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+            log.warn(this + " callback handler not found, message arrived after consumer is closed. Cancelling it bacdk to queue");
             
             return;
          }

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -332,6 +332,8 @@
       // cleaning up the callback listener
 
       client.setDisconnectTimeout(0);
+      
+      client.disconnect();
 
       try
       {

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -61,6 +61,9 @@
    
    private static boolean trace;      
    
+   private static final int WAIT_TIMEOUT = 30000;
+   
+   
    static
    {
       log = Logger.getLogger(MessageCallbackHandler.class);
@@ -109,7 +112,7 @@
          return false;
       }
    }
-     
+        
    //This is static so it can be called by the asf layer too
    public static void callOnMessage(SessionDelegate sess,
                                     MessageListener listener,
@@ -200,8 +203,9 @@
    private String queueName;
    private long lastDeliveryId = -1;
    private volatile boolean serverSending = true;
+   private boolean waitingForLastDelivery;
+        
    
-        
    // Constructors ---------------------------------------------------------------------------------
 
    public MessageCallbackHandler(boolean isCC, int ackMode,                                
@@ -306,10 +310,10 @@
             {
                MessageProxy mp = (MessageProxy)i.next();
                
-               DefaultCancel ack =
+               DefaultCancel cancel =
                   new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
                
-               cancels.add(ack);
+               cancels.add(cancel);
             }
                   
             sessionDelegate.cancelDeliveries(cancels);
@@ -319,8 +323,12 @@
       }
    }
    
-   public void close() throws JMSException
-   {   
+   public void close(long lastDeliveryId) throws JMSException
+   {     
+      waitForLastDelivery(lastDeliveryId);
+      
+      waitForOnMessageToComplete();   
+      
       synchronized (mainLock)
       {
          log.debug(this + " closing");
@@ -340,9 +348,7 @@
          
          this.listener = null;
       }
-
-      waitForOnMessageToComplete();                  
-      
+                           
       if (trace) { log.trace(this + " closed"); }
    }
      
@@ -521,20 +527,54 @@
       serverSending = true;      
    }
    
-   public long getLastDeliveryId()
-   {
-      synchronized (mainLock)
-      {
-         return lastDeliveryId;
-      }
-   }
-     
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------
             
    // Private --------------------------------------------------------------------------------------
 
+   /*
+    * Wait for the last delivery to arrive
+    */
+   private void waitForLastDelivery(long id)
+   {
+      if (trace) { log.trace("Waiting for last delivery id " + id); }
+      
+      synchronized (mainLock)
+      {          
+         waitingForLastDelivery = true;
+         try
+         {
+            long wait = WAIT_TIMEOUT;
+            while (lastDeliveryId != id && wait > 0)
+            {
+               long start = System.currentTimeMillis();  
+               try
+               {
+                  mainLock.wait(wait);
+               }
+               catch (InterruptedException e)
+               {               
+               }
+               wait -= (System.currentTimeMillis() - start);
+            }      
+            if (trace && lastDeliveryId == id)
+            {
+               log.trace("Got last delivery");
+            }
+             
+            if (lastDeliveryId != id)
+            {
+               log.warn("Timed out waiting for last delivery"); 
+            }
+         }
+         finally
+         {
+            waitingForLastDelivery = false;
+         }
+      }
+   }
+   
    private void handleMessageInternal(Object message) throws Exception
    {
       MessageProxy proxy = (MessageProxy) message;
@@ -545,8 +585,10 @@
       {
          if (closed)
          {
-            // Ignore
-            if (trace) { log.trace(this + " is closed, so ignore message"); }
+            // This should never happen - we should always wait for all deliveries to arrive
+            // when closing
+            log.warn(this + " is closed, so ignoring message");
+            
             return;
          }
 
@@ -556,7 +598,7 @@
          buffer.addLast(proxy, proxy.getJMSPriority());
 
          lastDeliveryId = proxy.getDeliveryId();
-
+         
          if (trace) { log.trace(this + " added message(s) to the buffer"); }
 
          messageAdded();
@@ -654,11 +696,16 @@
    
    private void messageAdded()
    {
+      boolean notified = false;
+      
       // If we have a thread waiting on receive() we notify it
       if (receiverThread != null)
       {
-         if (trace) { log.trace(this + " notifying receiver thread"); }            
-         mainLock.notify();
+         if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
+         
+         mainLock.notifyAll();
+         
+         notified = true;
       }     
       else if (listener != null)
       { 
@@ -673,6 +720,12 @@
          
          //TODO - Execute onMessage on same thread for even better throughput 
       }
+      
+      // Make sure we notify any thread waiting for last delivery
+      if (waitingForLastDelivery && !notified)
+      {
+         mainLock.notifyAll();
+      }
    }
    
    private long waitOnLock(Object lock, long waitTime) throws InterruptedException

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -42,12 +42,4 @@
     * Sent to the server to specify a new maximum rate at which to send messages at
     */
    void changeRate(float newRate) throws JMSException;
-   
-   
-   /**
-    * Cancels any deliveries with a delivery id > lastDeliveryId - these are inflight
-    * @param lastDeliveryId
-    */
-   void cancelInflightMessages(long lastDeliveryId) throws JMSException;
-
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -206,9 +206,10 @@
       }
    }
          
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       // Do nothing
+      return -1;
    }
    
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -424,9 +424,11 @@
       } 
    }
    
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       log.trace(this + " closing (noop)");    
+      
+      return -1;
    }
 
    public void sendTransaction(TransactionRequest request,

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -58,15 +58,13 @@
  * 
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt> $Id: ServerConsumerEndpoint.java 2399
- *          2007-02-23 01:21:29Z ovidiu.feodorov at jboss.com $
+ * @version <tt>$Revision$</tt> $Id$
  */
 public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
 {
    // Constants ------------------------------------------------------------------------------------
 
-   private static final Logger log = Logger
-            .getLogger(ServerConsumerEndpoint.class);
+   private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -106,6 +104,8 @@
 
    private boolean storeDeliveries;
    
+   private long lastDeliveryID = -1;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -150,11 +150,13 @@
          // This is a consumer of a non durable topic subscription. We don't need to store
          // deliveries since if the consumer is closed or dies the refs go too.
          this.storeDeliveries = false;
-      } else
+      }
+      else
       {
          this.storeDeliveries = true;
       }
 
+      //For now always true - revisit later
       storeDeliveries = true;
 
       if (selector != null)
@@ -166,7 +168,7 @@
       }
 
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
-
+      
       // adding the consumer to the queue
       this.messageQueue.add(this);
 
@@ -204,7 +206,8 @@
          try
          {
             sessionEndpoint.expireDelivery(delivery, expiryQueue);
-         } catch (Throwable t)
+         }
+         catch (Throwable t)
          {
             log.error("Failed to expire delivery: " + delivery, t);
          }
@@ -222,7 +225,7 @@
 
             return null;
          }
-
+         
          if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
 
          Message message = ref.getMessage();
@@ -241,9 +244,9 @@
 
          if (storeDeliveries)
          {
-            deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq,
-                     expiryQueue, redeliveryDelay);
-         } else
+            deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay);
+         }
+         else
          {
             deliveryId = -1;
          }
@@ -273,20 +276,28 @@
             if (callbackClient != null)
             {
                invoker = callbackClient.getInvoker();
-            } else
+                              
+            }
+            else
             {
                // TODO: dummy synchronization object, in case there's no clientInvoker. This will
                // happen during the first invocation anyway. It's a kludge, I know, but this whole
                // synchronization thing is a huge kludge. Needs to be reviewed.
                invoker = new Object();
             }
-
+            
             synchronized (invoker)
             {
                // one way invocation, no acknowledgment sent back by the client
-               if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }               callbackHandler.handleCallbackOneway(callback);
+               if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
+               
+               callbackHandler.handleCallbackOneway(callback);
+               
+               //We store the delivery id so we know to wait for any deliveries in transit on close
+               this.lastDeliveryID = deliveryId;
             }
-         } catch (HandleCallbackException e)
+         }
+         catch (HandleCallbackException e)
          {
             // it's an oneway callback, so exception could only have happened on the server, while
             // trying to send the callback. This is a good reason to smack the whole connection.
@@ -345,13 +356,15 @@
 
    // Closeable implementation ---------------------------------------------------------------------
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       try
       {
          if (trace) { log.trace(this + " closing");}
 
          stop();
+         
+         return lastDeliveryID;
       }
       catch (Throwable t)
       {
@@ -422,27 +435,6 @@
       }
    }
 
-   /**
-    * This method is always called between closing() and close() being called. Instead of having a
-    * new method we could perhaps somehow pass the last delivery id in with closing - then we don't
-    * need another message.
-    */
-   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
-   {
-      if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
-
-      try
-      {
-         // Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
-
-         sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
-      }
-   }
-
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()
@@ -552,7 +544,7 @@
          }
 
          started = false;
-
+         
          // Any message deliveries already transit to the consumer, will just be ignored by the
          // MessageCallbackHandler since it will be closed.
          //
@@ -564,8 +556,7 @@
          // 2) ServerConsumerEndpoint::closing() causes stop() this flushes any deliveries yet to
          // deliver to the client callback handler.
          //
-         // 3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries
-         // after lastDeliveryId for the consumer will be considered in flight and cancelled.
+         // 3) MessageCallbackHandler waits for all deliveries to arrive at client side
          //
          // 4) ServerConsumerEndpoint:close() - endpoint is deregistered.
          //

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -29,7 +29,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -302,10 +301,12 @@
       }
    }
       
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
       // currently does nothing
       if (trace) log.trace(this + " closing (noop)");
+      
+      return -1;
    }
  
    public void send(JBossMessage message, boolean checkForDuplicates) throws JMSException
@@ -747,41 +748,6 @@
       }
    }
       
-   void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId)
-      throws Throwable
-   {
-      //Need to cancel in reverse
-      
-      LinkedList toCancel = new LinkedList();
-      
-      Iterator iter = deliveries.entrySet().iterator();
-            
-      while (iter.hasNext())
-      {
-         Map.Entry entry = (Map.Entry)iter.next();
-         
-         Long deliveryId = (Long)entry.getKey();
-         
-         DeliveryRecord record = (DeliveryRecord)entry.getValue();
-         
-         if (record.consumerId == consumerId && deliveryId.longValue() > lastDeliveryId)
-         {
-            iter.remove();
-            
-            toCancel.addFirst(record);
-         }
-      }
-      
-      iter = toCancel.iterator();
-      
-      while (iter.hasNext())
-      {
-         DeliveryRecord record = (DeliveryRecord)iter.next();
-         
-         record.del.cancel();
-      }
-   }
-   
    void removeBrowser(int browserId) throws Exception
    {
       synchronized (browsers)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -60,9 +60,9 @@
       endpoint.close();
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
-      endpoint.closing();
+      return endpoint.closing();
    }
 
    public void reset() throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -63,9 +63,9 @@
       endpoint.close();
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
-      endpoint.closing();
+      return endpoint.closing();
    }
 
    public SessionDelegate createSessionDelegate(boolean transacted,

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -59,9 +59,9 @@
       endpoint.close();
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
-      endpoint.closing();
+      return endpoint.closing();
    }
 
    public void changeRate(float newRate) throws JMSException
@@ -69,11 +69,6 @@
       endpoint.changeRate(newRate);
    }
    
-   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
-   {
-      endpoint.cancelInflightMessages(lastDeliveryId);
-   }
-
    // AdvisedSupport overrides --------------------------------------
 
    public Object getEndpoint()

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -70,9 +70,9 @@
       endpoint.close();
    }
 
-   public void closing() throws JMSException
+   public long closing() throws JMSException
    {
-      endpoint.closing();
+      return endpoint.closing();
    }
 
    public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -6,6 +6,8 @@
  */
 package org.jboss.jms.server.remoting;
 
+import org.jboss.jms.server.endpoint.ServerConsumerEndpoint;
+import org.jboss.logging.Logger;
 import org.jboss.util.threadpool.ThreadPool;
 import org.jboss.util.threadpool.TaskWrapper;
 import org.jboss.util.threadpool.Task;
@@ -22,6 +24,8 @@
 {
    // Constants ------------------------------------------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(DirectThreadPool.class);
+   
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -59,8 +59,9 @@
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
       
-      endpoint.closing();
-      return null;
+      long id = endpoint.closing();
+      
+      return new ClosingResponse(id);
    }
 
    public void write(DataOutputStream os) throws Exception

Added: trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+
+public class ClosingResponse extends ResponseSupport
+{
+   private long id;
+   
+   public ClosingResponse()
+   {      
+   }
+   
+   public ClosingResponse(long id)
+   {
+      super(PacketSupport.RESP_CLOSING);
+      
+      this.id = id;
+   }
+
+   public Object getResponse()
+   {
+      return new Long(id);
+   }
+   
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      
+      os.writeLong(id);
+      
+      os.flush();
+   }
+   
+   public void read(DataInputStream is) throws Exception
+   {
+      id = is.readLong();
+   }
+
+}
+

Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -80,16 +80,5 @@
       os.flush();
    }
    
-   public Object getPayload()
-   {
-      OnewayInvocation oi = new OnewayInvocation(this);
-
-      InvocationRequest request =
-         new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-                               oi, ONE_WAY_METADATA, null, null);
-      
-      return request;     
-   }
-
 }
 

Deleted: trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -1,91 +0,0 @@
-
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.wireformat;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.jms.server.endpoint.ConsumerEndpoint;
-
-/**
- * 
- * A ConsumerCancelInflightMessagesRequest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class ConsumerCancelInflightMessagesRequest extends RequestSupport
-{
-   private long lastDeliveryId;
-   
-   public ConsumerCancelInflightMessagesRequest()
-   {      
-   }
-   
-   public ConsumerCancelInflightMessagesRequest(int objectId,
-                                    byte version,
-                                    long lastDeliveryId)
-   {
-      super(objectId, PacketSupport.REQ_CONSUMER_CANCELINFLIGHTMESSAGES, version);
-      
-      this.lastDeliveryId = lastDeliveryId;
-   }
-
-   public void read(DataInputStream is) throws Exception
-   {
-      super.read(is);
-      
-      lastDeliveryId = is.readLong();
-   }
-
-   public ResponseSupport serverInvoke() throws Exception
-   {
-      ConsumerEndpoint endpoint = 
-         (ConsumerEndpoint)Dispatcher.instance.getTarget(objectId);
-      
-      if (endpoint == null)
-      {
-         throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
-      }
-      
-      endpoint.cancelInflightMessages(lastDeliveryId);
-      
-      return null;
-   }
-
-   public void write(DataOutputStream os) throws Exception
-   {
-      super.write(os);
-      
-      os.writeLong(lastDeliveryId);
-      
-      os.flush();
-   }
-
-}
-
-
-

Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -110,7 +110,6 @@
    // --------
    
    public static final int REQ_CONSUMER_CHANGERATE = 401;
-   public static final int REQ_CONSUMER_CANCELINFLIGHTMESSAGES = 402;
    
    // Browser
    // -------
@@ -160,6 +159,8 @@
    public static final int RESP_BROWSER_NEXTMESSAGE = 100500;   
    public static final int RESP_BROWSER_HASNEXTMESSAGE = 100501;   
    public static final int RESP_BROWSER_NEXTMESSAGEBLOCK = 100502;
+   
+   public static final int RESP_CLOSING = 100601;
   
    
    public static PacketSupport createPacket(int id)
@@ -262,11 +263,7 @@
             break;  
             
          // Consumer
-
-         case REQ_CONSUMER_CANCELINFLIGHTMESSAGES:
-            packet = new ConsumerCancelInflightMessagesRequest();
-            break;
-                        
+    
          // Browser   
             
          case REQ_BROWSER_NEXTMESSAGE:
@@ -343,7 +340,10 @@
             packet = new BrowserNextMessageBlockResponse();
             break;     
             
-            
+         case RESP_CLOSING:
+            packet = new ClosingResponse();
+            break;
+                        
          case SERIALIZED:
             packet = new SerializedPacket();
             break;            

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -3361,31 +3361,31 @@
          TextMessage tm = (TextMessage)m;
          count++;
 
-         log.trace("Got message:" + count);
-
+         log.info(this + " Got message:" + count);
+         
          try
          {
-            log.trace("message:" + tm.getText());
+            log.info(this + " message:" + tm.getText());
             if (count == 1)
             {
                if (!("a".equals(tm.getText())))
                {
-                  log.trace("Should be a but was " + tm.getText());
+                  log.info("Should be a but was " + tm.getText());
                   failed = true;
                   latch.release();
                }
-               log.trace("Throwing exception");
+               log.info("Throwing exception");
                throw new RuntimeException("Aardvark");
             }
             else if (count == 2)
             {
-               log.trace("ack mode:" + sess.getAcknowledgeMode());
+               log.info("ack mode:" + sess.getAcknowledgeMode());
                if (sess.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE || sess.getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE)
                {
                   //Message should be immediately redelivered
                   if (!("a".equals(tm.getText())))
                   {
-                     log.trace("Should be a but was " + tm.getText());
+                     log.info("Should be a but was " + tm.getText());
                      failed = true;
                      latch.release();
                   }
@@ -3400,7 +3400,7 @@
                   //Transacted or CLIENT_ACKNOWLEDGE - next message should be delivered
                   if (!("b".equals(tm.getText())))
                   {
-                     log.trace("Should be b but was " + tm.getText());
+                     log.info("Should be b but was " + tm.getText());
                      failed = true;
                      latch.release();
                   }
@@ -3412,7 +3412,7 @@
                {
                   if (!("b".equals(tm.getText())))
                   {
-                     log.trace("Should be b but was " + tm.getText());
+                     log.info("Should be b but was " + tm.getText());
                      failed = true;
                      latch.release();
                   }
@@ -3421,7 +3421,7 @@
                {
                   if (!("c".equals(tm.getText())))
                   {
-                     log.trace("Should be c but was " + tm.getText());
+                     log.info("Should be c but was " + tm.getText());
                      failed = true;
                      latch.release();
                   }
@@ -3435,7 +3435,7 @@
                {
                   if (!("c".equals(tm.getText())))
                   {
-                     log.trace("Should be c but was " + tm.getText());
+                     log.info("Should be c but was " + tm.getText());
                      failed = true;
                      latch.release();
                   }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -21,15 +21,19 @@
   */
 package org.jboss.test.messaging.jms;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
-import javax.jms.Connection;
 import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 
+import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -122,54 +126,84 @@
    {
       Queue queue = (Queue)ic.lookup("/queue/TestQueue");
 
-      Connection conn1 = cf.createConnection();
+      // Maybe we could remove this counter after we are sure this test is fixed!
+      // I had to use a counter because this can work in some iterations.
+      for (int counter = 0; counter < 20; counter++)
+      {
+         log.info("Iteration = " + counter);
 
-      Connection conn2 = cf.createConnection();
+         Connection conn1 = cf.createConnection();
 
-      try
-      {
-         Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         assertEquals(0, ((JBossConnection)conn1).getServerID());
 
-         MessageProducer p = s.createProducer(queue);
+         Connection conn2 = cf.createConnection();
 
-         for (int i = 0; i < 20; i++)
+         assertEquals(0, ((JBossConnection)conn2).getServerID());
+
+         try
          {
-            p.send(s.createTextMessage("message " + i));
-         }
+            Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
-         s.commit();
+            MessageProducer p = s.createProducer(queue);
 
-         Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            for (int i = 0; i < 20; i++)
+            {
+               p.send(s.createTextMessage("message " + i));
+            }
 
-         // Create a consumer, start the session, close the consumer..
-         // This shouldn't cause any message to be lost
-         MessageConsumer c2 = s2.createConsumer(queue);
-         conn2.start();
-         c2.close();
+            s.commit();
 
-         // open a new consumer
-         c2 = s2.createConsumer(queue);
+            Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
-         for (int i = 0; i < 20; i++)
-         {
-            TextMessage txt = (TextMessage)c2.receive(5000);
-            assertNotNull(txt);
-            assertEquals("message " + i, txt.getText());
-         }
+            // these next three lines are an anti-pattern but they shouldn't loose any messages
+            MessageConsumer c2 = s2.createConsumer(queue);
+            conn2.start();
+            c2.close();
 
-         assertNull(c2.receive(1000));
+            c2 = s2.createConsumer(queue);
 
-         s2.commit();
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
+            //There is a possibility the messages arrive out of order if they hit the closed
+            //consumer and are cancelled back before delivery to the other consumer has finished.
+            //There is nothing much we can do about this
+            Set texts = new HashSet();
+            
+            for (int i = 0; i < 20; i++)
+            {
+               TextMessage txt = (TextMessage)c2.receive(5000);
+               assertNotNull(txt);
+               texts.add(txt.getText());               
+            }
+            
+            for (int i = 0; i < 20; i++)
+            {
+               assertTrue(texts.contains("message " + i));            
+            }
+            
+            // Ovidiu: the test was originally invalid, a locally transacted session that is closed 
+            //         rolls back its transaction. I added s2.commit() to correct the test.
+            // JMS 1.1 Specifications, Section 4.3.5:
+            // "Closing a connection must roll back the transactions in progress on its
+            // transacted sessions*.
+            // *) The term 'transacted session' refers to the case where a session's commit and
+            // rollback methods are used to demarcate a transaction local to the session. In the
+            // case where a session's work is coordinated by an external transaction manager, a
+            // session's commit and rollback methods are not used and the result of a closed
+            // session's work is determined later by the transaction manager.
+
+            s2.commit();
+
+            assertNull(c2.receive(1000));
          }
-         if (conn2 != null)
+         finally
          {
-            conn2.close();
+            if (conn1 != null)
+            {
+               conn1.close();
+            }
+            if (conn2 != null)
+            {
+               conn2.close();
+            }
          }
       }
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -54,23 +54,23 @@
 import org.jboss.jms.wireformat.BrowserNextMessageResponse;
 import org.jboss.jms.wireformat.CloseRequest;
 import org.jboss.jms.wireformat.ClosingRequest;
+import org.jboss.jms.wireformat.ClosingResponse;
 import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
 import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateResponse;
 import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
 import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateResponse;
 import org.jboss.jms.wireformat.ConnectionFactoryGetClientAOPStackRequest;
 import org.jboss.jms.wireformat.ConnectionFactoryGetClientAOPStackResponse;
-import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
-import org.jboss.jms.wireformat.ConnectionGetIDBlockResponse;
 import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
 import org.jboss.jms.wireformat.ConnectionGetClientIDResponse;
+import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
+import org.jboss.jms.wireformat.ConnectionGetIDBlockResponse;
 import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
 import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsResponse;
 import org.jboss.jms.wireformat.ConnectionSendTransactionRequest;
 import org.jboss.jms.wireformat.ConnectionSetClientIDRequest;
 import org.jboss.jms.wireformat.ConnectionStartRequest;
 import org.jboss.jms.wireformat.ConnectionStopRequest;
-import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
 import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
 import org.jboss.jms.wireformat.NullResponse;
 import org.jboss.jms.wireformat.PacketSupport;
@@ -275,11 +275,6 @@
       wf.testConsumerChangeRateRequest();
    }
    
-   public void testConsumerCancelInflightMessagesRequest() throws Exception
-   {                       
-      wf.testConsumerCancelInflightMessagesRequest();
-   }
-   
    // Browser
    
    public void testBrowserNextMessageRequest() throws Exception
@@ -392,7 +387,12 @@
       wf.testNullResponse();
    }
    
+   public void testClosingResponse() throws Exception
+   {                 
+      wf.testClosingResponse();
+   }
    
+   
    //We just check the first byte to make sure serialization is not be used.
    
    private class TestWireFormat extends JMSWireFormat
@@ -665,14 +665,6 @@
          testPacket(req, PacketSupport.REQ_CONSUMER_CHANGERATE);                           
       }
       
-      public void testConsumerCancelInflightMessagesRequest() throws Exception
-      { 
-         RequestSupport req =
-            new ConsumerCancelInflightMessagesRequest(23, (byte)77, 123);
-                 
-         testPacket(req, PacketSupport.REQ_CONSUMER_CANCELINFLIGHTMESSAGES);                           
-      }
-      
       // Browser
       
       public void testBrowserNextMessageRequest() throws Exception
@@ -859,6 +851,13 @@
          testPacket(resp, PacketSupport.NULL_RESPONSE);                           
       }
       
+      public void testClosingResponse() throws Exception
+      { 
+         ResponseSupport resp =  new ClosingResponse(23);
+                 
+         testPacket(resp, PacketSupport.RESP_CLOSING);                           
+      }
+      
    }
    
    public static class SerializableObject implements Serializable

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -319,9 +319,9 @@
          closed = true;
       }
 
-      public void closing() throws JMSException
+      public long closing() throws JMSException
       {
-         
+         return -1;
       }
 
       public IDBlock getIdBlock(int size) throws JMSException

Modified: trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java	2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java	2007-02-28 23:46:51 UTC (rev 2507)
@@ -243,7 +243,7 @@
 
       client.setDisconnectTimeout(0);
       client.disconnect();
-
+      
       // the client should be "dead", in that both the connection validator and the lease pinger
       // are silenced
 




More information about the jboss-cvs-commits mailing list