[jboss-cvs] JBoss Messaging SVN: r1849 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting tests tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/message tests/src/org/jboss/test/messaging/tools/jmx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Dec 22 15:55:43 EST 2006


Author: timfox
Date: 2006-12-22 15:55:26 -0500 (Fri, 22 Dec 2006)
New Revision: 1849

Removed:
   trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-657



Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -72,10 +72,9 @@
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
       SessionDelegate sessionDelegate = (SessionDelegate)invocation.getTargetObject();
       ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
-      int serverId = connectionState.getServerID();
       int consumerID = consumerState.getConsumerID();
       long channelID = consumerState.getChannelId();
-      int prefetchSize = consumerState.getPrefetchSize();
+      int prefetchSize = consumerState.getBufferSize();
       QueuedExecutor sessionExecutor = sessionState.getExecutor();
       int maxDeliveries = consumerState.getMaxDeliveries();
       
@@ -94,31 +93,43 @@
       
       //Now we have finished creating the client consumer, we can tell the SCD
       //we are ready
-      consumerDelegate.more();
+      consumerDelegate.changeRate(1);
 
       return consumerDelegate;
    }
    
    public Object handleClosing(Invocation invocation) throws Throwable
    {      
-      // First we make sure closing is called on the ServerConsumerEndpoint. This ensures that any
-      // in-transit messages are flushed out to the client side.
-
-      Object res = invocation.invokeNext();
+      ConsumerState consumerState = getState(invocation);
       
-      ConsumerState consumerState = getState(invocation);
-      SessionState sessionState = (SessionState)consumerState.getParent();
-      ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-            
-      // Then we call close on the messagecallbackhandler which waits for onMessage invocations
+      // First we call close on the messagecallbackhandler which waits for onMessage invocations      
       // to complete and then cancels anything in the client buffer.
+      // any further messages received will be ignored
       consumerState.getMessageCallbackHandler().close();
       
+      long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
+      
+      SessionState sessionState = (SessionState)consumerState.getParent();
+      ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+                 
       sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());
 
       CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
       cm.unregisterHandler(consumerState.getConsumerID());
             
+      // Then we make sure closing is called on the ServerConsumerEndpoint.
+
+      Object res = invocation.invokeNext();
+      
+      //Now we send a message to the server consumer with the last delivery id so
+      //it can cancel any inflight messages after that
+      //This needs to be done *after* the call to closing has been executed on the server
+      //maybe it can be combined with closing
+      
+      ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+         
+      del.cancelInflightMessages(lastDeliveryId);
+                                   
       return res;
    }      
    

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.jms.tx.ClientTransaction;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
 
@@ -67,31 +66,7 @@
    // Constructors --------------------------------------------------
    
    // Public --------------------------------------------------------
-
-   private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
-   {
-      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
-      
-      //If the delivery was obtained via a connection consumer we need to ack via that
-      //otherwise we just use this session
-      
-      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
-      
-      sessionToUse.acknowledgeDelivery(delivery);      
-   }
    
-   private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
-   {
-      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
-      
-      //If the delivery was obtained via a connection consumer we need to cancel via that
-      //otherwise we just use this session
-      
-      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
-      
-      sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));      
-   }
-   
    public Object handleClosing(Invocation invocation) throws Throwable
    {
       MethodInvocation mi = (MethodInvocation)invocation;
@@ -176,6 +151,11 @@
          state.getClientAckList().clear();
       }
       
+      
+      //TODO - we should also cancel any deliveries remaining in any transaction for the session
+      //so the delivery count gets updated to the server, and not rely on the server side close
+      //cancelling them
+      
       return invocation.invokeNext();
    }
 
@@ -460,6 +440,30 @@
    {
       return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
    }
+   
+   private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+   {
+      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+      
+      //If the delivery was obtained via a connection consumer we need to ack via that
+      //otherwise we just use this session
+      
+      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+      
+      sessionToUse.acknowledgeDelivery(delivery);      
+   }
+   
+   private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+   {
+      SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+      
+      //If the delivery was obtained via a connection consumer we need to cancel via that
+      //otherwise we just use this session
+      
+      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+      
+      sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));      
+   }
 
    // Inner Classes -------------------------------------------------
    

Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -169,7 +169,7 @@
 
       int consumerID = consumerDelegate.getID();
 
-      int prefetchSize = consumerDelegate.getPrefetchSize();
+      int bufferSize = consumerDelegate.getBufferSize();
 
       int maxDeliveries = consumerDelegate.getMaxDeliveries();
 
@@ -177,7 +177,7 @@
 
       ConsumerState consumerState =
          new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
-                           subscriptionName, consumerID, connectionConsumer, prefetchSize,
+                           subscriptionName, consumerID, connectionConsumer, bufferSize,
                            maxDeliveries, channelId);
 
       delegate.setState(consumerState);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -71,12 +71,12 @@
    }
 
    // ConsumerDelegate implementation -------------------------------
-
+   
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void more()
+   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -85,6 +85,15 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
+   public void changeRate(float newRate)
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
    public void close() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
@@ -162,16 +171,6 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void confirmDelivery(int count)
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-
    // Public --------------------------------------------------------
 
    public String toString()
@@ -179,7 +178,7 @@
       return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
    }
    
-   public int getPrefetchSize()
+   public int getBufferSize()
    {
       return bufferSize;
    }
@@ -198,7 +197,7 @@
    {
       super.copyAttributes(newDelegate);
       
-      this.bufferSize = ((ClientConsumerDelegate)newDelegate).getPrefetchSize();
+      this.bufferSize = ((ClientConsumerDelegate)newDelegate).getBufferSize();
       
       this.maxDeliveries = ((ClientConsumerDelegate)newDelegate).getMaxDeliveries();
       

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -21,9 +21,9 @@
   */
 package org.jboss.jms.client.remoting;
 
-import java.util.List;
 import java.util.Map;
 
+import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.server.endpoint.ClientDelivery;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.logging.Logger;
@@ -53,13 +53,21 @@
 {
    // Constants -----------------------------------------------------
 
-   protected static final Logger log = Logger.getLogger(CallbackManager.class);
+   protected static final Logger log;
 
    public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
 
    // Static --------------------------------------------------------
 
    protected static CallbackManager theManager;
+   
+   private static boolean trace;      
+   
+   static
+   {
+      log = Logger.getLogger(CallbackManager.class);
+      trace = log.isTraceEnabled();
+   }
 
    // Attributes ----------------------------------------------------
 
@@ -79,16 +87,20 @@
    {
       MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
       ClientDelivery dr = (ClientDelivery)mm.getLoad();
-      List msgs = dr.getMessages();
+      MessageProxy msg = dr.getMessage();
 
       MessageCallbackHandler handler = (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
 
       if (handler == null)
       {
-         throw new IllegalStateException("Cannot find handler for consumer: " + dr.getConsumerId());
+         //This is OK and can happen if the callback handler is deregistered on consumer close,
+         //but there are messages still in transit which arrive later.
+         //In this case it is just safe to ignore the message
+         if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+         return;
       }
 
-      handler.handleMessage(msgs);
+      handler.handleMessage(msg);
    }
 
    // Public --------------------------------------------------------

Deleted: trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.remoting;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.messaging.util.Streamable;
-
-/**
- * A HandleMessageResponse
- * 
- * This is the response the server gets after delivering messages to a client consumer
-
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class HandleMessageResponse implements Streamable
-{
-   private static final long serialVersionUID = 2500443290413453569L;
-
-   private boolean full;
-   
-   private int messagesAccepted;
-   
-   public HandleMessageResponse()
-   {      
-   }
-   
-   public HandleMessageResponse(boolean full, int messagesAccepted)
-   {
-      this.full = full;
-      
-      this.messagesAccepted = messagesAccepted;
-   }
-   
-   public boolean clientIsFull()
-   {
-      return full;
-   }
-   
-   public int getNumberAccepted()
-   {
-      return messagesAccepted;
-   }
-   
-   
-   // Streamable implementation
-   // ---------------------------------------------------------------
-   
-   public void write(DataOutputStream out) throws Exception
-   {
-      out.writeBoolean(full);
-      
-      out.writeInt(messagesAccepted);
-   }
-
-   public void read(DataInputStream in) throws Exception
-   {
-      full = in.readBoolean();
-      
-      messagesAccepted = in.readInt();
-   }
-}

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -165,12 +165,14 @@
    private int ackMode;
    private boolean closed;
    private Object mainLock;
-   private boolean serverSending;
-   private int bufferSize;
+   private int maxBufferSize;
+   private int minBufferSize;
    private QueuedExecutor sessionExecutor;
    private boolean listenerRunning;
    private int maxDeliveries;
    private long channelID;
+   private boolean startSendingMessageSent;
+   private long lastDeliveryId = -1;
         
    // Constructors --------------------------------------------------
 
@@ -185,7 +187,8 @@
          throw new IllegalArgumentException(this + " bufferSize must be > 0");
       }
               
-      this.bufferSize = bufferSize;
+      this.maxBufferSize = bufferSize;
+      this.minBufferSize = bufferSize / 2;
       buffer = new LinkedList();
       isConnectionConsumer = isCC;
       this.ackMode = ackMode;
@@ -193,77 +196,55 @@
       this.consumerDelegate = cons;
       this.consumerID = consumerID;
       this.channelID = channelID;
-      this.serverSending = true;
       mainLock = new Object();
       this.sessionExecutor = sessionExecutor;
       this.maxDeliveries = maxDeliveries;
+      this.startSendingMessageSent = true;
    }
         
    // Public --------------------------------------------------------
       
 
    /**
-    * Handles a list of messages sent from the server
-    * @param msgs The list of messages
-    * @return The number of messages handled (placeholder for future - now we always accept all messages)
-    *         or -1 if closed
+    * Handles a message sent from the server
+    * @param msgs The message
     */
-   public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
+   public void handleMessage(MessageProxy msg)
    {                      
-      if (trace)
-      {
-         StringBuffer sb = new StringBuffer(this + " receiving [");
-         for(int i = 0; i < msgs.size(); i++)
-         {
-            sb.append(((MessageProxy)msgs.get(i)).getMessage().getMessageID());
-            if (i < msgs.size() - 1)
-            {
-               sb.append(",");
-            }
-         }
-         sb.append("] from the remoting layer");
-         log.trace(sb.toString());
-      }
+      if (trace) { log.trace("Receiving message " + msg + " from the remoting layer"); }
 
       synchronized (mainLock)
       {
          if (closed)
          {             
             // Ignore
-            return new HandleMessageResponse(false, 0);
+            if (trace) { log.trace(this + " is closed, so ignore message"); }
+            return;
          }
 
-         // Asynchronously confirm delivery on client
-
-         try
-         {
-            sessionExecutor.execute(new ConfirmDelivery(msgs.size()));
-         }
-         catch (InterruptedException e)
-         {
-            log.warn("Thread interrupted", e);
-         }
-
-         // Put the messages in the buffer and notify any waiting receive()
-         
-         processMessages(msgs);
+         msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+                  
+         msg.setReceived();                  
                    
-         buffer.addAll(msgs);                  
+         //Add it to the buffer
+         buffer.add(msg);         
          
+         lastDeliveryId = msg.getDeliveryId();
+         
          if (trace) { log.trace(this + " added message(s) to the buffer"); }
          
-         boolean full = buffer.size() >= bufferSize;         
+         messageAdded();
          
-         messagesAdded();
-         
-         if (full)
+         if (buffer.size() >= maxBufferSize)
          {
-            serverSending = false;
             if (trace) { log.trace(this + " is full"); }
+            
+            //We are full. Send message to server to tell it to stop sending
+            
+            startSendingMessageSent = false;
+            
+            sendChangeRateMessage(0);
          }
-                                          
-         // For now we always accept all messages - in the future this may change
-         return new HandleMessageResponse(full, msgs.size());
       }
    }
          
@@ -282,6 +263,7 @@
          if (listener != null && !buffer.isEmpty())
          {  
             listenerRunning = true;
+            
             this.queueRunner(new ListenerRunner());
          }        
       }   
@@ -331,7 +313,9 @@
          for(Iterator i = buffer.iterator(); i.hasNext();)
          {
             MessageProxy mp = (MessageProxy)i.next();
+            
             DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+            
             cancels.add(ack);
          }
                
@@ -342,34 +326,6 @@
       
       if (trace) { log.trace(this + " closed"); }
    }
-   
-   private void waitForOnMessageToComplete()
-   {
-      // Wait for any onMessage() executions to complete
-
-      if (Thread.currentThread().equals(sessionExecutor.getThread()))
-      {
-         // the current thread already closing this MessageCallbackHandler (this happens when the
-         // session is closed from within the MessageListener.onMessage(), for example), so no need
-         // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
-         return;
-      }
-
-      Future result = new Future();
-      
-      try
-      {
-         sessionExecutor.execute(new Closer(result));
-
-         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
-         result.getResult();
-         if (trace) { log.trace(this + " got Closer result"); }
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Thread interrupted", e);
-      }
-   }
      
    /**
     * Method used by the client thread to get a Message, if available.
@@ -458,8 +414,6 @@
                   sessionDelegate.postDeliver(false);
                }
                
-               //postDeliver(sessionDelegate, isConnectionConsumer, false);
-               
                if (!m.getMessage().isExpired())
                {
                   if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
@@ -486,11 +440,13 @@
       } 
       
       //This needs to be outside the lock
-      if (buffer.isEmpty() && !serverSending)
+      if (!startSendingMessageSent && buffer.size() <= minBufferSize)
       {
-         //The server has previously stopped sending because the buffer was full
-         //but now it is empty, so we tell the server to start sending again
-         consumerDelegate.more();
+         //Tell the server we need more messages - but we don't want to keep sending the message
+         //if we've already sent it - hence the check
+         startSendingMessageSent = true;
+            
+         sendChangeRateMessage(1);                    
       }
       
       m.incDeliveryCount();
@@ -498,7 +454,6 @@
       return m;
    }    
    
-
    public MessageListener getMessageListener()
    {
       return listener;      
@@ -525,16 +480,121 @@
       {
          buffer.addFirst(proxy);
          
-         messagesAdded();
+         messageAdded();
       }
    }
+   
+   public void copyState(MessageCallbackHandler newHandler)
+   {
+      synchronized (mainLock)
+      {
+         this.consumerID = newHandler.consumerID;
+         
+         this.consumerDelegate = newHandler.consumerDelegate;
+         
+         this.sessionDelegate = newHandler.sessionDelegate;
+         
+         this.buffer.clear();
+      }
+   }
+   
+   public long getLastDeliveryId()
+   {
+      synchronized (mainLock)
+      {
+         return lastDeliveryId;
+      }
+   }
      
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
+            
+   // Private -------------------------------------------------------
    
-   protected long waitOnLock(Object lock, long waitTime) throws InterruptedException
+   private void waitForOnMessageToComplete()
    {
+      // Wait for any onMessage() executions to complete
+
+      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      {
+         // the current thread already closing this MessageCallbackHandler (this happens when the
+         // session is closed from within the MessageListener.onMessage(), for example), so no need
+         // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+         return;
+      }
+
+      Future result = new Future();
+      
+      try
+      {
+         sessionExecutor.execute(new Closer(result));
+
+         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+         result.getResult();
+         if (trace) { log.trace(this + " got Closer result"); }
+      }
+      catch (InterruptedException e)
+      {
+         log.warn("Thread interrupted", e);
+      }
+   }
+   
+   private void sendChangeRateMessage(float newRate) 
+   {
+      //FIXME - We should be able to execute this invocation as a true
+      //remoting asynchronous invocation - i.e. it is written to the transport
+      //and no response is waited for
+      //Therefore there is no need to execute it here on a separate thread.
+      //Unfortunately remoting does not currently support this so this
+      //will be SLOW now.
+      try
+      {
+         consumerDelegate.changeRate(newRate);
+      }
+      catch (JMSException e)
+      {
+         log.error("Failed to send changeRate message", e);
+      }
+   }
+   
+   private void queueRunner(ListenerRunner runner)
+   {
+      try
+      {
+         this.sessionExecutor.execute(runner);
+      }
+      catch (InterruptedException e)
+      {
+         log.warn("Thread interrupted", e);
+      }
+   }
+   
+   private void messageAdded()
+   {
+      // If we have a thread waiting on receive() we notify it
+      if (receiverThread != null)
+      {
+         if (trace) { log.trace(this + " notifying receiver thread"); }            
+         mainLock.notify();
+      }     
+      else if (listener != null)
+      { 
+         // We have a message listener
+         if (!listenerRunning)
+         {
+            listenerRunning = true;
+
+            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+            this.queueRunner(new ListenerRunner());
+         }     
+         
+         //TODO - Execute onMessage on same thread for even better throughput 
+      }
+   }
+   
+   private long waitOnLock(Object lock, long waitTime) throws InterruptedException
+   {
       long start = System.currentTimeMillis();
       
       // Wait for last message to arrive
@@ -545,6 +605,7 @@
       if (waited < waitTime)
       {
          waitTime = waitTime - waited;
+         
          return waitTime;
       }
       else
@@ -553,7 +614,7 @@
       }     
    }
         
-   protected MessageProxy getMessage(long timeout) throws JMSException
+   private MessageProxy getMessage(long timeout) throws JMSException
    {
       if (timeout == -1)
       {
@@ -608,62 +669,8 @@
       return m;
    }
    
-   protected void processMessages(List msgs)
-   {
-      Iterator iter = msgs.iterator();
-      
-      while (iter.hasNext())
-      {         
-         MessageProxy msg = (MessageProxy)iter.next();
-      
-         // If this is the handler for a connection consumer we don't want to set the session
-         // delegate since this is only used for client acknowledgement which is illegal for a
-         // session used for an MDB
-         msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-                  
-         msg.setReceived();
-      }
-   }
-   
-   // Private -------------------------------------------------------
-   
-   private void queueRunner(ListenerRunner runner)
-   {
-      try
-      {
-         this.sessionExecutor.execute(runner);
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Thread interrupted", e);
-      }
-   }
-   
-   private void messagesAdded()
-   {
-      // If we have a thread waiting on receive() we notify it
-      if (receiverThread != null)
-      {
-         if (trace) { log.trace(this + " notifying receiver thread"); }            
-         mainLock.notify();
-      }     
-      else if (listener != null)
-      { 
-         // We have a message listener
-         if (!listenerRunning)
-         {
-            listenerRunning = true;
-
-            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
-            this.queueRunner(new ListenerRunner());
-         }     
-         
-         //TODO - Execute onMessage on same thread for even better throughput 
-      }
-   }
-   
    // Inner classes -------------------------------------------------   
-   
+         
    /*
     * This class is used to put on the listener executor to wait for onMessage
     * invocations to complete when closing
@@ -703,7 +710,9 @@
             if (listener == null)
             {
                listenerRunning = false;
+               
                if (trace) { log.trace("no listener, returning"); }
+               
                return;
             }
             
@@ -712,6 +721,7 @@
             if (buffer.isEmpty())
             {
                listenerRunning = false;
+               
                if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
             }
             else
@@ -745,65 +755,23 @@
             } 
          }
          
+         
+         //Tell the server we need more messages - but we don't want to keep sending the message
+         //if we've already sent it - hence the check
+         if (!startSendingMessageSent && buffer.size() <= minBufferSize)
+         {                    
+            startSendingMessageSent = true;
+            
+            sendChangeRateMessage(1);
+         } 
+         
          if (again)
          {
             // Queue it up again
             queueRunner(this);
-         }
-         else
-         {
-            if (!serverSending)
-            {
-               // Ask server for more messages
-               try
-               {
-                  consumerDelegate.more();
-               }
-               catch (JMSException e)
-               {
-                  log.error("Failed to execute more()", e);
-               }
-               return;
-            }
-         }
+         }                                               
       }
-   }
-
-   /*
-    * Used to asynchronously confirm to the server message arrival (delivery) on client.
-    */
-   private class ConfirmDelivery implements Runnable
-   {
-      int count;
-
-      ConfirmDelivery(int count)
-      {
-         this.count = count;
-      }
-
-      public void run()
-      {
-         if (trace) { log.trace("confirming delivery on client of " + count + " message(s)"); }
-         consumerDelegate.confirmDelivery(count);
-      }
-   }
-   
-   public void copyState(MessageCallbackHandler newHandler)
-   {
-      synchronized (mainLock)
-      {
-         this.consumerID = newHandler.consumerID;
-         
-         this.consumerDelegate = newHandler.consumerDelegate;
-         
-         this.sessionDelegate = newHandler.sessionDelegate;
-         
-         this.serverSending = false;
-         
-         this.buffer.clear();
-      }
-   }
-
+   }   
 }
 
 

Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -55,7 +55,7 @@
 
    private MessageCallbackHandler messageCallbackHandler;
 
-   private int prefetchSize;
+   private int bufferSize;
    
    private SessionState parent;
    
@@ -68,7 +68,7 @@
    
    public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
                         String selector, boolean noLocal, String subscriptionName, int consumerID,
-                        boolean isCC, int prefetchSize, int maxDeliveries, long channelId)
+                        boolean isCC, int bufferSize, int maxDeliveries, long channelId)
    {
       super(parent, (DelegateSupport)delegate);
       children = Collections.EMPTY_SET;
@@ -77,7 +77,7 @@
       this.noLocal = noLocal;
       this.consumerID = consumerID;
       this.isConnectionConsumer = isCC;
-      this.prefetchSize = prefetchSize;
+      this.bufferSize = bufferSize;
       this.subscriptionName=subscriptionName;
       this.maxDeliveries = maxDeliveries;
       this.channelId = channelId;
@@ -134,9 +134,9 @@
       return parent.getVersionToUse();
    }
 
-   public int getPrefetchSize()
+   public int getBufferSize()
    {
-      return prefetchSize;
+      return bufferSize;
    }
 
    public HierarchicalState getParent()

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -223,12 +223,7 @@
    {
       callbackHandlers.remove(new Integer(handler.getConsumerId()));
    }
-   
-   public List getCallbackHandlers()
-   {
-      return new ArrayList(callbackHandlers.values());
-   }
-   
+      
    public int getSessionId()
    {
       return sessionId;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -23,9 +23,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
@@ -35,7 +32,7 @@
 /**
  * 
  * A ClientDelivery
- * Encapsulates a delivery of some messages to a client consumer
+ * Encapsulates a delivery of a messages to a client consumer
  * 
  * There is no need to specify the server id since the client side CallbackManager is
  * unique to the remoting connection
@@ -54,7 +51,7 @@
    
    // Attributes ----------------------------------------------------
    
-   private List msgs;
+   private MessageProxy msg;
          
    private int consumerId;
     
@@ -64,9 +61,9 @@
    {      
    }
 
-   public ClientDelivery(List msgs, int consumerId)
+   public ClientDelivery(MessageProxy msg, int consumerId)
    {
-      this.msgs = msgs;
+      this.msg = msg;
       
       this.consumerId = consumerId;
    }
@@ -78,55 +75,37 @@
    {
       out.writeInt(consumerId);
       
-      out.writeInt(msgs.size());
+      out.writeByte(msg.getMessage().getType());
+
+      out.writeInt(msg.getDeliveryCount());
       
-      Iterator iter = msgs.iterator();
-      
-      while (iter.hasNext())
-      {
-         MessageProxy mp = (MessageProxy)iter.next();
-         
-         out.writeByte(mp.getMessage().getType());
+      out.writeLong(msg.getDeliveryId());
 
-         out.writeInt(mp.getDeliveryCount());
-         
-         out.writeLong(mp.getDeliveryId());
-
-         mp.getMessage().write(out);
-      }      
+      msg.getMessage().write(out);          
    }
 
    public void read(DataInputStream in) throws Exception
    {
       consumerId = in.readInt();
       
-      int numMessages = in.readInt();
+      byte type = in.readByte();
       
-      msgs = new ArrayList(numMessages);
+      int deliveryCount = in.readInt();
       
-      for (int i = 0; i < numMessages; i++)
-      {
-         byte type = in.readByte();
-         
-         int deliveryCount = in.readInt();
-         
-         long deliveryId = in.readLong();
-         
-         JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
+      long deliveryId = in.readLong();
+      
+      JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
 
-         m.read(in);
+      m.read(in);
 
-         MessageProxy md = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount);
-         
-         msgs.add(md);
-      }      
+      msg = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount); 
    }
 
    // Public --------------------------------------------------------
    
-   public List getMessages()
+   public MessageProxy getMessage()
    {
-      return msgs;
+      return msg;
    }
    
    public int getConsumerId()

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -39,23 +39,15 @@
 public interface ConsumerEndpoint extends Closeable
 {  
    /**
-    * If the client buffer has previously become full because the server was sending at a faster
-    * rate than the client could consume, then the server will stop sending messages. When the
-    * client has emptied the buffer it then needs to inform the server that it can receive more
-    * messages by calling this method.
-    *
-    * @throws JMSException
+    * Sent to the server to specify a new maximum rate at which to send messages at
     */
-   void more() throws JMSException;
-
+   void changeRate(float newRate) throws JMSException;
+   
+   
    /**
-    * The server consumer endpoint needs to know at any time how messages are in transit between
-    * server and client. That is why it needs to receive confirmations every time the client
-    * received one (or more) messages. The confirmation is sent asynchronously from client to server.
-    * This is NOT a consumption acknowledgment.
-    *
-    * @param count - the number of messages received by the client in one batch.
+    * Cancels any deliveries with a delivery id > lastDeliveryId - these are inflight
+    * @param lastDeliveryId
     */
-   void confirmDelivery(int count);
+   void cancelInflightMessages(long lastDeliveryId) throws JMSException;
 
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -21,10 +21,6 @@
  */
 package org.jboss.jms.server.endpoint;
 
-
-import java.util.ArrayList;
-import java.util.List;
-
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -33,8 +29,6 @@
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.selector.Selector;
-import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.jms.util.ExceptionUtil;
@@ -49,15 +43,18 @@
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.util.Future;
 import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.HandleCallbackException;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
- * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
+ * Concrete implementation of ConsumerEndpoint.
+ * 
+ * Lives on the boundary between Messaging Core and the
  * JMS Facade.
+ * 
+ * Handles delivery of messages from the server to the client side consumer.
+ * 
  *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -73,8 +70,6 @@
 
    // Static --------------------------------------------------------  
 
-   private static final int MESSAGES_IN_TRANSIT_WAIT_COUNT = 100;
-
    // Attributes ----------------------------------------------------
 
    private boolean trace = log.isTraceEnabled();
@@ -86,84 +81,55 @@
    private String queueName;
 
    private ServerSessionEndpoint sessionEndpoint;
+   
+   private ServerInvokerCallbackHandler callbackHandler;
+   
+   private byte versionToUse;
 
    private boolean noLocal;
 
    private Selector messageSelector;
 
    private JBossDestination destination;
+   
+   private boolean started;
+   
+   //This lock protects starting and stopping
+   private Object startStopLock;
 
-   private List toDeliver;
-
    // Must be volatile
-   private volatile boolean clientConsumerFull;
-
-   // Must be volatile
-   private volatile boolean bufferFull;
-
-   // Must be volatile
-   private volatile boolean started;
-
-   // No need to be volatile - is protected by lock
-   private boolean closed;
-
-   private Executor executor;
-
-   private int prefetchSize;
-
-   private Object lock;
-
-   private Object messagesInTransitLock;
+   private volatile boolean clientAccepting;
    
-   private int messagesInTransitCount; // access only from a region guarded by messagesInTransitLock
-   
    // Constructors --------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
                          ServerSessionEndpoint sessionEndpoint,
-                         String selector, boolean noLocal, JBossDestination dest,
-                         int prefetchSize)
-      throws InvalidSelectorException
+                         String selector, boolean noLocal, JBossDestination dest)
+                         throws InvalidSelectorException
    {
       if (trace) { log.trace("constructing consumer endpoint " + id); }
 
       this.id = id;
+      
       this.messageQueue = messageQueue;
+      
       this.queueName = queueName;
+      
       this.sessionEndpoint = sessionEndpoint;
-      this.prefetchSize = prefetchSize;
       
-      // We always created with clientConsumerFull = true. This prevents the SCD sending messages to
-      // the client before the client has fully finished creating the MessageCallbackHandler.
-      this.clientConsumerFull = true;
-
-      // We allocate an executor from the rotating pool for each consumer based on it's id
-      // This gives better latency than each consumer for the destination using the same
-      // executor
-      QueuedExecutorPool pool =
-         sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
-
-      this.executor = (QueuedExecutor)pool.get();
-             
-      // Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
-      // deliveries for the same consumer happen serially, since even if they are queued serially
-      // the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
-      // earlier non-deterministicly depending on thread scheduling.
-      // Consequently we use a QueuedExecutor to ensure the deliveries happen sequentially. We do
-      // not want each ServerConsumerEndpoint instance to have its own instance - since we would
-      // end up using too many threads, neither do we want to share the same instance amongst all
-      // consumers - we do not want to serialize delivery to all consumers. So we maintain a bag of
-      // QueuedExecutors and give them out to consumers as required. Different consumers can end up
-      // using the same queuedexecutor concurrently if there are a lot of active consumers.
-
+      this.callbackHandler = sessionEndpoint.getConnectionEndpoint().getCallbackHandler();
+      
+      this.versionToUse = sessionEndpoint.getConnectionEndpoint().getUsingVersion();
+            
       this.noLocal = noLocal;
       
       this.destination = dest;
       
-      this.toDeliver = new ArrayList();
+      //Always start as false - wait for consumer to initiate
+      this.clientAccepting = false;
       
-      this.lock = new Object();
-
+      this.startStopLock = new Object();
+      
       if (selector != null)
       {
          if (trace) log.trace("creating selector:" + selector);
@@ -179,9 +145,6 @@
       // prompt delivery
       promptDelivery();
 
-      messagesInTransitLock = new Object();
-      messagesInTransitCount = 0;
-      
       log.debug(this + " constructed");
    }
 
@@ -195,30 +158,26 @@
       if (trace) { log.trace(this + " receives " + ref + " for delivery"); }
       
       // This is ok to have outside lock - is volatile
-      if (bufferFull)
+      if (!clientAccepting)
       {
-         // We buffer a maximum of PREFETCH_LIMIT messages at once
+         if (trace) { log.trace(this + " the client is not currently accepting messages"); }
          
-         if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-         
          return null;
       }
-       
-      // Need to synchronized around the whole block to prevent setting started = false
-      // but handle is already running and a message is deposited during the stop procedure.
-      synchronized (lock)
-      {  
+        
+      synchronized (startStopLock)
+      {         
          // If the consumer is stopped then we don't accept the message, it should go back into the
          // queue for delivery later.
          if (!started)
          {
             if (trace) { log.trace(this + " NOT started yet!"); }
-
+   
             return null;
          }
-
+   
          if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
-
+   
          JBossMessage message = (JBossMessage)ref.getMessage();
          
          boolean selectorRejected = !this.accept(message);
@@ -235,8 +194,8 @@
             return delivery;
          }
          
-         long deliveryId = sessionEndpoint.addDelivery(delivery);
-
+         long deliveryId = sessionEndpoint.addDelivery(delivery, id);
+   
          // We don't send the message as-is, instead we create a MessageProxy instance. This allows
          // local fields such as deliveryCount to be handled by the proxy but global data to be
          // fielded by the same underlying Message instance. This allows us to avoid expensive
@@ -244,37 +203,42 @@
    
          MessageProxy mp = JBossMessage.createThinDelegate(deliveryId, message, ref.getDeliveryCount());
     
-         // Add the proxy to the list to deliver
-                           
-         toDeliver.add(mp);     
-          
-         bufferFull = toDeliver.size() >= prefetchSize;
-             
-         if (!clientConsumerFull)
-         {            
-            try
-            {
-               Deliverer deliverer = new Deliverer();
-               if (trace) { log.trace(this + " scheduling a new " + deliverer); }
-               this.executor.execute(deliverer);
-            }
-            catch (InterruptedException e)
-            {
-               log.warn("Thread interrupted", e);
-            }
+         //We send the message to the client on the current thread.
+         //The message is written onto the transport and then the thread returns immediately
+         //without waiting for a response
+         
+         //FIXME - how can we ensure that a later send doesn't overtake an earlier send - this might
+         //happen if they are using different underlying TCP connections (e.g. from pool)
+         
+         ClientDelivery del = new ClientDelivery(mp, id);
+         
+         MessagingMarshallable mm = new MessagingMarshallable(versionToUse, del);
+         
+         Callback callback = new Callback(mm);
+   
+         try
+         {
+            //FIXME - we need to use the asynch callback API, this is the Sync one
+            callbackHandler.handleCallback(callback);
          }
+         catch (HandleCallbackException e)
+         {
+            log.error("Failed to handle callback", e);
+            
+            return null;
+         }
                              
-         return delivery;              
+         return delivery;      
       }
    }      
-   
 
    // Filter implementation -----------------------------------------
 
    public boolean accept(Routable r)
    {
       boolean accept = true;
-      if (this.destination.isQueue())
+      
+      if (destination.isQueue())
       {
          // For subscriptions message selection is handled in the Subscription itself
          // we do not want to do the check twice
@@ -323,9 +287,11 @@
    {      
       try
       {
+         if (trace) { log.trace(this + " close"); }
+         
          localClose();
          
-         sessionEndpoint.removeConsumer(id);
+         sessionEndpoint.removeConsumer(id);         
       }   
       catch (Throwable t)
       {
@@ -335,78 +301,66 @@
            
    // ConsumerEndpoint implementation -------------------------------
    
-   /*
-    * This is called by the client consumer to tell the server to wake up and start sending more
-    * messages if available
-    */
-   public void more() throws JMSException
-   {           
+   
+   public void changeRate(float newRate) throws JMSException
+   {
+      if (trace) { log.trace(this + " changeRate: " + newRate); }
+      
       try
-      {
-         // Set clientConsumerFull to false.
-         //
-         // NOTE! This must be done using a Runnable on the delivery executor - this is to prevent
-         // the following race condition:
-         //  1) Messages are delivered to the client, causing it to be full.
-         //  2) The messages are consumed very quickly on the client causing more() to be called.
-         //  3) more() hits the server BEFORE the deliverer thread has returned from delivering to
-         //     the client causing clientConsumerFull to be set to false and adding a deliverer to
-         //     the queue.
-         //  4) The deliverer thread returns and sets clientConsumerFull to true.
-         //  5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
-         //     though the client needs messages.
-
-         executor.execute(new Runnable()
-         {
-            public void run()
-            {
-               if (trace) { log.trace(ServerConsumerEndpoint.this + " is notified that client wants more() messages"); }
-               clientConsumerFull = false;
-            }
-         });
-                           
-         // Run a deliverer to deliver any existing ones
-         executor.execute(new Deliverer());
+      {      
+         //For now we just support a binary on/off
+         //The client will send newRate = 0, to say it does not want any more messages when it's client side
+         //buffer gets full
+         //or it will send an arbitrary non zero number to say it does want more messages, when it's client side
+         //buffer empties to half it's full size.
+         //Note the client does not wait until the client side buffer is empty before sending a newRate(+ve)
+         //message since this would add extra latency.
          
-         // TODO Why do we need to wait for it to execute? Why not just return immediately?
+         //In the future we can fine tune this by allowing the client to specify an actual rate in the newRate value
+         //so this is basically a placeholder for the future so we don't have to change the wire format when
+         //we support it
          
-         // Now wait for it to execute
-         Future result = new Future();
-         this.executor.execute(new Waiter(result));
-         result.getResult();
-                  
-         // Now we know the deliverer has delivered any outstanding messages to the client buffer
+         //No need to synchronize - clientAccepting is volatile
          
-         messageQueue.deliver(false);
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Thread interrupted", e);
-      }       
+         if (newRate == 0)
+         {
+            clientAccepting = false;
+         }
+         else
+         {
+            clientAccepting = true;
+            
+            promptDelivery();
+         }            
+      }   
       catch (Throwable t)
       {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " more");
+         throw ExceptionUtil.handleJMSInvocation(t, this + " changeRate");
       }
    }
-
-   public void confirmDelivery(int count)
+   
+   /*
+    * This method is always called between closing() and close() being called
+    * Instead of having a new method we could perhaps somehow pass the last delivery id
+    * in with closing - then we don't need another message
+    */
+   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
    {
-      synchronized(messagesInTransitLock)
+      if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
+      
+      try
+      {      
+         //Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
+         
+         sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);      
+      }   
+      catch (Throwable t)
       {
-         messagesInTransitCount -= count;
-
-         if (trace) { log.trace("confirming delivery of " + count + " message(s), messages in transit " + messagesInTransitCount); }
-
-         if (messagesInTransitCount < 0)
-         {
-            log.error(this + " has an invalid messages in transit count (" +
-                      messagesInTransitCount + ")");
-         }
-
-         messagesInTransitLock.notifyAll();
-      }
+         throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
+      }            
    }
    
+   
    public boolean isClosed() throws JMSException
    {
       throw new IllegalStateException("isClosed should never be handled on the server side");
@@ -433,48 +387,38 @@
    
    void localClose() throws Throwable
    {      
-      synchronized (lock)
-      { 
-         if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
+      if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
 
-         messageQueue.remove(this); 
+      messageQueue.remove(this); 
+      
+      JMSDispatcher.instance.unregisterTarget(new Integer(id));
+      
+      // If this is a consumer of a non durable subscription then we want to unbind the
+      // subscription and delete all its data.
+
+      if (destination.isTopic())
+      {
+         PostOffice postOffice = 
+            sessionEndpoint.getConnectionEndpoint().getServerPeer().getPostOfficeInstance();
          
-         JMSDispatcher.instance.unregisterTarget(new Integer(id));
-         
-         // If this is a consumer of a non durable subscription then we want to unbind the
-         // subscription and delete all its data.
+         Binding binding = postOffice.getBindingForQueueName(queueName);
 
-         if (destination.isTopic())
+         //Note binding can be null since there can many competing subscribers for the subscription  - 
+         //in which case the first will have removed the subscription and subsequently
+         //ones won't find it
+         
+         if (binding != null && !binding.getQueue().isRecoverable())
          {
-            PostOffice postOffice = 
-               sessionEndpoint.getConnectionEndpoint().getServerPeer().getPostOfficeInstance();
-            
-            Binding binding = postOffice.getBindingForQueueName(queueName);
-
-            //Note binding can be null since there can many competing subscribers for the subscription  - 
-            //in which case the first will have removed the subscription and subsequently
-            //ones won't find it
-            
-            if (binding != null && !binding.getQueue().isRecoverable())
-            {
-               postOffice.unbindQueue(queueName);
-            }
+            postOffice.unbindQueue(queueName);
          }
-                     
-         closed = true;
       }
+     
    }        
            
    void start()
    {             
-      synchronized (lock)
-      {
-         // Can't start or stop it if it is closed.
-         if (closed)
-         {
-            return;
-         }
-         
+      synchronized (startStopLock)
+      {      
          if (started)
          {
             return;
@@ -482,96 +426,53 @@
          
          started = true;
       }
-      
+            
       // Prompt delivery
       promptDelivery();
    }
    
    void stop() throws Throwable
-   {     
-      // We need to:
-      // - Stop accepting any new messages in the SCE.
-      // - Flush any messages from the SCE to the buffer.
-      // If the client consumer is now full, then we need to cancel the ones in the toDeliver list.
-
-      // We need to lock since otherwise we could set started to false but the handle method was
-      // already executing and messages might get deposited after.
-      synchronized (lock)
-      {
+   {           
+      synchronized (startStopLock)
+      {         
          if (!started)
          {
             return;
          }
          
-         started = false;
-      }
-      
-      // Now we know no more messages will be accepted in the SCE.
-            
-      try
-      {
-         // Flush any messages waiting to be sent to the client.
-         this.executor.execute(new Deliverer());
+         started = false;                  
          
-         if (trace) { log.trace(this + " flushed all remaining messages (if any) to the client"); }
-
-         // Now we know any deliverer has delivered any outstanding messages to the client buffer.
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Thread interrupted", e);
-      }
-
-      // Make sure there are no messages in transit between server and client
-
-      synchronized(messagesInTransitLock)
-      {
-         int loopCount = 0;
-         while(messagesInTransitCount > 0 && loopCount < MESSAGES_IN_TRANSIT_WAIT_COUNT)
-         {
-            log.debug(this + " waiting for " + messagesInTransitCount + " message(s) in transit " +
-                      "to reach the client, " + (loopCount + 1) + " lock grab attempts.");
-            messagesInTransitLock.wait(500);
-            loopCount ++;
-         }
-
-         if (loopCount >= MESSAGES_IN_TRANSIT_WAIT_COUNT)
-         {
-            throw new IllegalStateException("Maximum number of lock grab attempts exceeded, " +
-                                            "giving up to wait for messages in transit");
-         }
-
-         if (trace) { log.trace(this + " has no messages in transit"); }
-      }
-
-      // Now we know that there are no in flight messages on the way to the client consumer, but
-      // there may be messages still in the toDeliver list since the client consumer might be full,
-      // so we need to cancel these.
-            
-      if (!toDeliver.isEmpty())
-      { 
-         synchronized (lock)
-         {
-            //Cancel in reverse order
-            for (int i = toDeliver.size() - 1; i >= 0; i--)
-            {
-               MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-
-               sessionEndpoint.cancelDelivery(proxy.getDeliveryId());
-            }
-         }
-                 
-         toDeliver.clear();
+         /*
+          * 
+         Any message deliveries already transit to the consumer, will just
+         be ignored by the MessageCallbackHandler since it will be closed 
+   
+         To clarify, the close protocol (from connection) is as follows:
+                  
+         1) MessageCallbackHandler::close() - any messages in buffer are cancelled to the server session, and any
+            subsequent receive messages will be ignored         
          
-         bufferFull = false;
-      }      
-      
-      //Need to prompt delivery
-      promptDelivery();
+         2) ServerConsumerEndpoint::closing() causes stop() this flushes any deliveries yet to deliver to the client callback handler
+         
+         3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries after lastDeliveryId
+         for the consumer will be considered in flight and cancelled.
+         
+         4) ServerConsumerEndpoint:close() - endpoint is deregistered
+         
+         5) Session.close() - acks or cancels any remaining deliveries in the SessionState as appropriate
+         
+         6) ServerSessionEndpoint::close() - cancels any remaining deliveries and deregisters session
+         
+         7) Client side session executor is shutdown
+         
+         8) ServerConnectionEndpoint::close() - connection is deregistered.
+         
+         9) Remoting connection listener is removed and remoting connection stopped.
+         
+         */
+      }
    }
-   
-   
-   
+         
    // Protected -----------------------------------------------------         
       
    // Private -------------------------------------------------------
@@ -582,122 +483,5 @@
    }
    
    // Inner classes -------------------------------------------------   
-   
-   /*
-    * Delivers messages to the client 
-    * TODO - We can make this a bit more intelligent by letting it measure the rate
-    * the client is consuming messages and send messages at that rate.
-    * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
-    * This should give higher throughput.
-    */
-   private class Deliverer implements Runnable
-   {
-      public void run()
-      {
-         // Is there anything to deliver? This is ok outside lock - is volatile.
-         if (clientConsumerFull)
-         {
-            if (trace) { log.trace(this + " client consumer full, do nothing"); }
-            return;
-         }
-         
-         List list = null;
-             
-         synchronized (lock)
-         {
-            if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
-
-            if (!toDeliver.isEmpty())
-            {
-               list = new ArrayList(toDeliver);
-               toDeliver.clear();
-               bufferFull = false;
-            }
-         }
-                                                           
-         if (list == null)
-         {
-            if (trace) { log.trace(this + " has a null list, returning"); }
-            return;
-         }
-
-         ServerConnectionEndpoint connection =
-            ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
-         
-         ClientDelivery del = new ClientDelivery(list, id);
-         MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
-         Callback callback = new Callback(mm);
-
-         try
-         {
-            if (trace)
-            {
-               StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
-               for(int i = 0; i < list.size(); i++)
-               {
-                  sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
-                  if (i < list.size() - 1)
-                  {
-                     sb.append(",");
-                  }
-               }
-               sb.append("] over to the remoting layer");
-               log.trace(sb.toString());
-            }
-            
-            synchronized(messagesInTransitLock)
-            {
-               connection.getCallbackHandler().handleCallback(callback);
-               messagesInTransitCount += list.size();
-            }
-
-            if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-
-            // We are NOT using Remoting's facility of acknowledging callbacks. A callback is sent
-            // asynchronously, and there is no confirmation that the callback reached the client or
-            // not.
-
-            // TODO Previously, synchronous server-to-client invocations were used by the client
-            // to report back whether is full or not. This cannot be achieved with asynchronous
-            // callbacks, so the client must explicitely sent this information to the server,
-            // with an invocation on its own.
-         }
-         catch(Throwable t)
-         {
-            log.warn("Failed to deliver the message to the client. See the server log for more details.");
-            log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
-
-            ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
-
-            mgr.handleClientFailure(connection.getRemotingClientSessionId());
-         }
-      }
-
-      public String toString()
-      {
-         return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
-      }
-   }
-   
-   /*
-    * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
-    * We can then ensure that all the Runnables in front of it on the queue have also executed
-    * We cannot just call shutdownAfterProcessingCurrentlyQueuedTasks() since the
-    * QueueExecutor might be share by other consumers and we don't want to wait for their
-    * tasks to complete
-    */
-   private static class Waiter implements Runnable
-   {
-      Future result;
-      
-      Waiter(Future result)
-      {
-         this.result = result;
-      }
-      
-      public void run()
-      {
-         result.setResult(null);
-      }
-   }   
+     
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -29,6 +29,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -502,7 +503,7 @@
                
                if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
                
-               deliveries.put(new Long(deliveryId), del);
+               deliveries.put(new Long(deliveryId), new DeliveryRecord(del, -1));
             }
          }
          
@@ -702,6 +703,40 @@
    }
 
    // Package protected ---------------------------------------------
+      
+   void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId) throws Throwable
+   {
+      //Need to cancel in reverse
+      
+      LinkedList toCancel = new LinkedList();
+      
+      Iterator iter = deliveries.entrySet().iterator();
+            
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+         
+         Long deliveryId = (Long)entry.getKey();
+         
+         DeliveryRecord record = (DeliveryRecord)entry.getValue();
+         
+         if (record.consumerId == consumerId && deliveryId.longValue() > lastDeliveryId)
+         {
+            iter.remove();
+            
+            toCancel.addFirst(record);
+         }
+      }
+      
+      iter = toCancel.iterator();
+      
+      while (iter.hasNext())
+      {
+         DeliveryRecord record = (DeliveryRecord)iter.next();
+         
+         record.del.cancel();
+      }
+   }
    
    void removeBrowser(int browserId) throws Exception
    {
@@ -786,11 +821,11 @@
          
          if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
          
-         Delivery del = (Delivery)entry.getValue();
+         DeliveryRecord rec = (DeliveryRecord)entry.getValue();
          
-         del.cancel();
+         rec.del.cancel();
          
-         channels.add(del.getObserver());
+         channels.add(rec.del.getObserver());
       }
       
       promptDelivery(channels);
@@ -806,26 +841,26 @@
    
    void cancelDelivery(long deliveryId) throws Throwable
    {
-      Delivery del = (Delivery)deliveries.remove(new Long(deliveryId));
+      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
       
-      if (del == null)
+      if (rec == null)
       {
          throw new IllegalStateException("Cannot find delivery to cancel " + deliveryId);
       }
       
-      del.cancel();
+      rec.del.cancel();
    }
    
-   long addDelivery(Delivery del)
+   long addDelivery(Delivery del, int consumerId)
    {
       long deliveryId = deliveryIdSequence.increment();
       
-      deliveries.put(new Long(deliveryId), del);
+      deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId));
       
       if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + del); }
       
       return deliveryId;      
-   }
+   }      
    
    void acknowledgeTransactionally(List acks, Transaction tx) throws Throwable
    {
@@ -847,16 +882,16 @@
          
          Long id = new Long(ack.getDeliveryId());
            
-         Delivery del = (Delivery)deliveries.get(id);
+         DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
          
-         if (del == null)
+         if (rec == null)
          {
             throw new IllegalStateException("Cannot find delivery to acknowledge " + ack);
          }
                            
          deliveryCallback.addDeliveryId(id);
          
-         del.acknowledge(tx);
+         rec.del.acknowledge(tx);
       }      
    }
    
@@ -890,21 +925,21 @@
    {
       if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
       
-      Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
       
-      if (del == null)
+      if (rec == null)
       {
          throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
       }
       
-      del.acknowledge(null);    
+      rec.del.acknowledge(null);    
    } 
    
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
    {
-      Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
       
-      if (del == null)
+      if (rec == null)
       {
          throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
       }
@@ -922,17 +957,17 @@
             if (dlq != null)
             {         
                //reset delivery count to zero
-               del.getReference().setDeliveryCount(0);
+               rec.del.getReference().setDeliveryCount(0);
                
-               dlq.handle(null, del.getReference(), tx);
+               dlq.handle(null, rec.del.getReference(), tx);
                
-               del.acknowledge(tx);           
+               rec.del.acknowledge(tx);           
             }
             else
             {
                log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
                
-               del.acknowledge(tx);
+               rec.del.acknowledge(tx);
             }                              
                         
             tx.commit();
@@ -946,12 +981,12 @@
       }
       else
       {                                                   
-         del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+         rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
          
-         del.cancel();
+         rec.del.cancel();
       }
       
-      return del;
+      return rec.del;
    }
 
    private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
@@ -991,7 +1026,7 @@
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, binding.getQueue(),
                                     binding.getQueue().getName(), this, selectorString, noLocal,
-                                    jmsDestination, prefetchSize);
+                                    jmsDestination);
       
       JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
 
@@ -1263,13 +1298,13 @@
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
                   binding.getQueue().getName(), this, selectorString, noLocal,
-                  jmsDestination, prefetchSize);
+                  jmsDestination);
       
       JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
       
       ClientConsumerDelegate stub =
          new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
-                  prefetchSize, maxDeliveryAttempts);
+                                    prefetchSize, maxDeliveryAttempts);
       
       synchronized (consumers)
       {
@@ -1294,11 +1329,33 @@
       }
    }
    
+   // Inner classes -------------------------------------------------
    
+   /*
+    * Holds a record of a delivery - we need to store the consumer id as well
+    * hence this class
+    * The only reason we need to store the consumer id is that on consumer close, we need to 
+    * cancel any deliveries corresponding to that consumer.
+    * We can't rely on the cancel being driven from the MessageCallbackHandler since
+    * the deliveries may have got lost in transit (ignored) since the consumer might have closed
+    * when they were in transit.
+    * In such a case we might otherwise end up with the consumer closing but not all it's deliveries being
+    * cancelled, which would mean they wouldn't be cancelled until the session is closed which is too late
+    */
+   private class DeliveryRecord
+   {
+      Delivery del;
+      
+      int consumerId;
+      
+      DeliveryRecord(Delivery del, int consumerId)
+      {
+         this.del = del;
+         
+         this.consumerId = consumerId;
+      }            
+   }
    
-   
-   // Inner classes -------------------------------------------------
-   
    /**
     * 
     * The purpose of this class is to remove deliveries from the delivery list on commit

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -64,16 +64,16 @@
       endpoint.closing();
    }
 
-   public void more() throws JMSException
+   public void changeRate(float newRate) throws JMSException
    {
-      endpoint.more();
+      endpoint.changeRate(newRate);
    }
-
-   public void confirmDelivery(int count)
+   
+   public void cancelInflightMessages(long lastDeliveryId) throws JMSException
    {
-      endpoint.confirmDelivery(count);
+      endpoint.cancelInflightMessages(lastDeliveryId);
    }
-   
+
    public boolean isClosed() throws JMSException
    {
       return endpoint.isClosed();

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.remoting.CallbackManager;
-import org.jboss.jms.client.remoting.HandleMessageResponse;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.Version;
@@ -97,19 +96,23 @@
    protected static final byte CANCEL = 3;
    protected static final byte CANCEL_LIST = 4;
    protected static final byte SEND = 5;   
-   protected static final byte MORE = 6;
+   //protected static final byte MORE = 6;
+   
+   protected static final byte CHANGE_RATE = 6;
+   
    protected static final byte SEND_TRANSACTION = 7;
    protected static final byte GET_ID_BLOCK = 8;
    protected static final byte RECOVER_DELIVERIES = 9;
-   protected static final byte CONFIRM_DELIVERY = 10;
+   //protected static final byte CONFIRM_DELIVERY = 10;
+   
+   
  
 
    // The response codes - start from 100
 
-   protected static final byte CALLBACK = 100;
+   protected static final byte MESSAGE_DELIVERY = 100;
    protected static final byte NULL_RESPONSE = 101;
    protected static final byte ID_BLOCK_RESPONSE = 102;
-   protected static final byte HANDLE_MESSAGE_RESPONSE = 103;
    protected static final byte BROWSE_MESSAGE_RESPONSE = 104;
    protected static final byte BROWSE_MESSAGES_RESPONSE = 105;
    protected static final byte CALLBACK_LIST = 106;
@@ -223,11 +226,15 @@
    
                   if (trace) { log.trace("wrote send()"); }
                }
-               else if ("more".equals(methodName))
+               else if ("changeRate".equals(methodName))
                {
-                  dos.writeByte(MORE);
+                  dos.writeByte(CHANGE_RATE);
    
                   writeHeader(mi, dos);
+                  
+                  Float f = (Float)mi.getArguments()[0];
+                  
+                  dos.writeFloat(f.floatValue());
    
                   dos.flush();
    
@@ -332,20 +339,6 @@
    
                   if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
                }
-               else if ("confirmDelivery".equals(methodName))
-               {
-                  dos.writeByte(CONFIRM_DELIVERY);
-   
-                  writeHeader(mi, dos);
-   
-                  Integer count = (Integer)mi.getArguments()[0];
-   
-                  dos.writeInt(count.intValue());
-                     
-                  dos.flush();
-   
-                  if (trace) { log.trace("wrote confirmDelivery()"); }
-               }
                else if ("sendTransaction".equals(methodName))
                {
                   dos.writeByte(SEND_TRANSACTION);
@@ -393,7 +386,7 @@
    
                ClientDelivery dr = (ClientDelivery)param;
    
-               dos.writeByte(CALLBACK);
+               dos.writeByte(MESSAGE_DELIVERY);
                
                dos.writeUTF(req.getSessionId());
    
@@ -455,19 +448,6 @@
    
                if (trace) { log.trace("wrote id block response"); }
             }
-            else if (res instanceof HandleMessageResponse)
-            {         
-               //Return value from delivering messages to client
-               dos.write(HANDLE_MESSAGE_RESPONSE);
-   
-               HandleMessageResponse response = (HandleMessageResponse)res;
-   
-               response.write(dos);
-   
-               dos.flush();
-   
-               if (trace) { log.trace("wrote handle message response"); }
-            }
             else if (res instanceof JBossMessage)
             {
                //Return value from browsing message
@@ -482,8 +462,7 @@
                dos.flush();
                
                if (trace) { log.trace("wrote browse message response"); }
-            }
-            
+            }            
             else if (res instanceof Message[])
             {
                //Return value from browsing messages
@@ -510,7 +489,7 @@
                      ((ArrayList) res).size() > 0 &&
                      ((ArrayList) res).get(0) instanceof Callback)
             {
-               // Comes from polled Callbacks.
+               // Comes from polled Callbacks. (HTTP transport??)
                ArrayList callbackList = (ArrayList)res;
                dos.write(CALLBACK_LIST);
                dos.writeUTF(resp.getSessionId());
@@ -627,10 +606,16 @@
    
                return request;
             }
-            case MORE:
+            case CHANGE_RATE:
             {
                MethodInvocation mi = readHeader(dis);
-   
+               
+               float f = dis.readFloat();
+               
+               Object[] args = new Object[] {new Float(f)};
+               
+               mi.setArguments(args);
+                  
                InvocationRequest request =
                   new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
                                         new MessagingMarshallable(version, mi), null, null, null);
@@ -802,24 +787,6 @@
    
                return request;
             }
-            case CONFIRM_DELIVERY:
-            {
-               MethodInvocation mi = readHeader(dis);
-   
-               int count = dis.readInt();                  
-   
-               Object[] args = new Object[] {new Integer(count)};
-   
-               mi.setArguments(args);
-   
-               InvocationRequest request =
-                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-                                        new MessagingMarshallable(version, mi), null, null, null);
-   
-               if (trace) { log.trace("read confirmDelivery()"); }
-   
-               return request;
-            }
             case ID_BLOCK_RESPONSE:
             {
                IDBlock block = new IDBlock();
@@ -832,18 +799,6 @@
    
                return resp;
             }
-            case HANDLE_MESSAGE_RESPONSE:
-            {
-               HandleMessageResponse res = new HandleMessageResponse();
-   
-               res.read(dis);
-   
-               InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, res), false, null);
-   
-               if (trace) { log.trace("read handle message response"); }
-   
-               return resp;
-            }
             case BROWSE_MESSAGE_RESPONSE:
             {
                byte type = dis.readByte();
@@ -890,7 +845,7 @@
    
                return resp;
             }
-            case CALLBACK:
+            case MESSAGE_DELIVERY:
             {
                String sessionId = dis.readUTF();
                ClientDelivery dr = new ClientDelivery();

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/build.xml	2006-12-22 20:55:26 UTC (rev 1849)
@@ -356,6 +356,12 @@
       <antcall target="clustering-tests"/>
    </target>
 
+   <target name="http-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
+      <antcall target="remote-tests">
+            <param name="test.remoting" value="http"/>
+      </antcall>
+   </target>
+
    <target name="stress-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
       <antcall target="invm-stress-tests"/>
       <antcall target="remote-stress-tests"/>  <!-- default remoting configuration (socket) -->

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -767,12 +767,18 @@
       Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       MessageProducer prod = sessSend.createProducer(queue);
       prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      
+      log.trace("Sending messages");
+      
       TextMessage tm1 = sessSend.createTextMessage("a");
       TextMessage tm2 = sessSend.createTextMessage("b");
       TextMessage tm3 = sessSend.createTextMessage("c");
       prod.send(tm1);
       prod.send(tm2);
       prod.send(tm3);
+      
+      log.trace("Sent messages");
+      
       sessSend.close();
       
       assertRemainingMessages(3);
@@ -780,10 +786,20 @@
       conn.start();
 
       Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      
+      log.trace("Creating consumer");
+      
       MessageConsumer cons = sessReceive.createConsumer(queue);
       
+      log.trace("Created consumer");
+      
       MessageListenerAutoAck listener = new MessageListenerAutoAck(sessReceive);
+      
+      log.trace("Setting message listener");
+      
       cons.setMessageListener(listener);
+      
+      log.trace("Set message listener");
 
       listener.waitForMessages();
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -1081,13 +1081,25 @@
           Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
           MessageProducer prod = sess.createProducer(queue);
-          prod.send(sess.createTextMessage("1"));
-          prod.send(sess.createTextMessage("2"));
-          prod.send(sess.createTextMessage("3"));
+          
+          TextMessage tm1 = sess.createTextMessage("1");
+          
+          TextMessage tm2 = sess.createTextMessage("2");
+          
+          TextMessage tm3 = sess.createTextMessage("3");
+          
+          prod.send(tm1);
+          prod.send(tm2);
+          prod.send(tm3);
 
+          log.trace("Creating consumer");
           MessageConsumer cons1 = sess.createConsumer(queue);
 
-          Message r1 = cons1.receive();
+          log.trace("Waiting for message");
+          
+          TextMessage r1 = (TextMessage)cons1.receive();
+          
+          assertEquals(tm1.getText(), r1.getText());
 
           log.trace("Got first message");
 
@@ -1097,12 +1109,16 @@
 
           MessageConsumer cons2 = sess.createConsumer(queue);
 
-          log.trace("Wairting for second message");
-          Message r2 = cons2.receive();
+          log.trace("Waiting for second message");
+          TextMessage r2 = (TextMessage)cons2.receive();
+          
+          assertEquals(tm2.getText(), r2.getText());
 
           log.trace("got second message");
 
-          Message r3 = cons2.receive();
+          TextMessage r3 = (TextMessage)cons2.receive();
+          
+          assertEquals(tm3.getText(), r3.getText());
 
           r1.acknowledge();
           r2.acknowledge();
@@ -1137,6 +1153,8 @@
           prod.send(sess.createTextMessage("1"));
           prod.send(sess.createTextMessage("2"));
           prod.send(sess.createTextMessage("3"));
+          
+          log.trace("Sent messages");
 
           conn.start();
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
 
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.remoting.HandleMessageResponse;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
@@ -104,7 +103,7 @@
    
    //Consumer
         
-   protected Method moreMethod;
+   protected Method changeRateMethod;
    
  
    //connection
@@ -150,7 +149,7 @@
             
       //Consumer
             
-      moreMethod = consumerDelegate.getMethod("more", null);
+      changeRateMethod = consumerDelegate.getMethod("changeRate", new Class[] { Float.TYPE });
 
       //Connection
       
@@ -193,9 +192,9 @@
    
    //Consumer
    
-   public void testMore() throws Exception
+   public void testChangeRate() throws Exception
    {
-      wf.testMore();
+      wf.testChangeRate();
    }
    
    
@@ -229,9 +228,9 @@
       wf.testSerializableResponse();
    }
    
-   public void testCallBack() throws Exception
+   public void testMessageDelivery() throws Exception
    {
-      wf.testCallback();
+      wf.testMessageDelivery();
    }
    
    public void testIDBlockResponse() throws Exception
@@ -239,11 +238,8 @@
       wf.testGetIdBlockResponse();
    }
    
-   public void testHandleMessageResponse() throws Exception
-   {
-      wf.testHandleMessageResponse();
-   }
-            
+   //TODO need a test for the polled callbacks
+          
    // Public --------------------------------------------------------
    
    public static class SerializableObject implements Serializable
@@ -269,7 +265,7 @@
    /**
     * We extend the class so we have access to protected fields
     */
-   class TestWireFormat extends JMSWireFormat
+   private class TestWireFormat extends JMSWireFormat
    {      
       public void testAcknowledgeDelivery() throws Exception
       {
@@ -564,7 +560,127 @@
          
       }
       
+      public void testCancelDeliveries() throws Exception
+      {                            
+         long methodHash = 62365354;
+         
+         int objectId = 54321;
+         
+         List cancels = new ArrayList();
+         
+         DefaultCancel cancel1 = new DefaultCancel(65654, 43);
+         DefaultCancel cancel2 = new DefaultCancel(65765, 2);
+         cancels.add(cancel1);
+         cancels.add(cancel2);
+         
+         MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
+         
+         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
+         
+         mi.setArguments(new Object[] {cancels});
+         
+         MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+         
+         InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+         
+         ByteArrayOutputStream bos = new ByteArrayOutputStream();
+         
+         OutputStream oos = new DataOutputStream(bos);
+                  
+         wf.write(ir, oos);
+        
+         oos.flush();
+               
+         byte[] bytes = bos.toByteArray();
+              
+         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                  
+         DataInputStream dis = new DataInputStream(bis); 
+               
+         //Check the bytes
+             
+         //First byte should be version
+         assertEquals(77, dis.readByte());
+         
+         //Next byte should be CANCEL_MESSAGES
+         assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
+         
+         //Next int should be objectId
+         assertEquals(objectId, dis.readInt());
+         
+         //Next long should be methodHash
+         assertEquals(methodHash, dis.readLong());
+                  
+         //Next should the size of the list
+         
+         int size = dis.readInt();
+         
+         assertEquals(2, size);
+         
+         //then the AckInfos
+         long deliveryId = dis.readLong();
+         int deliveryCount = dis.readInt();
+         DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
+         
+         deliveryId = dis.readLong();
+         deliveryCount = dis.readInt();
+         DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+
+         assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
+         
+         assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
+         
+         assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
+         
+         assertEquals(cancel2.getDeliveryId(), cancel2.getDeliveryId());
+                   
+         //should be eos
+                
+         try
+         {
+            dis.readByte();
+            fail("End of stream expected");
+         }
+         catch (EOFException e)
+         {
+            //Ok
+         }
+         
+         
+         bis.reset();
+         
+         InputStream ois = new DataInputStream(bis);
+         
+         InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+         
+         mm = (MessagingMarshallable)ir2.getParameter();
+         
+         assertEquals(77, mm.getVersion());
+         
+         MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+         
+         assertEquals(methodHash, mi2.getMethodHash());
+         
+         assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+         
+         List list = (List)mi2.getArguments()[0];
+        
+         assertEquals(2, list.size());
+         
+         DefaultCancel xack1 = (DefaultCancel)list.get(0);
+         DefaultCancel xack2 = (DefaultCancel)list.get(1);
+         
+         assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
+         
+         assertEquals(cancel1.getDeliveryCount(), xack1.getDeliveryCount());
+         
+         assertEquals(cancel2.getDeliveryId(), xack2.getDeliveryId());
+         
+         assertEquals(cancel2.getDeliveryCount(), xack2.getDeliveryCount());
+         
+      }  
       
+      
       /*
        * Test that general serializable invocation requests are marshalled correctky
        */
@@ -961,127 +1077,7 @@
          
       }  
             
-      
-      public void testCancelDeliveries() throws Exception
-      {                            
-         long methodHash = 62365354;
-         
-         int objectId = 54321;
-         
-         List cancels = new ArrayList();
-         
-         DefaultCancel cancel1 = new DefaultCancel(65654, 43);
-         DefaultCancel cancel2 = new DefaultCancel(65765, 2);
-         cancels.add(cancel1);
-         cancels.add(cancel2);
-         
-         MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
-         
-         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
-         
-         mi.setArguments(new Object[] {cancels});
-         
-         MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
-         
-         InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
-         
-         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-         
-         OutputStream oos = new DataOutputStream(bos);
                   
-         wf.write(ir, oos);
-        
-         oos.flush();
-               
-         byte[] bytes = bos.toByteArray();
-              
-         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-                  
-         DataInputStream dis = new DataInputStream(bis); 
-               
-         //Check the bytes
-             
-         //First byte should be version
-         assertEquals(77, dis.readByte());
-         
-         //Next byte should be CANCEL_MESSAGES
-         assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
-         
-         //Next int should be objectId
-         assertEquals(objectId, dis.readInt());
-         
-         //Next long should be methodHash
-         assertEquals(methodHash, dis.readLong());
-                  
-         //Next should the size of the list
-         
-         int size = dis.readInt();
-         
-         assertEquals(2, size);
-         
-         //then the AckInfos
-         long deliveryId = dis.readLong();
-         int deliveryCount = dis.readInt();
-         DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
-         
-         deliveryId = dis.readLong();
-         deliveryCount = dis.readInt();
-         DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
-
-         assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
-         
-         assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
-         
-         assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
-         
-         assertEquals(cancel2.getDeliveryId(), cancel2.getDeliveryId());
-                   
-         //should be eos
-                
-         try
-         {
-            dis.readByte();
-            fail("End of stream expected");
-         }
-         catch (EOFException e)
-         {
-            //Ok
-         }
-         
-         
-         bis.reset();
-         
-         InputStream ois = new DataInputStream(bis);
-         
-         InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-         
-         mm = (MessagingMarshallable)ir2.getParameter();
-         
-         assertEquals(77, mm.getVersion());
-         
-         MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-         
-         assertEquals(methodHash, mi2.getMethodHash());
-         
-         assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
-         
-         List list = (List)mi2.getArguments()[0];
-        
-         assertEquals(2, list.size());
-         
-         DefaultCancel xack1 = (DefaultCancel)list.get(0);
-         DefaultCancel xack2 = (DefaultCancel)list.get(1);
-         
-         assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
-         
-         assertEquals(cancel1.getDeliveryCount(), xack1.getDeliveryCount());
-         
-         assertEquals(cancel2.getDeliveryId(), xack2.getDeliveryId());
-         
-         assertEquals(cancel2.getDeliveryCount(), xack2.getDeliveryCount());
-         
-      }  
-      
       public void testNullResponse() throws Exception
       {
          MessagingMarshallable mm = new MessagingMarshallable((byte)77, null);
@@ -1129,19 +1125,23 @@
          assertNull(mm.getLoad());
             
       }
-      
-      
-      
-      public void testMore() throws Exception
+                  
+      public void testChangeRate() throws Exception
       {
          long methodHash = 62365354;
          
          int objectId = 54321;
          
-         MethodInvocation mi = new MethodInvocation(null, methodHash, moreMethod, moreMethod, null);
+         float rate = 123.45f;
          
-         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
+         MethodInvocation mi = new MethodInvocation(null, methodHash, changeRateMethod, changeRateMethod, null);
          
+         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId)); 
+         
+         Object[] args = new Object[] { new Float(rate) };
+         
+         mi.setArguments(args);
+         
          MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
          
          InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
@@ -1165,8 +1165,8 @@
          //First byte should be version
          assertEquals(77, dis.readByte());         
          
-         //Second byte should be MORE
-         assertEquals(JMSWireFormat.MORE, dis.readByte());
+         //Second byte should be CHANGE_RATE
+         assertEquals(JMSWireFormat.CHANGE_RATE, dis.readByte());
          
          //Next int should be objectId
          assertEquals(objectId, dis.readInt());
@@ -1174,6 +1174,11 @@
          //Next long should be methodHash
          assertEquals(methodHash, dis.readLong());
          
+         //Next should be the float
+         float f2 = dis.readFloat();
+         
+         assertTrue(rate == f2);
+         
          //Now eos
          try
          {
@@ -1196,37 +1201,28 @@
          
          MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
          
+         Float f3 = (Float)mi2.getArguments()[0];
+         
+         assertTrue(rate == f3.floatValue());
+         
          assertEquals(methodHash, mi2.getMethodHash());
          
          assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());         
       }
       
-      
-      
-      public void testCallback() throws Exception
+            
+      public void testMessageDelivery() throws Exception
       {
          int consumerID = 12345678;
          
          JBossMessage m1 = new JBossMessage(123);
-         JBossMessage m2 = new JBossMessage(456);
-         JBossMessage m3 = new JBossMessage(789);
-         
-         List msgs = new ArrayList();
-         
+
          MessageProxy del1 = JBossMessage.createThinDelegate(1, m1, 7);
-         MessageProxy del2 = JBossMessage.createThinDelegate(2, m2, 8);
-         MessageProxy del3 = JBossMessage.createThinDelegate(3, m3, 9);
          
          MessageTest.configureMessage(m1);
-         MessageTest.configureMessage(m2);
-         MessageTest.configureMessage(m3);
+
+         ClientDelivery dr = new ClientDelivery(del1, consumerID);
          
-         msgs.add(del1);
-         msgs.add(del2);
-         msgs.add(del3);         
-         
-         ClientDelivery dr = new ClientDelivery(msgs, consumerID);
-         
          ByteArrayOutputStream bos = new ByteArrayOutputStream();
          
          OutputStream oos = new DataOutputStream(bos);
@@ -1248,18 +1244,15 @@
          //First byte should be version
          assertEquals(77, dis.readByte());         
          
-         //Second byte should be CALLBACK
-         assertEquals(JMSWireFormat.CALLBACK, dis.readByte());
+         //Second byte should be MESSAGE_DELIVERY
+         assertEquals(JMSWireFormat.MESSAGE_DELIVERY, dis.readByte());
          
          //Next should be sessionID
          assertEquals("dummySessionId", dis.readUTF());
          
          //Next int should be consumer id
          assertEquals(12345678, dis.readInt());
-         
-         //Next int should be number of messages
-         assertEquals(3, dis.readInt());
-                           
+                     
          //Next byte should be type
          assertEquals(JBossMessage.TYPE, dis.readByte());
          
@@ -1272,41 +1265,9 @@
          //And now the message itself
          JBossMessage r1 = new JBossMessage();
          
-         r1.read(dis);
+         r1.read(dis);         
          
-         
-         //Next byte should be type
-         assertEquals(JBossMessage.TYPE, dis.readByte());
-         
-         //Next int should be delivery count
-         assertEquals(8, dis.readInt());
-         
-         // Delivery id
-         assertEquals(2, dis.readLong());
-         
-         //And now the message itself
-         JBossMessage r2 = new JBossMessage();
-         
-         r2.read(dis);
-         
-         
-         //Next byte should be type
-         assertEquals(JBossMessage.TYPE, dis.readByte());
-         
-         //Next int should be delivery count
-         assertEquals(9, dis.readInt());
-         
-         // Delivery id
-         assertEquals(3, dis.readLong());
-         
-         //And now the message itself
-         JBossMessage r3 = new JBossMessage();
-         
-         r3.read(dis);
-         
          MessageTest.ensureEquivalent(m1, r1);
-         MessageTest.ensureEquivalent(m2, r2);
-         MessageTest.ensureEquivalent(m3, r3);
          
          //eos
          try
@@ -1339,25 +1300,16 @@
                   
          ClientDelivery dr2 = (ClientDelivery)mm.getLoad();
          
-         List msgs2 = dr2.getMessages();
+         MessageProxy p1 = dr2.getMessage();
          
          assertEquals(consumerID, dr2.getConsumerId());
          
-         MessageProxy p1 = (MessageProxy)msgs2.get(0);
-         MessageProxy p2 = (MessageProxy)msgs2.get(1);
-         MessageProxy p3 = (MessageProxy)msgs2.get(2);
-         
+
          assertEquals(del1.getDeliveryCount(), p1.getDeliveryCount());
-         assertEquals(del2.getDeliveryCount(), p2.getDeliveryCount());
-         assertEquals(del3.getDeliveryCount(), p3.getDeliveryCount());
-         
+
          JBossMessage q1 = p1.getMessage();
-         JBossMessage q2 = p1.getMessage();
-         JBossMessage q3 = p1.getMessage();
-         
-         MessageTest.ensureEquivalent(m1, q1);
-         MessageTest.ensureEquivalent(m2, q2);
-         MessageTest.ensureEquivalent(m3, q3);         
+  
+         MessageTest.ensureEquivalent(m1, q1);      
       }
       
                   
@@ -1421,66 +1373,6 @@
          assertEquals(block.getLow(), block3.getLow());
          assertEquals(block.getHigh(), block3.getHigh());                  
       }    
-      
-      public void testHandleMessageResponse() throws Exception
-      {
-         HandleMessageResponse h = new HandleMessageResponse(true, 76876);
-         
-         MessagingMarshallable mm = new MessagingMarshallable((byte)77, h);
-                  
-         InvocationResponse ir = new InvocationResponse(null, mm, false, null);
-         
-         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-         
-         OutputStream oos = new DataOutputStream(bos);
-         
-         wf.write(ir, oos);
-         
-         oos.flush();
-         
-         ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-         
-         DataInputStream dis = new DataInputStream(bis);
-                   
-         // First byte should be version
-         assertEquals(77, dis.readByte());
-         
-         int b = dis.readByte();
-         
-         assertEquals(JMSWireFormat.HANDLE_MESSAGE_RESPONSE, b);
-         
-         HandleMessageResponse h2 = new HandleMessageResponse();
-         
-         h2.read(dis);
-         
-         assertEquals(h.clientIsFull(), h2.clientIsFull());
-         assertEquals(h.getNumberAccepted(), h2.getNumberAccepted());
-         
-         //eos
-         try
-         {
-            dis.readByte();
-            fail("End of stream expected");
-         }
-         catch (EOFException e)
-         {
-            //Ok
-         }
-         
-         bis.reset();
-         
-         InputStream ois = new DataInputStream(bis);
-         
-         InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
-         
-         mm = (MessagingMarshallable)ir2.getResult();
-         
-         assertEquals(77, mm.getVersion());
-         
-         HandleMessageResponse h3 = (HandleMessageResponse)mm.getLoad();
-         
-         assertEquals(h.clientIsFull(), h3.clientIsFull());
-         assertEquals(h.getNumberAccepted(), h3.getNumberAccepted());                 
-      }    
+            
    }
 }
\ No newline at end of file

Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -88,7 +88,7 @@
    {
       Message m = queueProducerSession.createMessage();
       queueProducer.send(m, DeliveryMode.NON_PERSISTENT, 4, 5000);
-      Message result = queueConsumer.receive(100);
+      Message result = queueConsumer.receive(1000);
       assertEquals(m.getJMSMessageID(), result.getJMSMessageID());
    }
 
@@ -97,7 +97,7 @@
       Message m = queueProducerSession.createMessage();
       queueProducer.send(m, DeliveryMode.NON_PERSISTENT, 4, 1000);
       Thread.sleep(2000);
-      assertNull(queueConsumer.receive(100));
+      assertNull(queueConsumer.receive(1000));
    }
 
    public void testExpirationOnReceiveNoWait() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-22 20:55:26 UTC (rev 1849)
@@ -1157,6 +1157,8 @@
       //is always currently used - (we could make this configurable)
 
       String transport = config.getRemotingTransport();
+      
+      log.info("*** Using transport: " + transport);
 
       String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
                       "unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +




More information about the jboss-cvs-commits mailing list