[jboss-cvs] JBoss Messaging SVN: r1802 - in trunk: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx tests tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 15 16:28:38 EST 2006


Author: timfox
Date: 2006-12-15 16:28:18 -0500 (Fri, 15 Dec 2006)
New Revision: 1802

Added:
   trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
Removed:
   trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/container/AsfAspect.java
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
   trunk/src/main/org/jboss/jms/message/MessageProxy.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Fixes for connection consumer



Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -304,7 +304,7 @@
                for (int i = 0; i < mesList.size(); i++)
                {
                   MessageProxy m = (MessageProxy)mesList.get(i);
-                  session.addAsfMessage(m, consumerID, channelID, maxDeliveries);
+                  session.addAsfMessage(m, consumerID, channelID, maxDeliveries, sess);
                   if (trace) { log.trace("added " + m + " to session"); }
                }
 

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -502,9 +502,10 @@
     * This method is used by the JBossConnectionConsumer to load up the session
     * with messages to be processed by the session's run() method
     */
-   void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries)
+   void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries,
+                      SessionDelegate connectionConsumerSession)
    {
-      delegate.addAsfMessage(m, consumerID, channelID, maxDeliveries);
+      delegate.addAsfMessage(m, consumerID, channelID, maxDeliveries, connectionConsumerSession);
    }
       
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/container/AsfAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -34,7 +34,6 @@
 import org.jboss.jms.client.remoting.MessageCallbackHandler;
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.ConnectionDelegate;
-import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.MessageProxy;
@@ -131,6 +130,7 @@
       int theConsumerID = ((Integer)mi.getArguments()[1]).intValue();
       long channelID = ((Long)mi.getArguments()[2]).longValue();
       int maxDeliveries = ((Integer)mi.getArguments()[3]).intValue();
+      SessionDelegate connectionConsumerDelegate = ((SessionDelegate)mi.getArguments()[4]);
       
       if (m == null)
       {
@@ -142,6 +142,7 @@
       holder.consumerID = theConsumerID;
       holder.channelID = channelID;
       holder.maxDeliveries = maxDeliveries;
+      holder.connectionConsumerDelegate = connectionConsumerDelegate;
       
       msgs.add(holder);
 
@@ -154,6 +155,7 @@
       
       MethodInvocation mi = (MethodInvocation)invocation;
             
+      //This is the delegate for the session from the pool
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
       
       int ackMode = getSessionState(invocation).getAcknowledgeMode();
@@ -166,7 +168,8 @@
          
          MessageCallbackHandler.callOnMessage(del, sessionListener, holder.consumerID,
                                               holder.channelID, false,
-                                              holder.msg, ackMode, holder.maxDeliveries);                          
+                                              holder.msg, ackMode, holder.maxDeliveries,
+                                              holder.connectionConsumerDelegate);                          
       }
       
       return null;
@@ -191,5 +194,6 @@
       private int consumerID;
       private long channelID;
       private int maxDeliveries;
+      private SessionDelegate connectionConsumerDelegate;
    }
 }

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -23,12 +23,14 @@
 package org.jboss.jms.client.container;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import javax.jms.JMSException;
+import javax.jms.Session;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -352,9 +354,7 @@
       for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
       {
          SessionState failedSessionState = (SessionState)i.next();
-         
-         if (trace) { log.trace("Failed session state has " + failedSessionState.getToAck().size() + " deliveries"); }
-         
+          
          int oldSessionId = failedSessionState.getSessionId();
 
          ClientSessionDelegate failedSessionDelegate =
@@ -367,7 +367,7 @@
 
          SessionState newSessionState = (SessionState)newSessionDelegate.getState();
          
-         if (trace) { log.trace("New session state has " + newSessionState.getToAck().size() + " deliveries"); }
+         if (trace) { log.trace("New session state has " + newSessionState.getClientAckList().size() + " deliveries"); }
          
          oldNewSessionStateMap.put(new Integer(oldSessionId), failedSessionState);
 
@@ -418,33 +418,58 @@
          SessionState state = (SessionState)iter.next();
          
          List ackInfos = null;
-         
-         if (!state.isTransacted() && !state.isXA())
+           
+         if (!state.isTransacted() ||
+             (state.isXA() && state.getCurrentTxId() == null))     
          {
-            //Now we remove any unacked np messages - this is because we don't want to ack them
+            //Non transacted session or an XA session with no transaction set (it falls back to auto_ack)
+            
+            if (trace) { log.trace("Session is not transacted (or XA with no tx set), retrieving deliveries from session state"); }
+
+                        
+            //we remove any unacked np messages - this is because we don't want to ack them
             //since the server won't know about them and will barf
                         
-            Iterator iter2 = state.getToAck().iterator();
-            
-            if (trace) { log.trace("Removing any np deliveries"); }
-
-            while (iter2.hasNext())
+            if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
             {
-               DeliveryInfo info = (DeliveryInfo)iter2.next();
-
-               if (!info.getMessageProxy().getMessage().isReliable())
+               Iterator iter2 = state.getClientAckList().iterator();
+               
+               if (trace) { log.trace("Removing any np deliveries"); }
+   
+               while (iter2.hasNext())
                {
-                  iter2.remove();
-                  
-                  if (trace) { log.trace("Removed np delivery: " + info.getDeliveryId()); }
+                  DeliveryInfo info = (DeliveryInfo)iter2.next();
+   
+                  if (!info.getMessageProxy().getMessage().isReliable())
+                  {
+                     iter2.remove();
+                     
+                     if (trace) { log.trace("Removed np delivery: " + info.getDeliveryId()); }
+                  }
                }
+               
+               ackInfos = state.getClientAckList();
             }
+            else
+            {
+               DeliveryInfo autoAck = state.getAutoAckInfo();
+               if (autoAck != null)                 
+               {
+                  if (!autoAck.getMessageProxy().getMessage().isReliable())
+                  {
+                     //unreliable
+                     state.setAutoAckInfo(null);
+                     ackInfos = Collections.EMPTY_LIST;
+                  }
+                  else
+                  {
+                     //reliable
+                     ackInfos = new ArrayList();
+                     ackInfos.add(autoAck);
+                  }
+               }               
+            }
             
-            if (trace) { log.trace("Session is not transacted, retrieving deliveries from session state"); }
-
-            //Get the ack infos from the list in the session state
-            ackInfos = state.getToAck();
-            
             if (trace) { log.trace("Retrieved " + ackInfos.size() + " deliveries"); }
          }
          else

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -36,7 +36,7 @@
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.Cancel;
+import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DefaultAck;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.logging.Logger;
@@ -68,6 +68,30 @@
    
    // Public --------------------------------------------------------
 
+   private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+   {
+      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+      
+      //If the delivery was obtained via a connection consumer we need to ack via that
+      //otherwise we just use this session
+      
+      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+      
+      sessionToUse.acknowledgeDelivery(delivery);      
+   }
+   
+   private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+   {
+      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+      
+      //If the delivery was obtained via a connection consumer we need to cancel via that
+      //otherwise we just use this session
+      
+      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+      
+      sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));      
+   }
+   
    public Object handleClosing(Invocation invocation) throws Throwable
    {
       MethodInvocation mi = (MethodInvocation)invocation;
@@ -75,42 +99,51 @@
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
       
       int ackMode = state.getAcknowledgeMode();
-
-      // select eligible acknowledgments
-      List acks = new ArrayList();
-      List cancels = new ArrayList();
-      for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
+  
+      //We need to either ack (for auto_ack) or cancel (for client_ack)
+      //any deliveries - this is because the message listener might have closed
+      //before onMessage had finished executing
+      
+      if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+          (state.isXA() && state.getCurrentTxId() == null))
       {
-         DeliveryInfo ack = (DeliveryInfo)i.next();
-         if (ackMode == Session.AUTO_ACKNOWLEDGE ||
-             ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         //Acknowledge any outstanding auto ack
+         
+         DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
+         
+         if (remainingAutoAck != null)
          {
-            acks.add(new DefaultAck(ack.getMessageProxy().getDeliveryId()));            
+            if (trace) { log.trace(this + " handleClosing(). Found remaining auto ack. Will ack it " + remainingAutoAck.getDeliveryId()); }
+            
+            ackDelivery(del, remainingAutoAck);
+            
+            if (trace) { log.trace(this + " acked it"); }
+            
+            state.setAutoAckInfo(null);
          }
-         else
+      }
+      else if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+      {
+         // Cancel any outstanding deliveries
+         // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
+         // delivery count information from client to server. We could just do this on the server but
+         // we would lose delivery count info.
+         
+         //CLIENT_ACKNOWLEDGE cannot be used with MDBs so is always safe to cancel on this session
+         
+         List cancels = new ArrayList();
+         
+         for(Iterator i = state.getClientAckList().iterator(); i.hasNext(); )
          {
-            Cancel cancel = new Cancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
+            DeliveryInfo ack = (DeliveryInfo)i.next();            
+            DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
             cancels.add(cancel);
          }
-         i.remove();
+         
+         state.getClientAckList().clear();
       }
       
-      // On closing we acknowlege any AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, since the session
-      // might have closed before the onMessage had finished executing.
-      // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
-      // delivery count information from client to server. We could just do this on the server but
-      // we would lose delivery count info.
-
-      if (!acks.isEmpty())
-      {
-         del.acknowledgeBatch(acks);
-      }
-      if (!cancels.isEmpty())
-      {
-         log.info("Calling canceldeliveries: " + cancels.size());
-         del.cancelDeliveries(cancels);
-      }
-
       return invocation.invokeNext();
    }
 
@@ -134,28 +167,37 @@
       
       int ackMode = state.getAcknowledgeMode();
       
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
-          ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          state.getCurrentTxId() == null)
+      Object[] args = mi.getArguments();
+      DeliveryInfo info = (DeliveryInfo)args[0];
+      
+      if (ackMode == Session.CLIENT_ACKNOWLEDGE)
       {
-         // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
-         // also for XA sessions not enlisted in a global transaction.
- 
-         // We store the ack in a list for later acknowledgement or recovery
-    
-         Object[] args = mi.getArguments();
-         DeliveryInfo info = (DeliveryInfo)args[0];
-
-         state.getToAck().add(info);
+         // We collect acknowledgments in the list
          
-         if (trace)
-         { 
-            SessionDelegate del = (SessionDelegate)mi.getTargetObject();            
-            log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del);
-         }
+         if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack list"); }
+         
+         state.getClientAckList().add(info);
+         
+         //We can return immediately
+         return null;
       }
-
+      else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+               ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+               (state.isXA() && state.getCurrentTxId() == null))
+      {
+         //We collect the single acknowledgement in the state.
+         //Currently DUPS_OK is treated the same as AUTO_ACKNOWLEDGE
+         //Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
+         
+         if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack member"); }
+         
+         state.setAutoAckInfo(info);         
+         
+         //We can return immediately         
+         return null;
+      }
+              
+      //Transactional - need to carry on down the stack
       return invocation.invokeNext();
    }
    
@@ -166,11 +208,13 @@
       SessionState state = getState(invocation);
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
     
-      if (!state.getToAck().isEmpty())
-      {                  
-         del.acknowledgeBatch(state.getToAck());
+      if (!state.getClientAckList().isEmpty())
+      {                 
+         //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+         //on this session (rather than the connection consumer session)
+         del.acknowledgeDeliveries(state.getClientAckList());
       
-         state.getToAck().clear();
+         state.getClientAckList().clear();
       }
         
       return null;
@@ -192,42 +236,35 @@
       
       if (ackMode == Session.AUTO_ACKNOWLEDGE ||
           ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          (ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null))
+          (state.isXA() && state.getCurrentTxId() == null))
       {
-         // We acknowledge immediately on a non-transacted session that does not want to
-         // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
-
+         //We auto acknowledge
+         //Currently DUPS_OK is treated the same as AUTO_ACKNOWLEDGE
+         //Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
+         
          SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
 
          if (!state.isRecoverCalled())
          {
-            if (trace) { log.trace("acknowledging NON-transactionally"); }
-                        
-            List acks = state.getToAck();
+            DeliveryInfo deliveryInfo = state.getAutoAckInfo();
             
-            // Sanity check
-            if (acks.size() != 1)
+            if (deliveryInfo == null)
             {
-               throw new IllegalStateException("Should only be one entry in list. " +
-                                               "There are " + acks.size());
+               throw new IllegalStateException("Cannot find delivery to auto ack");
             }
-            
-            DeliveryInfo ack = (DeliveryInfo)acks.get(0);
-            
+                                 
+            if (trace) { log.trace(this + " auto acking delivery " + deliveryInfo.getDeliveryId()); }
+                        
             if (cancel)
             {
-               List cancels = new ArrayList();
-               Cancel c = new Cancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
-               cancels.add(c);
-               sd.cancelDeliveries(cancels);
+               cancelDelivery(sd, deliveryInfo);
             }
             else
             {
-               sd.acknowledge(ack);
+               ackDelivery(sd, deliveryInfo);
             }
             
-            //TODO we can optimise this for the auto_ack case (i.e. not store in list and have to clear each time)
-            state.getToAck().clear();
+            state.setAutoAckInfo(null);
          }
          else
          {
@@ -251,9 +288,7 @@
             
       SessionState state = getState(invocation);
       
-      int ackMode = state.getAcknowledgeMode();
-         
-      if (ackMode == Session.SESSION_TRANSACTED)
+      if (state.isTransacted())
       {
          throw new IllegalStateException("Cannot recover a transacted session");
       }
@@ -263,9 +298,27 @@
       //Call redeliver
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
       
-      del.redeliver(state.getToAck());
+      if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+      {
+         del.redeliver(state.getClientAckList());
+         
+         state.getClientAckList().clear();
+      }
+      else
+      {
+         DeliveryInfo info = state.getAutoAckInfo();
+         
+         if (info != null)
+         {
+            List redels = new ArrayList();
             
-      state.getToAck().clear();
+            redels.add(info);
+            
+            del.redeliver(redels);
+            
+            state.setAutoAckInfo(null);            
+         }
+      }            
 
       state.setRecoverCalled(true);
       
@@ -297,9 +350,7 @@
     * was called.
     */
    public Object handleRedeliver(Invocation invocation) throws Throwable
-   {
-      if (trace) { log.trace("redeliver called"); }
-      
+   {            
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
             
@@ -307,8 +358,11 @@
       // JMSRedelivered to true.
       
       List toRedeliver = (List)mi.getArguments()[0];
-      LinkedList toCancel = new LinkedList();
       
+      if (trace) { log.trace(this + " handleRedeliver() called: " + toRedeliver); }
+      
+      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+      
       // Need to be recovered in reverse order.
       for (int i = toRedeliver.size() - 1; i >= 0; i--)
       {
@@ -319,25 +373,17 @@
               
          if (handler == null)
          {
-            // This is ok. The original consumer has closed, this message wil get cancelled back
-            // to the channel.
-            Cancel cancel = new Cancel(info.getMessageProxy().getDeliveryId(), info.getMessageProxy().getDeliveryCount());
-            toCancel.addFirst(cancel);
+            // This is ok. The original consumer has closed, so we cancel the message
+            
+            cancelDelivery(del, info);
          }
          else
          {
+            if (trace) { log.trace("Adding proxy back to front of buffer"); }
             handler.addToFrontOfBuffer(proxy);
          }                                    
       }
-      
-      if (!toCancel.isEmpty())
-      {
-         // Cancel the messages that can't be redelivered locally
-         
-         SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-         del.cancelDeliveries(toCancel);
-      }
-            
+              
       return null;  
    }
    

Modified: trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -27,6 +27,7 @@
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.HierarchicalState;
@@ -194,7 +195,16 @@
 
          if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
 
-         connState.getResourceManager().addAck(txID, state.getSessionId(), info);
+         //If the ack is for a delivery that came through via a connection consumer then we
+         //use the connectionConsumer session as the session id, otherwise we use this session's
+         //session id
+         
+         ClientSessionDelegate connectionConsumerDelegate =
+            (ClientSessionDelegate)info.getConnectionConsumerSession();
+         
+         int sessionId = connectionConsumerDelegate != null ? connectionConsumerDelegate.getID() : state.getSessionId();
+         
+         connState.getResourceManager().addAck(txID, sessionId, info);
       }
 
       return null;

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -44,6 +44,7 @@
 import org.jboss.jms.message.StreamMessageProxy;
 import org.jboss.jms.message.TextMessageProxy;
 import org.jboss.jms.server.endpoint.Ack;
+import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.remoting.Client;
 
@@ -112,7 +113,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void acknowledge(Ack ack) throws JMSException
+   public void acknowledgeDelivery(Ack ack) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -121,7 +122,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void acknowledgeBatch(List acks) throws JMSException
+   public void acknowledgeDeliveries(List acks) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -405,7 +406,8 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void addAsfMessage(MessageProxy m, int consumerID, long channelId, int maxDeliveries)
+   public void addAsfMessage(MessageProxy m, int consumerID, long channelId, int maxDeliveries,
+                             SessionDelegate connectionConsumerSession)
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -423,7 +425,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void cancelDeliveries(List ackInfos)
+   public void cancelDeliveries(List cancels)
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -432,6 +434,15 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
+   public void cancelDelivery(Cancel cancel)
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+   
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
    public void recoverDeliveries(List ackInfos) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -34,7 +34,7 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.Cancel;
+import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.Future;
@@ -65,6 +65,7 @@
       trace = log.isTraceEnabled();
    }
      
+   //This is static so it can be called by the asf layer too
    public static void callOnMessage(SessionDelegate sess,
                                     MessageListener listener,
                                     int consumerID,
@@ -72,10 +73,16 @@
                                     boolean isConnectionConsumer,
                                     MessageProxy m,
                                     int ackMode,
-                                    int maxDeliveries)
+                                    int maxDeliveries,
+                                    SessionDelegate connectionConsumerSession)
       throws JMSException
    {
-      preDeliver(sess, consumerID, channelID, m, isConnectionConsumer);
+      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+      // add anything to the tx for this session.
+      if (!isConnectionConsumer)
+      {
+         sess.preDeliver(new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession));
+      }  
                   
       int tries = 0;
       
@@ -136,36 +143,16 @@
       if (!sess.isClosed())
       {
          // postDeliver only if the session is not closed
-         postDeliver(sess, isConnectionConsumer, cancel);
+
+         // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+         // add anything to the tx for this session
+         if (!isConnectionConsumer)
+         {
+            sess.postDeliver(cancel);
+         }   
       }
    }
    
-   protected static void preDeliver(SessionDelegate sess,
-                                    int consumerID,
-                                    long channelID,
-                                    MessageProxy m,
-                                    boolean isConnectionConsumer)
-      throws JMSException
-   {
-      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
-      // add anything to the tx for this session.
-      if (!isConnectionConsumer)
-      {
-         sess.preDeliver(new DeliveryInfo(m, consumerID, channelID));
-      }         
-   }
-   
-   protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer,
-                                     boolean cancel) throws JMSException
-   {
-      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
-      // add anything to the tx for this session
-      if (!isConnectionConsumer)
-      {
-         sess.postDeliver(cancel);
-      }         
-   }
-   
    // Attributes ----------------------------------------------------
       
    private LinkedList buffer;
@@ -331,7 +318,7 @@
          for(Iterator i = buffer.iterator(); i.hasNext();)
          {
             MessageProxy mp = (MessageProxy)i.next();
-            Cancel ack = new Cancel(mp.getDeliveryId(), mp.getDeliveryCount());
+            DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
             cancels.add(ack);
          }
                
@@ -450,9 +437,15 @@
                        
                // If message is expired we still call pre and post deliver. This makes sure the
                // message is acknowledged so it gets removed from the queue/subscription.
-               preDeliver(sessionDelegate, consumerID, channelID, m, isConnectionConsumer);
+
+               if (!isConnectionConsumer)
+               {
+                  sessionDelegate.preDeliver(new DeliveryInfo(m, consumerID, channelID, null));
+                  
+                  sessionDelegate.postDeliver(false);
+               }
                
-               postDeliver(sessionDelegate, isConnectionConsumer, false);
+               //postDeliver(sessionDelegate, isConnectionConsumer, false);
                
                if (!m.getMessage().isExpired())
                {
@@ -731,7 +724,7 @@
          {
             try
             {
-               callOnMessage(sessionDelegate, listener, consumerID, channelID, false, mp, ackMode, maxDeliveries);
+               callOnMessage(sessionDelegate, listener, consumerID, channelID, false, mp, ackMode, maxDeliveries, null);
             }
             catch (JMSException e)
             {

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -32,6 +32,7 @@
 import org.jboss.jms.client.remoting.MessageCallbackHandler;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.server.Version;
+import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.jms.tx.MessagingXAResource;
 import org.jboss.logging.Logger;
 
@@ -68,14 +69,16 @@
    
    private boolean recoverCalled;
 
-   // List<AckInfo>
-   private List toAck;
+   // List<DeliveryInfo>
+   private List ClientAckList;
+   
+   private DeliveryInfo autoAckInfo;
 
    private ConnectionState parent;
    
    private SessionDelegate delegate;
    
-   private Map callbackHandlers;
+   private Map callbackHandlers;     
    
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
                        boolean transacted, int ackMode, boolean xa)
@@ -106,7 +109,7 @@
       
       executor = new QueuedExecutor(new LinkedQueue());
       
-      toAck = new ArrayList();
+      ClientAckList = new ArrayList();
       
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
@@ -137,11 +140,25 @@
    /**
     * @return List<AckInfo>
     */
-   public List getToAck()
+   public List getClientAckList()
    {
-      return toAck;
+      return ClientAckList;
    }
    
+   public DeliveryInfo getAutoAckInfo()
+   {
+      return autoAckInfo;
+   }
+   
+   public void setAutoAckInfo(DeliveryInfo info)
+   {
+      if (info != null && autoAckInfo != null)
+      {
+         throw new IllegalStateException("There is already a delivery set for auto ack");
+      }
+      autoAckInfo = info;
+   }
+   
    public int getAcknowledgeMode()
    {
       return acknowledgeMode;

Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -78,7 +78,8 @@
    
    XAResource getXAResource();
    
-   void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries);
+   void addAsfMessage(MessageProxy m, int consumerID, long channelID,
+                      int maxDeliveries, SessionDelegate connectionConsumerDelegate);
    
    boolean getTransacted();
    

Modified: trunk/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/MessageProxy.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/message/MessageProxy.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -65,6 +65,8 @@
 
    // Attributes ----------------------------------------------------
 
+   //The actual session delegate for the message - needed for doing recovery
+   //so we can recover locally
    private transient SessionDelegate delegate;
    
    private transient boolean cc;

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -37,6 +37,7 @@
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.jms.server.plugin.contract.JMSUserManager;
 import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
 import org.jboss.jms.server.remoting.JMSWireFormat;
@@ -65,6 +66,8 @@
 import org.jboss.system.ServiceMBeanSupport;
 import org.w3c.dom.Element;
 
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+
 /**
  * A JMS server peer.
  *
@@ -112,6 +115,8 @@
    private long failoverStartTimeout = 3000;
    
    private long failoverCompleteTimeout = 12000;
+   
+   private Map sessions;
       
    // wired components
 
@@ -159,9 +164,11 @@
       version = Version.instance();
       
       failoverStatusLock = new Object();
+      
+      sessions = new ConcurrentReaderHashMap();
 
       started = false;
-   }
+   }      
 
    // ServiceMBeanSupport overrides ---------------------------------
 
@@ -566,6 +573,24 @@
    }
 
    // Public --------------------------------------------------------
+   
+   public ServerSessionEndpoint getSession(Integer sessionID)
+   {
+      return (ServerSessionEndpoint)sessions.get(sessionID);
+   }
+   
+   public void addSession(Integer id, ServerSessionEndpoint session)
+   {
+      sessions.put(id, session);      
+   }
+   
+   public void removeSession(Integer id)
+   {
+      if (sessions.remove(id) == null)
+      {
+         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+      }
+   }
 
    public Queue getDLQ() throws Exception
    {

Deleted: trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -1,105 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.server.endpoint;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.messaging.util.Streamable;
-
-/**
- * 
- * A Cancel.
- * 
- * Used to send a cancel (NACK) to the server
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class Cancel implements Streamable
-{
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   private long deliveryId;
-   
-   private int deliveryCount;      
-
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   public Cancel()
-   {      
-   }
-   
-   public Cancel(long deliveryId, int deliveryCount)
-   {      
-      this.deliveryId = deliveryId;
-      
-      this.deliveryCount = deliveryCount;
-   }
-
-   // Public --------------------------------------------------------
-   
-   public long getDeliveryId()
-   {
-      return deliveryId;
-   }
-   
-   public int getDeliveryCount()
-   {
-      return deliveryCount;
-   }
-
-   // Streamable implementation -------------------------------------
-   
-   public void read(DataInputStream in) throws Exception
-   {
-      deliveryId = in.readLong();
-      
-      deliveryCount = in.readInt();
-   }
-
-   public void write(DataOutputStream out) throws Exception
-   {
-      out.writeLong(deliveryId);
-      
-      out.writeInt(deliveryCount);
-   }
-
-   // Class YYY overrides -------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Package Private -----------------------------------------------
-
-   // Private -------------------------------------------------------
-   
-   // Inner Classes -------------------------------------------------
-   
-}
-

Copied: trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java (from rev 1798, trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java)
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java	2006-12-15 08:59:37 UTC (rev 1798)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -0,0 +1,85 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+
+/**
+ * 
+ * A DefaultCancel - the default implementation of Cancel.
+ * 
+ * Used to send a cancel (NACK) to the server
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultCancel implements Cancel
+{
+   // Constants -----------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private long deliveryId;
+   
+   private int deliveryCount;      
+
+   // Static --------------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
+   public DefaultCancel()
+   {      
+   }
+   
+   public DefaultCancel(long deliveryId, int deliveryCount)
+   {      
+      this.deliveryId = deliveryId;
+      
+      this.deliveryCount = deliveryCount;
+   }
+
+   // Public --------------------------------------------------------
+   
+   public long getDeliveryId()
+   {
+      return deliveryId;
+   }
+   
+   public int getDeliveryCount()
+   {
+      return deliveryCount;
+   }
+
+   // Class YYY overrides -------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Package Private -----------------------------------------------
+
+   // Private -------------------------------------------------------
+   
+   // Inner Classes -------------------------------------------------
+   
+}
+

Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -21,6 +21,7 @@
   */
 package org.jboss.jms.server.endpoint;
 
+import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
 
 /**
@@ -48,18 +49,29 @@
 
    private MessageProxy msg;
    
+   //When using the evil abomination known as a ConnectionConsumer, the connection consumer
+   //will get messages from a session that it created, then pass them on to sessions obtained
+   //from the pool. This means that when the messages are acked/cancelled, this needs to be done
+   //against the connection consumer's session, not the session from the pool, since that session
+   //won't know about the deliveries on the server side.
+   //Therefore, if this delivery was done using a connection consumer then this attribute is set
+   //to the connection consumer's session; otherwise it will be null.
+   private SessionDelegate connectionConsumerSession;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-
-   public DeliveryInfo(MessageProxy msg, int consumerId, long channelID)
+   public DeliveryInfo(MessageProxy msg, int consumerId, long channelID,
+                       SessionDelegate connectionConsumerSession)
    {      
       this.msg = msg;
       
       this.consumerId = consumerId;
       
       this.channelID = channelID;
+      
+      this.connectionConsumerSession = connectionConsumerSession;
    }
 
    // Public --------------------------------------------------------
@@ -79,6 +91,11 @@
       return msg;
    }
    
+   public SessionDelegate getConnectionConsumerSession()
+   {
+      return connectionConsumerSession;
+   }
+   
 
    // Ack Implementation  -------------------------------------------
    

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -189,7 +189,11 @@
          
          SessionAdvised sessionAdvised = new SessionAdvised(ep);
          
-         JMSDispatcher.instance.registerTarget(new Integer(sessionID), sessionAdvised);
+         Integer iSessionID = new Integer(sessionID);
+         
+         serverPeer.addSession(iSessionID, ep);
+         
+         JMSDispatcher.instance.registerTarget(iSessionID, sessionAdvised);
 
          ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
                  
@@ -649,8 +653,14 @@
                      
             List acks = sessionState.getAcks();
             
-            ServerSessionEndpoint session = (ServerSessionEndpoint)sessions.get(new Integer(sessionState.getSessionId()));
+            //We need to look up the session in a global map maintained on the server peer.
+            //We can't just assume it's one of the sessions in the connection.
+            //This is because in the case of a connection consumer, the message might be delivered through one
+            //connection and the transaction committed/rolled back through another.
+            //ConnectionConsumers suck.
             
+            ServerSessionEndpoint session = serverPeer.getSession(new Integer(sessionState.getSessionId()));
+            
             session.acknowledgeTransactionally(acks, tx);      
          }
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -83,8 +83,17 @@
 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
 
 /**
- * Concrete implementation of SessionEndpoint.
+ * The server side representation of a JMS session.
  * 
+ * A user must not invoke methods of a session concurrently on different threads, however
+ * there are situations where multiple threads may access this object concurrently, for instance:
+ * 
+ * A session can be closed when its connection is closed by the user, which might happen on a different thread
+ * A session can be closed when the server determines the connection is dead.
+ * If the session represents a connection consumer's session then the connection consumer will farm off
+ * messages to different sessions obtained from a pool, these may then cancel/ack etc on different threads, but
+ * the acks/cancels/etc will end up back here on the connection consumer session instance.
+ * 
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
@@ -109,6 +118,8 @@
    private volatile boolean closed;
 
    private ServerConnectionEndpoint connectionEndpoint;
+   
+   private ServerPeer sp;
 
    private Map consumers;
    private Map browsers;
@@ -140,7 +151,7 @@
 
       this.connectionEndpoint = connectionEndpoint;
 
-      ServerPeer sp = connectionEndpoint.getServerPeer();
+      sp = connectionEndpoint.getServerPeer();
 
       pm = sp.getPersistenceManagerInstance();
       ms = sp.getMessageStore();
@@ -327,24 +338,31 @@
       }
    }
    
-   public void acknowledgeBatch(List acks) throws JMSException
-   {      
+   public void acknowledgeDelivery(Ack ack) throws JMSException
+   {
       try
       {
+         acknowledgeDeliveryInternal(ack);   
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
+      }
+   }     
+         
+   public void acknowledgeDeliveries(List acks) throws JMSException
+   {    
+      if (trace) {log.trace(this + " acknowledgeDeliveries " + acks); }
+      
+      try
+      {
          Iterator iter = acks.iterator();
          
          while (iter.hasNext())
          {
             Ack ack = (Ack)iter.next();
             
-            Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
-            
-            if (del == null)
-            {
-               throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
-            }
-            
-            del.acknowledge(null);
+            acknowledgeDeliveryInternal(ack);
          }
       }
       catch (Throwable t)
@@ -352,107 +370,46 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
       }
    }
-   
-   public void acknowledge(Ack ack) throws JMSException
+       
+   public void cancelDelivery(Cancel cancel) throws JMSException
    {
+      if (trace) {log.trace(this + " cancelDelivery " + cancel); }
+      
       try
       {
-         Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+         Delivery del = cancelDeliveryInternal(cancel);
          
-         if (del == null)
-         {
-            throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
-         }
-         
-         del.acknowledge(null);    
+         //Prompt delivery
+         ((Channel)del.getObserver()).deliver(false);
       }
       catch (Throwable t)
       {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
+         throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDelivery");
       }
-   }     
+     
+   }      
 
    public void cancelDeliveries(List cancels) throws JMSException
    {
+      if (trace) {log.trace(this + " cancelDeliveries " + cancels); }
+      
       try
       {
          // deliveries must be cancelled in reverse order
-           
-         List forDLQ = null;
-         
+
          Set channels = new HashSet();
                            
          for (int i = cancels.size() - 1; i >= 0; i--)
          {
-            Cancel cancel = (Cancel)cancels.get(i);
+            Cancel cancel = (Cancel)cancels.get(i);       
             
-            Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+            if (trace) { log.trace("Cancelling delivery " + cancel.getDeliveryId()); }
             
-            if (del == null)
-            {
-               throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
-            }
-                                    
-            if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
-            {
-               if (forDLQ == null)
-               {
-                  forDLQ = new ArrayList();
-               }
-               
-               forDLQ.add(del);
-            }
-            else
-            {                                                   
-               del.getReference().setDeliveryCount(cancel.getDeliveryCount());
-               
-               del.cancel();
-               
-               channels.add(del.getObserver());
-            }
+            Delivery del = cancelDeliveryInternal(cancel);
+            
+            channels.add(del.getObserver());
          }
-         
-         //Send stuff to DLQ
-         
-         if (forDLQ != null)
-         {
-            //We do this in a tx so we don't end up with the message in both the original queue
-            //and the dlq if it fails half way through
-            Transaction tx = tr.createTransaction();
-            
-            try
-            {               
-               for (int i = forDLQ.size() - 1; i >= 0; i--)
-               {
-                  Delivery del = (Delivery)forDLQ.get(i);
                   
-                  if (dlq != null)
-                  {         
-                     //reset delivery count to zero
-                     del.getReference().setDeliveryCount(0);
-                     
-                     dlq.handle(null, del.getReference(), tx);
-                     
-                     del.acknowledge(tx);           
-                  }
-                  else
-                  {
-                     log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-                     
-                     del.acknowledge(tx);
-                  }                              
-               }
-               
-               tx.commit();
-            }
-            catch (Throwable t)
-            {
-               tx.rollback();
-               
-               throw t;
-            }
-         }
-         
          // need to prompt delivery for all affected channels
          
          promptDelivery(channels);
@@ -767,21 +724,7 @@
          }
       }
    }
-   
-//   void promptDeliveryOnConsumers()
-//   {
-//      if (trace) { log.trace(this + " promptDeliveryOnConsumers(), there are " + consumers.size() + " consumers"); }
-//      synchronized (consumers)
-//      {         
-//         for (Iterator i = consumers.values().iterator(); i.hasNext(); )
-//         {
-//            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
-//            
-//            consumer.promptDelivery();
-//         }
-//      }
-//   }
-   
+    
    void localClose() throws Throwable
    {
       if (closed)
@@ -853,6 +796,8 @@
       promptDelivery(channels);
       
       deliveries.clear();
+      
+      sp.removeSession(new Integer(id));
             
       JMSDispatcher.instance.unregisterTarget(new Integer(id));
       
@@ -940,7 +885,75 @@
    // Protected -----------------------------------------------------        
 
    // Private -------------------------------------------------------
+   
+   private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+   {
+      if (trace) { log.trace("Acknowledging delivery " + ack.getDeliveryId()); }
       
+      Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+      
+      if (del == null)
+      {
+         throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
+      }
+      
+      del.acknowledge(null);    
+   } 
+   
+   private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
+   {
+      Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+      
+      if (del == null)
+      {
+         throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
+      }
+                              
+      if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
+      {
+         //Send to DLQ
+         
+         //We do this in a tx so we don't end up with the message in both the original queue
+         //and the dlq if it fails half way through
+         Transaction tx = tr.createTransaction();
+         
+         try
+         {               
+            if (dlq != null)
+            {         
+               //reset delivery count to zero
+               del.getReference().setDeliveryCount(0);
+               
+               dlq.handle(null, del.getReference(), tx);
+               
+               del.acknowledge(tx);           
+            }
+            else
+            {
+               log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
+               
+               del.acknowledge(tx);
+            }                              
+                        
+            tx.commit();
+         }
+         catch (Throwable t)
+         {
+            tx.rollback();
+            
+            throw t;
+         }         
+      }
+      else
+      {                                                   
+         del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+         
+         del.cancel();
+      }
+      
+      return del;
+   }
+      
    private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
             String selectorString,
             boolean noLocal,  String subscriptionName,
@@ -1277,6 +1290,8 @@
    }
    
    
+   
+   
    // Inner classes -------------------------------------------------
    
    /**

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -68,20 +68,33 @@
    JBossTopic createTopic(String topicName) throws JMSException;
  
    /**
-    * Acknowledge a batch of messages - used with client acknowledge or dups_ok acknowledge
+    * Acknowledge a list of deliveries
     * @param ackInfos
     * @throws JMSException
     */
-   void acknowledgeBatch(List deliveryIds) throws JMSException;
+   void acknowledgeDeliveries(List deliveryIds) throws JMSException;
    
    /**
-    * Acknowledge a message - used for auto acknowledge
+    * Acknowledge a delivery
     * @param deliveryId
     * @throws JMSException
     */
-   void acknowledge(Ack ack) throws JMSException;
+   void acknowledgeDelivery(Ack ack) throws JMSException;
    
    /**
+    * Cancel a list of deliveries.
+    * @param cancelInfos
+    */
+   void cancelDeliveries(List cancelInfos) throws JMSException;
+         
+   /**
+    * Cancel a delivery
+    * @param cancel
+    * @throws JMSException
+    */
+   void cancelDelivery(Cancel cancel) throws JMSException;
+   
+   /**
     * Add a temporary destination.
     */
    void addTemporaryDestination(JBossDestination destination) throws JMSException;
@@ -108,14 +121,6 @@
    void send(JBossMessage message) throws JMSException;
    
    /**
-    * Cancel some deliveries.
-    * This used at consumer close to cancel any undelivered messages left in the client buffer
-    * or at session recovery to cancel any messages that couldn't be redelivered locally
-    * @param ackInfos
-    */
-   void cancelDeliveries(List cancelInfos) throws JMSException;
-      
-   /**
     * Send delivery info to the server so the delivery lists can be repopulated
     * used at failover
     * @param ackInfos

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -32,6 +32,7 @@
 import org.jboss.jms.destination.JBossTopic;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.endpoint.Ack;
+import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.SessionEndpoint;
 
 /**
@@ -102,14 +103,14 @@
       return endpoint.createTopic(topicName);
    }
 
-   public void acknowledgeBatch(List acks) throws JMSException
+   public void acknowledgeDeliveries(List acks) throws JMSException
    {
-      endpoint.acknowledgeBatch(acks);
+      endpoint.acknowledgeDeliveries(acks);
    }
    
-   public void acknowledge(Ack ack) throws JMSException
+   public void acknowledgeDelivery(Ack ack) throws JMSException
    {
-      endpoint.acknowledge(ack);
+      endpoint.acknowledgeDelivery(ack);
    }
 
    public void addTemporaryDestination(JBossDestination destination) throws JMSException
@@ -132,6 +133,11 @@
       endpoint.cancelDeliveries(ackInfos);
    }
    
+   public void cancelDelivery(Cancel cancel) throws JMSException
+   {
+      endpoint.cancelDelivery(cancel); // straight pass-through; a JMSException from the endpoint propagates to the caller
+   }
+   
    public void recoverDeliveries(List ackInfos) throws JMSException
    {
       endpoint.recoverDeliveries(ackInfos);

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -46,6 +46,7 @@
 import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.ClientDelivery;
 import org.jboss.jms.server.endpoint.DefaultAck;
+import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryRecovery;
 import org.jboss.jms.tx.TransactionRequest;
 import org.jboss.logging.Logger;
@@ -92,14 +93,15 @@
 
    protected static final byte SERIALIZED = 0;   
    protected static final byte ACKNOWLEDGE = 1;
-   protected static final byte ACKNOWLEDGE_BATCH = 2;
-   protected static final byte SEND = 3;   
-   protected static final byte CANCEL_DELIVERIES = 4;
-   protected static final byte MORE = 5;
-   protected static final byte SEND_TRANSACTION = 6;
-   protected static final byte GET_ID_BLOCK = 7;
-   protected static final byte RECOVER_DELIVERIES = 8;
-   protected static final byte CONFIRM_DELIVERY = 9;
+   protected static final byte ACKNOWLEDGE_LIST = 2;
+   protected static final byte CANCEL = 3;
+   protected static final byte CANCEL_LIST = 4;
+   protected static final byte SEND = 5;   
+   protected static final byte MORE = 6;
+   protected static final byte SEND_TRANSACTION = 7;
+   protected static final byte GET_ID_BLOCK = 8;
+   protected static final byte RECOVER_DELIVERIES = 9;
+   protected static final byte CONFIRM_DELIVERY = 10;
  
 
    // The response codes - start from 100
@@ -231,7 +233,7 @@
    
                   if (trace) { log.trace("wrote activate()"); }
                }           
-               else if ("acknowledge".equals(methodName))
+               else if ("acknowledgeDelivery".equals(methodName))
                {
                   dos.writeByte(ACKNOWLEDGE);
    
@@ -243,11 +245,11 @@
    
                   dos.flush();
    
-                  if (trace) { log.trace("wrote acknowledge()"); }
+                  if (trace) { log.trace("wrote acknowledgeDelivery()"); }
                }
-               else if ("acknowledgeBatch".equals(methodName))
+               else if ("acknowledgeDeliveries".equals(methodName))
                {
-                  dos.writeByte(ACKNOWLEDGE_BATCH);
+                  dos.writeByte(ACKNOWLEDGE_LIST);
    
                   writeHeader(mi, dos);
                   
@@ -265,39 +267,27 @@
    
                   dos.flush();
    
-                  if (trace) { log.trace("wrote acknowledge()"); }
+                  if (trace) { log.trace("wrote acknowledgeDeliveries()"); }
                }
-               else if ("sendTransaction".equals(methodName))
+               else if ("cancelDelivery".equals(methodName))
                {
-                  dos.writeByte(SEND_TRANSACTION);
+                  dos.writeByte(CANCEL);
    
                   writeHeader(mi, dos);
-   
-                  TransactionRequest request = (TransactionRequest)mi.getArguments()[0];
-   
-                  request.write(dos);
-   
+                  
+                  Cancel cancel = (Cancel)mi.getArguments()[0];
+                  
+                  dos.writeLong(cancel.getDeliveryId());
+                  
+                  dos.writeInt(cancel.getDeliveryCount());
+                  
                   dos.flush();
    
-                  if (trace) { log.trace("wrote getMessageNow()"); }
+                  if (trace) { log.trace("wrote cancelDelivery()"); }
                }
-               else if ("getIdBlock".equals(methodName))
-               {
-                  dos.writeByte(GET_ID_BLOCK);
-   
-                  writeHeader(mi, dos);
-   
-                  int size = ((Integer)mi.getArguments()[0]).intValue();
-   
-                  dos.writeInt(size);
-   
-                  dos.flush();
-   
-                  if (trace) { log.trace("wrote getIdBlock()"); }
-               }           
                else if ("cancelDeliveries".equals(methodName) && mi.getArguments() != null)
                {
-                  dos.writeByte(CANCEL_DELIVERIES);
+                  dos.writeByte(CANCEL_LIST);
    
                   writeHeader(mi, dos);
    
@@ -310,7 +300,10 @@
                   while (iter.hasNext())
                   {
                      Cancel cancel = (Cancel)iter.next();
-                     cancel.write(dos);
+                     
+                     dos.writeLong(cancel.getDeliveryId());
+                     
+                     dos.writeInt(cancel.getDeliveryCount());
                   }
    
                   dos.flush();
@@ -353,6 +346,35 @@
    
                   if (trace) { log.trace("wrote confirmDelivery()"); }
                }
+               else if ("sendTransaction".equals(methodName))
+               {
+                  dos.writeByte(SEND_TRANSACTION);
+   
+                  writeHeader(mi, dos);
+   
+                  TransactionRequest request = (TransactionRequest)mi.getArguments()[0];
+   
+                  request.write(dos);
+   
+                  dos.flush();
+   
+                  if (trace) { log.trace("wrote getMessageNow()"); }
+               }
+               else if ("getIdBlock".equals(methodName))
+               {
+                  dos.writeByte(GET_ID_BLOCK);
+   
+                  writeHeader(mi, dos);
+   
+                  int size = ((Integer)mi.getArguments()[0]).intValue();
+   
+                  dos.writeInt(size);
+   
+                  dos.flush();
+   
+                  if (trace) { log.trace("wrote getIdBlock()"); }
+               }           
+               
                else
                {
                   dos.write(SERIALIZED);
@@ -669,11 +691,11 @@
                   new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
                                         new MessagingMarshallable(version, mi), null, null, null);
    
-               if (trace) { log.trace("read acknowledge()"); }
+               if (trace) { log.trace("read acknowledgeDelivery()"); }
    
                return request;
             }
-            case ACKNOWLEDGE_BATCH:
+            case ACKNOWLEDGE_LIST:
             {
                MethodInvocation mi = readHeader(dis);
                            
@@ -696,24 +718,46 @@
                   new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
                                         new MessagingMarshallable(version, mi), null, null, null);
    
-               if (trace) { log.trace("read acknowledge()"); }
+               if (trace) { log.trace("read acknowledgeDeliveries()"); }
    
                return request;
             }
-            case CANCEL_DELIVERIES:
+            case CANCEL:
             {
                MethodInvocation mi = readHeader(dis);
+               
+               long deliveryId = dis.readLong();
+               
+               int deliveryCount = dis.readInt();
+               
+               Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount)};
    
+               mi.setArguments(args);
+   
+               InvocationRequest request =
+                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+                                        new MessagingMarshallable(version, mi), null, null, null);
+   
+               if (trace) { log.trace("read cancelDelivery()"); }
+   
+               return request;
+            }
+            case CANCEL_LIST:
+            {
+               MethodInvocation mi = readHeader(dis);
+   
                int size = dis.readInt();
    
                List acks = new ArrayList(size);
    
                for (int i = 0; i < size; i++)
-               {
-                  Cancel cancel = new Cancel();
+               {                  
+                  long deliveryId = dis.readLong();
                   
-                  cancel.read(dis);
+                  int deliveryCount = dis.readInt();
                   
+                  DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount);
+                                    
                   acks.add(cancel);
                }
    

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -25,6 +25,7 @@
 import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -139,7 +140,7 @@
       }
       else
       {
-         return sessionStatesMap.values();
+         return sessionStatesMap == null ? Collections.emptySet() : sessionStatesMap.values();
       }
    }   
    

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/build.xml	2006-12-15 21:28:18 UTC (rev 1802)
@@ -759,7 +759,7 @@
                <include name="**/jms/clustering/*Test.class"/>
                 <include name="org/jboss/test/messaging/util/ServerManagementTest.class"/>
                -->
-               <include name="**/jms/clustering/HATest.class"/>
+               <include name="**/jms/clustering/*Test.class"/>
             </fileset>
          </batchtest>
       </junit>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -894,6 +894,8 @@
             count++;
                   
             TextMessage tm = (TextMessage)m;
+            
+            log.trace("Got message: " + tm.getText());            
                       
             // Receive first three messages then recover() session
             // Only last message should be redelivered

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -239,6 +239,84 @@
          if (connConsumer != null) connProducer.close();
       }
    }
+   
+   
+   
+   public void testRedeliveryTransactedDifferentConnection() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+      // Sends three messages to a queue read by a connection consumer whose pooled session is transacted and lives on another connection
+      Connection connConnectionConsumer = null;
+      
+      Connection connConsumer = null;
+      
+      Connection connProducer = null;
+      
+      try
+      {
+         connConsumer = cf.createConnection();        
+         
+         connConsumer.start();
+         // Transacted session; it is handed to the connection consumer through the mock pool below
+         Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
+         
+         RedelMessageListener listener = new RedelMessageListener(sessCons);
+         
+         sessCons.setMessageListener(listener);
+         
+         ServerSessionPool pool = new MockServerSessionPool(sessCons);
+         // The connection consumer is created on its own connection, not on connConsumer
+         connConnectionConsumer = cf.createConnection();
+         
+         connConnectionConsumer.start();
+         
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConnectionConsumer.createConnectionConsumer(queue, null, pool, 1);         
+         
+         log.trace("Started connection consumer");
+         
+         connProducer = cf.createConnection();
+            
+         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProd.createProducer(queue);
+            
+         TextMessage m1 = sessProd.createTextMessage("a");
+         TextMessage m2 = sessProd.createTextMessage("b");
+         TextMessage m3 = sessProd.createTextMessage("c");
+         prod.send(m1);
+         prod.send(m2);
+         prod.send(m3);
+         
+         
+         log.trace("Sent messages");
+         
+         //Wait for messages
+         
+         listener.waitForLatch(10000);                  
+         
+         if (listener.failed)
+         {
+            fail ("Didn't receive correct messages");
+         }
+         // Orderly shutdown; each reference is nulled after close so the finally block skips it
+         cc.close();
+         
+         log.trace("Closed connection consumer");
+         
+         connProducer.close();
+         connProducer = null;
+         connConsumer.close();
+         connConsumer = null;
+         connConnectionConsumer.close();
+         connConnectionConsumer = null;
+    
+      }
+      finally 
+      {
+         if (connConsumer != null) connConsumer.close();
+         if (connProducer != null) connProducer.close(); // guard the reference actually being closed; it may still be null on early failure
+         if (connConnectionConsumer != null) connConnectionConsumer.close();
+      }
+   }
 
    public void testCloseWhileProcessing() throws Exception
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-15 21:28:18 UTC (rev 1802)
@@ -48,6 +48,7 @@
 import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.ClientDelivery;
 import org.jboss.jms.server.endpoint.DefaultAck;
+import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.jms.server.remoting.JMSWireFormat;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
@@ -93,10 +94,12 @@
    
    protected Method sendMethod;
    
-   protected Method acknowledgeMethod;
+   protected Method acknowledgeDeliveryMethod;
    
-   protected Method acknowledgeBatchMethod;
+   protected Method acknowledgeDeliveriesMethod;
    
+   protected Method cancelDeliveryMethod;
+   
    protected Method cancelDeliveriesMethod;
    
    //Consumer
@@ -135,10 +138,12 @@
       
       sendMethod = sessionDelegate.getMethod("send", new Class[] { JBossMessage.class });
        
-      acknowledgeMethod = sessionDelegate.getMethod("acknowledge", new Class[] { Ack.class });
+      acknowledgeDeliveryMethod = sessionDelegate.getMethod("acknowledgeDelivery", new Class[] { Ack.class });
       
-      acknowledgeBatchMethod = sessionDelegate.getMethod("acknowledgeBatch", new Class[] { java.util.List.class });
+      acknowledgeDeliveriesMethod = sessionDelegate.getMethod("acknowledgeDeliveries", new Class[] { java.util.List.class });
       
+      cancelDeliveryMethod = sessionDelegate.getMethod("cancelDelivery", new Class[] { Cancel.class });
+            
       cancelDeliveriesMethod = sessionDelegate.getMethod("cancelDeliveries", new Class[] { java.util.List.class });
       
       //TODO - this isn't complete - there are other methods to test
@@ -161,16 +166,26 @@
    
    //Session
    
-   public void testAcknowledge() throws Exception
+   public void testAcknowledgeDelivery() throws Exception
    {
-      wf.testAcknowledge();
+      wf.testAcknowledgeDelivery();
    }
    
-   public void testAcknowledgeBatch() throws Exception
+   public void testAcknowledgeDeliveries() throws Exception
    {
-      wf.testAcknowledgeBatch();
+      wf.testAcknowledgeDeliveries();
    }
    
+   public void testCancelDelivery() throws Exception
+   {
+      wf.testCancelDelivery(); // delegates to the method of the same name on wf
+   }
+   
+   public void testCancelDeliveries() throws Exception
+   {
+      wf.testCancelDeliveries(); // delegates to the method of the same name on wf
+   }
+   
    public void testSend() throws Exception
    {
       wf.testSend();
@@ -183,10 +198,6 @@
       wf.testMore();
    }
    
-   public void testCancelDeliveries() throws Exception
-   {
-      wf.testCancelDeliveries();
-   }
    
    //Connection
    
@@ -260,13 +271,13 @@
     */
    class TestWireFormat extends JMSWireFormat
    {      
-      public void testAcknowledge() throws Exception
+      public void testAcknowledgeDelivery() throws Exception
       {
          long methodHash = 62365354;
          
          int objectId = 54321;
          
-         MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeMethod, acknowledgeMethod, null);
+         MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeDeliveryMethod, acknowledgeDeliveryMethod, null);
          
          mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
          
@@ -347,13 +358,13 @@
          
       }
       
-      public void testAcknowledgeBatch() throws Exception
+      public void testAcknowledgeDeliveries() throws Exception
       {
          long methodHash = 62365354;
          
          int objectId = 54321;
          
-         MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeBatchMethod, acknowledgeBatchMethod, null);
+         MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeDeliveriesMethod, acknowledgeDeliveriesMethod, null);
          
          mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
          
@@ -396,7 +407,7 @@
          assertEquals(77, dis.readByte());
          
          //First byte should be ACKNOWLEDGE
-         assertEquals(JMSWireFormat.ACKNOWLEDGE_BATCH, dis.readByte());
+         assertEquals(JMSWireFormat.ACKNOWLEDGE_LIST, dis.readByte());
          
          //Next int should be objectId
          assertEquals(objectId, dis.readInt());
@@ -456,7 +467,104 @@
          
       }
       
+      public void testCancelDelivery() throws Exception
+      {
+         long methodHash = 6236354;
+         // Round-trips a cancelDelivery() invocation: checks the raw bytes written, then the request read back from them
+         int objectId = 543271;
+         
+         MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveryMethod, cancelDeliveryMethod, null);
+         
+         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
+         
+         long deliveryID = 765;
+         
+         int deliveryCount = 12;
+         
+         Cancel cancel = new DefaultCancel(deliveryID, deliveryCount);
+         
+         Object[] args = new Object[] { cancel };
+         
+         mi.setArguments(args);
+         
+         MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+         
+         InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+         
+         ByteArrayOutputStream bos = new ByteArrayOutputStream();
+         
+         OutputStream oos = new DataOutputStream(bos);
+         // Marshal the invocation request into the in-memory buffer
+         wf.write(ir, oos);
+         
+         oos.flush();
+         
+         byte[] bytes = bos.toByteArray();
+         
+         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                  
+         DataInputStream dis = new DataInputStream(bis); 
+                 
+         //Check the bytes
+         
+         //First byte should be version
+         assertEquals(77, dis.readByte());
+         
+         //Next byte should be CANCEL
+         assertEquals(JMSWireFormat.CANCEL, dis.readByte());
+         
+         //Next int should be objectId
+         assertEquals(objectId, dis.readInt());
+         
+         //Next long should be methodHash
+         assertEquals(methodHash, dis.readLong());
+         
+         //Next should be the deliveryid
+         long l = dis.readLong();
+         
+         //Then delivery count
+         int count = dis.readInt();
+         
+         assertEquals(deliveryID, l);
+         
+         assertEquals(deliveryCount, count);
+
+         //Now eos
+         try
+         {
+            dis.readByte();
+            fail("End of stream expected");
+         }
+         catch (EOFException e)
+         {
+            //Ok
+         }
+         // No mark was set on bis, so reset() rewinds to the first byte; now unmarshal the same bytes
+         bis.reset();
+         
+         InputStream ois = new DataInputStream(bis);
+         
+         InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+         
+         mm = (MessagingMarshallable)ir2.getParameter();
+         
+         assertEquals(77, mm.getVersion());
+         
+         MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+         
+         assertEquals(methodHash, mi2.getMethodHash());
+         
+         assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+         
+         Cancel l2 = (Cancel)mi2.getArguments()[0];
+         
+         assertEquals(deliveryID, l2.getDeliveryId());
+         
+         assertEquals(deliveryCount, l2.getDeliveryCount());
+         
+      }
       
+      
       /*
        * Test that general serializable invocation requests are marshalled correctky
        */
@@ -729,7 +837,7 @@
          
          MessageProxy proxy = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount);
                      
-         DeliveryInfo info = new DeliveryInfo(proxy, 76762, 98982);
+         DeliveryInfo info = new DeliveryInfo(proxy, 76762, 98982, null);
          
          int sessionId = 8787;
          
@@ -862,8 +970,8 @@
          
          List cancels = new ArrayList();
          
-         Cancel cancel1 = new Cancel(65654, 43);
-         Cancel cancel2 = new Cancel(65765, 2);
+         DefaultCancel cancel1 = new DefaultCancel(65654, 43);
+         DefaultCancel cancel2 = new DefaultCancel(65765, 2);
          cancels.add(cancel1);
          cancels.add(cancel2);
          
@@ -897,7 +1005,7 @@
          assertEquals(77, dis.readByte());
          
          //Next byte should be CANCEL_MESSAGES
-         assertEquals(JMSWireFormat.CANCEL_DELIVERIES, dis.readByte());
+         assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
          
          //Next int should be objectId
          assertEquals(objectId, dis.readInt());
@@ -912,14 +1020,14 @@
          assertEquals(2, size);
          
          //then the AckInfos
-         Cancel rcancel1 = new Cancel();
+         long deliveryId = dis.readLong();
+         int deliveryCount = dis.readInt();
+         DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
          
-         Cancel rcancel2 = new Cancel();
-         
-         rcancel1.read(dis);
-         
-         rcancel2.read(dis);
-         
+         deliveryId = dis.readLong();
+         deliveryCount = dis.readInt();
+         DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+
          assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
          
          assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
@@ -961,8 +1069,8 @@
         
          assertEquals(2, list.size());
          
-         Cancel xack1 = (Cancel)list.get(0);
-         Cancel xack2 = (Cancel)list.get(1);
+         DefaultCancel xack1 = (DefaultCancel)list.get(0);
+         DefaultCancel xack2 = (DefaultCancel)list.get(1);
          
          assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
          




More information about the jboss-cvs-commits mailing list