[jboss-cvs] JBoss Messaging SVN: r3606 - in trunk: src/main/org/jboss/jms/client/api and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 16:01:36 EST 2008


Author: timfox
Date: 2008-01-21 16:01:36 -0500 (Mon, 21 Jan 2008)
New Revision: 3606

Added:
   trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
Removed:
   trunk/src/main/org/jboss/jms/client/api/Consumer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java
   trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/Consumer.java
   trunk/src/main/org/jboss/messaging/core/HandleStatus.java
   trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
Log:
Client side cleanup part V


Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -64,7 +64,7 @@
    
    // Attributes ----------------------------------------------------
    
-   private org.jboss.jms.client.api.Consumer cons;
+   private org.jboss.jms.client.api.ClientConsumer cons;
    
    private org.jboss.jms.client.api.ClientSession sess;
    
@@ -117,7 +117,7 @@
           
       cons = sess.createConsumerDelegate(dest.toCoreDestination(), messageSelector, false, subName, true);
 
-      this.consumerID = cons.getConsumerID();      
+      this.consumerID = cons.getID();      
         
       this.maxDeliveries = cons.getMaxDeliveries();
       
@@ -133,7 +133,7 @@
       }
 
       id = threadId.increment();
-      internalThread = new Thread(this, "Connection Consumer for dest " + dest + " id=" + id);
+      internalThread = new Thread(this, "Connection ClientConsumer for dest " + dest + " id=" + id);
       internalThread.start();
 
       if (trace) { log.trace(this + " created"); }

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageConsumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -32,7 +32,7 @@
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
 
 
 import org.jboss.jms.destination.JBossDestination;
@@ -53,11 +53,11 @@
 
    // Attributes ----------------------------------------------------
 
-   protected Consumer consumer;
+   protected ClientConsumer consumer;
 
    // Constructors --------------------------------------------------
 
-   public JBossMessageConsumer(Consumer consumer)
+   public JBossMessageConsumer(ClientConsumer consumer)
    {      
       this.consumer = consumer;
    }
@@ -102,7 +102,6 @@
 
    // QueueReceiver implementation ----------------------------------
 
-
    public Queue getQueue() throws JMSException
    {
       return (Queue)JBossDestination.fromCoreDestination(consumer.getDestination());
@@ -121,7 +120,7 @@
       return consumer.getNoLocal();
    }
 
-   public Consumer getDelegate()
+   public ClientConsumer getDelegate()
    {
        return consumer;
    }

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -52,7 +52,7 @@
 import javax.jms.XATopicSession;
 import javax.transaction.xa.XAResource;
 
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTemporaryQueue;
@@ -237,7 +237,7 @@
 
       log.trace("attempting to create consumer for destination:" + d + (messageSelector == null ? "" : ", messageSelector: " + messageSelector) + (noLocal ? ", noLocal = true" : ""));
 
-      org.jboss.jms.client.api.Consumer cd = session.
+      org.jboss.jms.client.api.ClientConsumer cd = session.
          createConsumerDelegate(((JBossDestination)d).toCoreDestination(), messageSelector, noLocal, null, false);
 
       return new JBossMessageConsumer(cd);
@@ -279,7 +279,7 @@
          throw new InvalidDestinationException("Not a JBossTopic:" + topic);
       }
 
-      Consumer cd =
+      ClientConsumer cd =
          session.createConsumerDelegate(((JBossTopic)topic).toCoreDestination(), null, false, name, false);
 
       return new JBossMessageConsumer(cd);
@@ -309,7 +309,7 @@
          messageSelector = null;
       }
 
-      Consumer cd = session.
+      ClientConsumer cd = session.
          createConsumerDelegate(((JBossTopic)topic).toCoreDestination(), messageSelector, noLocal, name, false);
 
       return new JBossMessageConsumer(cd);

Modified: trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientBrowser.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -31,6 +31,4 @@
    boolean hasNextMessage() throws JMSException;
       
    Message[] nextMessageBlock(int maxMessages) throws JMSException;
-
-
  }

Modified: trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,13 +29,12 @@
 public interface ClientConnection extends Closeable
 {
    ClientSession createSessionDelegate(boolean transacted,
-         int acknowledgmentMode, boolean isXA) throws JMSException;
+                                       int acknowledgmentMode, boolean isXA) throws JMSException;
 
    String getClientID() throws JMSException;
 
    int getServerID();
    
-
    void setClientID(String id) throws JMSException;
 
    void start() throws JMSException;
@@ -59,11 +58,6 @@
                                                     ServerSessionPool sessionPool,
                                                     int maxMessages) throws JMSException;
 
-//   void registerFailoverListener(FailoverListener failoverListener);
-//   
-//   boolean unregisterFailoverListener(FailoverListener failoverListener);
-   
-   
    public void setRemotingConnection(JMSRemotingConnection conn);
    
    public Client getClient();
@@ -77,7 +71,4 @@
    public String getID();
    
    public byte getVersion();
-   
-   
-   
 }

Copied: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java (from rev 3604, trunk/src/main/org/jboss/jms/client/api/Consumer.java)
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.jms.client.api;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.jboss.jms.client.Closeable;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.messaging.core.Destination;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public interface ClientConsumer extends Closeable
+{
+   String getID();
+   
+   void changeRate(float newRate) throws JMSException;
+
+   MessageListener getMessageListener() throws JMSException;
+
+   void setMessageListener(MessageListener listener) throws JMSException;
+
+   Destination getDestination() throws JMSException;
+
+   boolean getNoLocal() throws JMSException;
+
+   String getMessageSelector() throws JMSException;
+
+   Message receive(long timeout) throws JMSException;
+   
+   int getMaxDeliveries();
+   
+   boolean isShouldAck();
+   
+   void handleMessage(JBossMessage message) throws Exception;
+   
+   void addToFrontOfBuffer(JBossMessage message) throws JMSException;
+   
+   long getRedeliveryDelay();
+}

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -17,7 +17,6 @@
 import org.jboss.jms.client.Closeable;
 import org.jboss.jms.client.impl.Ack;
 import org.jboss.jms.client.impl.Cancel;
-import org.jboss.jms.client.impl.ClientConsumer;
 import org.jboss.jms.client.impl.DeliveryInfo;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
@@ -38,21 +37,13 @@
  */
 public interface ClientSession extends Closeable
 {
-
-
    ClientConnection getConnection();
 
-   ClientConsumer getCallbackHandler(String consumerID);
-
-   void addCallbackHandler(ClientConsumer handler);
-
-   void removeCallbackHandler(ClientConsumer handler);
-
    String getID();
    
    /// Methods that will perform a server invocation ----------------------------------------------------------
 
-   Consumer createConsumerDelegate(Destination destination, String selector,
+   ClientConsumer createConsumerDelegate(Destination destination, String selector,
          boolean noLocal, String subscriptionName,
          boolean isCC) throws JMSException;
    
@@ -187,8 +178,4 @@
    public Object getCurrentTxId();
 
    public void setCurrentTxId(Object currentTxId);
-
-
-   
-
 }

Deleted: trunk/src/main/org/jboss/jms/client/api/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/Consumer.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/api/Consumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,47 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.jms.client.api;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.jboss.jms.client.Closeable;
-import org.jboss.messaging.core.Destination;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public interface Consumer extends Closeable
-{
-   void changeRate(float newRate) throws JMSException;
-
-   // ConsumerEndpoint -------------------------------------------------------------------------
-   
-   // ConsumerDelegate --------------------------------------------------------------------------
-   MessageListener getMessageListener() throws JMSException;
-
-   void setMessageListener(MessageListener listener) throws JMSException;
-
-   Destination getDestination() throws JMSException;
-
-   boolean getNoLocal() throws JMSException;
-
-   String getMessageSelector() throws JMSException;
-
-   Message receive(long timeout) throws JMSException;
-   
-   String getConsumerID();
-   
-   int getMaxDeliveries();
-   
-   boolean isShouldAck();
-
-}

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -232,7 +232,6 @@
          clientID = ((GetClientIDResponse) sendBlocking(new GetClientIDRequest())).getClientID();
       }
       return clientID;
-
    }
 
    /**
@@ -382,9 +381,10 @@
             {
                log.trace("Failed to close", t);
             }
-         }
-         
+         }         
       }
+      
+      children.clear();
    }
 
    

Deleted: trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,933 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.client.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.api.Consumer;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.PriorityLinkedList;
-import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
-import org.jboss.messaging.util.Future;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision: 2774 $</tt>
- *
- * $Id: MessageCallbackHandler.java 2774 2007-06-12 22:43:54Z timfox $
- */
-public class ClientConsumer
-{
-   // Constants ------------------------------------------------------------------------------------
-   
-   private static final Logger log;
-   
-   // Static ---------------------------------------------------------------------------------------
-   
-   private static boolean trace;      
-   
-   private static final int WAIT_TIMEOUT = 30000;
-   
-   
-   static
-   {
-      log = Logger.getLogger(ClientConsumer.class);
-      trace = log.isTraceEnabled();
-   }
-   
-   private static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
-                                                             ClientSession del,
-                                                             int maxDeliveries, boolean shouldCancel)
-   {
-      Message msg = jbm.getCoreMessage();
-      
-      boolean expired = msg.isExpired();
-      
-      boolean reachedMaxDeliveries = jbm.getDeliveryCount() == maxDeliveries;
-      
-      if (expired || reachedMaxDeliveries)
-      {
-         if (trace)
-         {
-            if (expired)
-            {
-               log.trace(msg + " has expired, cancelling to server");
-            }
-            else
-            {
-               log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
-            }
-         }
-         
-         if (shouldCancel)
-         {	         
-	         final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
-	                                                 expired, reachedMaxDeliveries);	         
-	         try
-	         {
-	            del.cancelDelivery(cancel);
-	         }
-	         catch (JMSException e)
-	         {
-	            log.error("Failed to cancel delivery", e);
-	         }   
-         }
-               
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-   }
-        
-   //This is static so it can be called by the asf layer too
-   public static void callOnMessage(ClientSession sess,
-                                    MessageListener listener,
-                                    String consumerID,
-                                    boolean isConnectionConsumer,
-                                    JBossMessage m,
-                                    int ackMode,
-                                    int maxDeliveries,
-                                    ClientSession connectionConsumerSession,
-                                    boolean shouldAck)
-      throws JMSException
-   {      
-      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
-      {
-         //Message has been cancelled
-         return;
-      }
-      
-      DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
-            
-      m.incDeliveryCount();
-      
-      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
-      // add anything to the tx for this session.
-      if (!isConnectionConsumer)
-      {
-         //We need to call preDeliver, deliver the message then call postDeliver - this is because
-         //it is legal to call session.recover(), or session.rollback() from within the onMessage()
-         //method in which case the last message needs to be delivered so it needs to know about it
-         sess.preDeliver(deliveryInfo);
-      } 
-      
-      try
-      {
-         if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
-                     
-         listener.onMessage(m);
-
-         if (trace) { log.trace("listener's onMessage() finished"); }
-      }
-      catch (RuntimeException e)
-      {
-         log.error("RuntimeException was thrown from onMessage, " + m.getJMSMessageID() + " will be redelivered", e);
-         
-         // See JMS 1.1 spec 4.5.2
-
-         if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-         {              
-            sess.recover();
-         }
-      }   
-
-      // If this is the callback-handler for a connection consumer we don't want to acknowledge
-      // or add anything to the tx for this session
-      if (!isConnectionConsumer)
-      {
-      	if (trace) { log.trace("Calling postDeliver"); }
-      	
-         sess.postDeliver();
-         
-         if (trace) { log.trace("Called postDeliver"); }
-      }   
-   }
-   
-   // Attributes -----------------------------------------------------------------------------------
-      
-   /*
-    * The buffer is now a priority linked list
-    * This resolves problems whereby messages are delivered from the server side queue in
-    * correct priority order, but because the old consumer list was not a priority list
-    * then if messages were sitting waiting to be consumed on the client side, then higher
-    * priority messages might be behind lower priority messages and thus get consumed out of order
-    */
-   private PriorityLinkedList<JBossMessage> buffer;
-   private ClientSession sessionDelegate;
-   private Consumer consumerDelegate;
-   private String consumerID;
-   private boolean isConnectionConsumer;
-   private volatile Thread receiverThread;
-   private MessageListener listener;
-   private int ackMode;
-   private boolean closed;
-   private Object mainLock;
-   private QueuedExecutor sessionExecutor;
-   private boolean listenerRunning;
-   private int maxDeliveries;
-   private long lastDeliveryId = -1;
-   private boolean waitingForLastDelivery;
-   private boolean shouldAck;
-   private long redeliveryDelay;
-   private boolean paused;      
-   private int consumeCount;
-   private boolean firstTime = true;
-   private int bufferSize;
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   public ClientConsumer(boolean isCC, int ackMode,                                
-                         ClientSession sess, Consumer cons, String consumerID,
-                         String queueName,
-                         int bufferSize, QueuedExecutor sessionExecutor,
-                         int maxDeliveries, boolean shouldAck,
-                         long redeliveryDelay)
-   {
-      if (bufferSize < 1)
-      {
-         throw new IllegalArgumentException(this + " bufferSize must be > 0");
-      }
-              
-      buffer = new PriorityLinkedListImpl<JBossMessage>(10);
-      isConnectionConsumer = isCC;
-      this.ackMode = ackMode;
-      this.sessionDelegate = sess;
-      this.consumerDelegate = cons;
-      this.consumerID = consumerID;
-      mainLock = new Object();
-      this.sessionExecutor = sessionExecutor;
-      this.maxDeliveries = maxDeliveries;
-      this.shouldAck = shouldAck;
-      this.redeliveryDelay = redeliveryDelay;
-      this.bufferSize = bufferSize;
-   }
-        
-   // Public ---------------------------------------------------------------------------------------
-
-
-   public boolean isClosed()
-   {
-      return closed;
-   }
-
-   /**
-    * Handles a message sent from the server.
-    *
-    * @param message The message
-    */
-   public void handleMessage(final JBossMessage message) throws Exception
-   {
-      synchronized (mainLock)
-      {
-         if (closed)
-         {
-            // Sanity - this should never happen - we should always wait for all deliveries to arrive
-            // when closing
-            throw new IllegalStateException(this + " is closed, so ignoring message");
-         }
-
-         message.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
-         message.doBeforeReceive();
-
-         //Add it to the buffer
-         buffer.addLast(message, message.getJMSPriority());
-
-         lastDeliveryId = message.getDeliveryId();
-
-         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
-
-         messageAdded();
-      }
-   }
-
-   public void setMessageListener(MessageListener listener) throws JMSException
-   {     
-      synchronized (mainLock)
-      {
-         if (receiverThread != null)
-         {
-            // Should never happen
-            throw new IllegalStateException("Consumer is currently in receive(..). " +
-               "Cannot set MessageListener");
-         }
-         
-         this.listener = listener;
-                            
-         if (listener != null && !buffer.isEmpty())
-         {  
-            listenerRunning = true;
-            
-            this.queueRunner(new ListenerRunner());
-         }        
-      }   
-   }
-   
-   public void cancelBuffer() throws JMSException
-   {
-   	if (trace) { log.trace("Cancelling buffer: " + buffer.size()); }
-   	
-      synchronized (mainLock)
-      {      
-         // Now we cancel anything left in the buffer. The reason we do this now is that otherwise
-         // the deliveries wouldn't get cancelled until session close (since we don't cancel
-         // consumer's deliveries until then), which is too late - since we need to preserve the
-         // order of messages delivered in a session.
-         
-         if (shouldAck && !buffer.isEmpty())
-         {                        
-            // Now we cancel any deliveries that might be waiting in our buffer. This is because
-            // otherwise the messages wouldn't get cancelled until the corresponding session died.
-            // So if another consumer in another session tried to consume from the channel before
-            // that session died it wouldn't receive those messages.
-            // We can't just cancel all the messages in the SCE since some of those messages might
-            // have actually been delivered (unlike these) and we may want to acknowledge them
-            // later, after this consumer has been closed
-   
-            List cancels = new ArrayList();
-   
-            for(Iterator i = buffer.iterator(); i.hasNext();)
-            {
-               JBossMessage mp = (JBossMessage)i.next();
-               
-               CancelImpl cancel =
-                  new CancelImpl(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
-               
-               cancels.add(cancel);
-            }
-                  
-            if (trace) { log.trace("Calling cancelDeliveries"); }
-            sessionDelegate.cancelDeliveries(cancels);
-            if (trace) { log.trace("Done call"); }
-            
-            buffer.clear();
-         }    
-      }
-   }
-   
-   public void close(long lastDeliveryId) throws JMSException
-   {     
-   	log.trace(this + " close");
-         	
-   	//Wait for the last delivery to arrive
-      waitForLastDelivery(lastDeliveryId);
-      
-      //Important! We set the listener to null so the next ListenerRunner won't run
-      if (listener != null)
-      {
-      	setMessageListener(null);
-      }
-      
-      //Now we wait for any current listener runners to run.
-      waitForOnMessageToComplete();   
-      
-      synchronized (mainLock)
-      {         
-         if (closed)
-         {
-            return;
-         }
-         
-         closed = true;   
-         
-         if (receiverThread != null)
-         {            
-            // Wake up any receive() thread that might be waiting
-            mainLock.notify();
-         }   
-         
-         this.listener = null;
-      }
-                           
-      if (trace) { log.trace(this + " closed"); }
-   }
-     
-   /**
-    * Method used by the client thread to get a Message, if available.
-    *
-    * @param timeout - the timeout value in milliseconds. A zero timeount never expires, and the
-    *        call blocks indefinitely. A -1 timeout means receiveNoWait(): return the next message
-    *        or null if one is not immediately available. Returns null if the consumer is
-    *        concurrently closed.
-    */
-   public JBossMessage receive(long timeout) throws JMSException
-   {                
-      JBossMessage m = null;      
-      
-      synchronized (mainLock)
-      {        
-         if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
-         
-         if (closed)
-         {
-            // If consumer is closed or closing calling receive returns null
-            if (trace) { log.trace(this + " closed, returning null"); }
-            return null;
-         }
-         
-         if (listener != null)
-         {
-            throw new JMSException("The consumer has a MessageListener set, " +
-               "cannot call receive(..)");
-         }
-                       
-         receiverThread = Thread.currentThread();
-               
-         long startTimestamp = System.currentTimeMillis();
-                  
-         try
-         {
-            while(true)
-            {                             
-               if (timeout == 0)
-               {
-                  if (trace) { log.trace(this + ": receive, no timeout"); }
-                  
-                  m = getMessage(0);                     
-                  
-                  if (m == null)
-                  {
-                     return null;
-                  }
-               }
-               else if (timeout == -1)
-               {
-                  //ReceiveNoWait
-                  if (trace) { log.trace(this + ": receive, noWait"); }
-                  
-                  m = getMessage(-1);                     
-                  
-                  if (m == null)
-                  {
-                     if (trace) { log.trace(this + ": no message available"); }
-                     return null;
-                  }
-               }
-               else
-               {
-                  if (trace) { log.trace(this + ": receive, timeout " + timeout + " ms, blocking poll on queue"); }
-                  
-                  m = getMessage(timeout);
-                                    
-                  if (m == null)
-                  {
-                     // timeout expired
-                     if (trace) { log.trace(this + ": " + timeout + " ms timeout expired"); }
-                     
-                     return null;
-                  }
-               }
-                              
-               if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
-                       
-               boolean ignore =
-                  checkExpiredOrReachedMaxdeliveries(m, sessionDelegate, maxDeliveries, shouldAck);
-               
-               if (!isConnectionConsumer && !ignore)
-               {
-                  DeliveryInfo info = new DeliveryInfo(m, consumerID, null, shouldAck);
-                                                    
-                  sessionDelegate.preDeliver(info);                  
-                  
-                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
-                  //That means the ref wasn't acked since it couldn't be found.
-                  //In order to maintain at most once semantics we must therefore not return
-                  //the message
-                  
-                  ignore = !sessionDelegate.postDeliver();  
-                  
-                  if (trace)
-                  {
-                  	log.trace("Post deliver returned " + !ignore);
-                  }
-                  
-                  if (!ignore)
-                  {
-                     m.incDeliveryCount();                                
-                  }
-               }
-                                             
-               if (!ignore)
-               {
-                  if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
-                  
-                  break;
-               }
-               
-               if (trace)
-               {
-                  log.trace("Discarding message " + m);
-               }
-               
-               // the message expired, so discard the message, adjust timeout and reenter the buffer
-               if (timeout != 0)
-               {
-                  timeout -= System.currentTimeMillis() - startTimestamp;
-                  if (timeout == 0)
-                  {
-                     // As 0 means waitForever, we make it noWait
-                     timeout = -1;
-                  }
-
-               }
-            }           
-         }
-         finally
-         {
-            receiverThread = null;            
-         }
-      } 
-      
-      if (trace) { log.trace(this + " receive() returning " + m); }
-      
-      return m;
-   } 
-         
-   public MessageListener getMessageListener()
-   {
-      return listener;      
-   }
-
-   public String toString()
-   {
-      return "ClientConsumer[" + consumerID + "]";
-   }
-   
-   public String getConsumerId()
-   {
-      return consumerID;
-   }
-
-   public void setConsumerId(String consumerId)
-   {
-       this.consumerID = consumerId;
-   }
-   
-   public void addToFrontOfBuffer(JBossMessage proxy) throws JMSException
-   {
-      synchronized (mainLock)
-      {
-         buffer.addFirst(proxy, proxy.getJMSPriority());
-         
-         consumeCount--;
-         
-         messageAdded();
-      }
-   }
-
-   public long getRedeliveryDelay()
-   {
-   	return redeliveryDelay;
-   }
-   
-   public void pause()
-   {
-      synchronized (mainLock)
-      {
-         paused = true;
-
-         sendChangeRateMessage(0f);         
-      }
-   }
-
-   public void resume()
-   {
-      synchronized (mainLock)
-      {
-         paused = false;
-
-         if (firstTime)
-         {
-            consumeCount = 0;
-
-            firstTime = false;
-         }
-         else
-         {
-            consumeCount = bufferSize / 3 - buffer.size();
-         }
-
-         sendChangeRateMessage(1f);
-      }
-   }
-
-   
-   // Package protected ----------------------------------------------------------------------------
-   
-   // Protected ------------------------------------------------------------------------------------
-            
-   // Private --------------------------------------------------------------------------------------
-
-   private void checkSendChangeRate()
-   {
-      consumeCount++;
-      
-      if (!paused && consumeCount == bufferSize)
-      {
-         consumeCount = 0;
-
-         sendChangeRateMessage(1.0f);
-      }
-   }
-
-   /*
-    * Wait for the last delivery to arrive
-    */
-   private void waitForLastDelivery(long id)
-   {
-      if (trace) { log.trace("Waiting for last delivery id " + id); }
-      
-      if (id == -1)
-      {
-      	//No need to wait - nothing to wait for      	
-      	return;
-      }
-      
-      synchronized (mainLock)
-      {          
-         waitingForLastDelivery = true;
-         try
-         {
-            long wait = WAIT_TIMEOUT;
-            while (lastDeliveryId != id && wait > 0)
-            {
-               long start = System.currentTimeMillis();  
-               try
-               {
-                  mainLock.wait(wait);
-               }
-               catch (InterruptedException e)
-               {               
-               }
-               wait -= (System.currentTimeMillis() - start);
-            }      
-            if (trace && lastDeliveryId == id)
-            {
-               log.trace("Got last delivery");
-            }
-             
-            if (lastDeliveryId != id)
-            {
-               log.warn("Timed out waiting for last delivery " + id + " got " + lastDeliveryId); 
-            }
-         }
-         finally
-         {
-            waitingForLastDelivery = false;
-         }
-      }
-   }
-   
-   private void sendChangeRateMessage(float newRate) 
-   {
-      try
-      {
-         // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
-         // job to detect it and turn it into a remoting one way invocation.
-         consumerDelegate.changeRate(newRate);
-      }
-      catch (JMSException e)
-      {
-         log.error("Failed to send changeRate message", e);
-      }
-   }
-   
-   private void waitForOnMessageToComplete()
-   {
-      // Wait for any onMessage() executions to complete
-
-      if (Thread.currentThread().equals(sessionExecutor.getThread()))
-      {
-         // the current thread already closing this ClientConsumer (this happens when the
-         // session is closed from within the MessageListener.onMessage(), for example), so no need
-         // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
-         return;
-      }
-
-      Future result = new Future();
-      
-      try
-      {
-         sessionExecutor.execute(new Closer(result));
-
-         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
-         result.getResult();
-         if (trace) { log.trace(this + " got Closer result"); }
-      }
-      catch (InterruptedException e)
-      {         
-      }
-   }
-
-   private void queueRunner(ListenerRunner runner)
-   {
-      try
-      {
-         this.sessionExecutor.execute(runner);
-      }
-      catch (InterruptedException e)
-      {         
-      }
-   }
-   
-   private void messageAdded()
-   {
-      boolean notified = false;
-      
-      if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning + 
-      		" sessionExecutor:" + sessionExecutor); }
-      
-      // If we have a thread waiting on receive() we notify it
-      if (receiverThread != null)
-      {
-         if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
-         
-         mainLock.notifyAll();
-         
-         notified = true;
-      }     
-      else if (listener != null)
-      { 
-         // We have a message listener
-         if (!listenerRunning)
-         {
-            listenerRunning = true;
-
-            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
-            
-            this.queueRunner(new ListenerRunner());
-         }     
-         
-         //TODO - Execute onMessage on same thread for even better throughput 
-      }
-      
-      // Make sure we notify any thread waiting for last delivery
-      if (waitingForLastDelivery && !notified)
-      {
-      	if (trace) { log.trace("Notifying"); }
-      	
-         mainLock.notifyAll();
-      }
-   }
-   
-   private long waitOnLock(Object lock, long waitTime) throws InterruptedException
-   {
-      long start = System.currentTimeMillis();
-      
-      // Wait for last message to arrive
-      lock.wait(waitTime);
-     
-      long waited = System.currentTimeMillis() - start;
-      
-      if (waited < waitTime)
-      {
-         waitTime = waitTime - waited;
-         
-         return waitTime;
-      }
-      else
-      {
-         return 0;
-      }     
-   }
-        
-   private JBossMessage getMessage(long timeout)
-   {
-      if (timeout == -1)
-      {
-         // receiveNoWait so don't wait
-      }
-      else
-      {         
-         try
-         {         
-            if (timeout == 0)
-            {
-               // wait for ever potentially
-               while (!closed && buffer.isEmpty())
-               {
-                  if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
-
-                  mainLock.wait();
-
-                  if (trace) { log.trace(this + " done waiting on main lock"); }
-               }
-            }
-            else
-            {
-               // wait with timeout
-               long toWait = timeout;
-             
-               while (!closed && buffer.isEmpty() && toWait > 0)
-               {
-                  if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
-
-                  toWait = waitOnLock(mainLock, toWait);
-
-                  if (trace) { log.trace(this + " done waiting on lock, buffer is " + (buffer.isEmpty() ? "" : "NOT ") + "empty"); }
-               }
-            }
-         }
-         catch (InterruptedException e)
-         {
-            if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning null"); }
-            return null;
-         } 
-      }
-
-      JBossMessage m = null;
-             
-      if (!closed && !buffer.isEmpty())
-      {
-         m = (JBossMessage)buffer.removeFirst();
-         
-         checkSendChangeRate();
-      }
-
-      return m;
-   }
-   
-   // Inner classes --------------------------------------------------------------------------------
-         
-   /*
-    * This class is used to put on the listener executor to wait for onMessage
-    * invocations to complete when closing
-    */
-   private class Closer implements Runnable
-   {
-      Future result;
-      
-      Closer(Future result)
-      {
-         this.result = result;
-      }
-      
-      public void run()
-      {
-         if (trace) { log.trace("Closer starts running"); }
-
-         result.setResult(null);
-
-         if (trace) { log.trace("Closer finished run"); }
-      }
-   }
-   
-   /*
-    * This class handles the execution of onMessage methods
-    */
-   private class ListenerRunner implements Runnable
-   {
-      public void run()
-      {         
-         JBossMessage msg = null;
-         
-         MessageListener theListener = null;
-         
-         synchronized (mainLock)
-         {
-            if (listener == null || buffer.isEmpty())
-            {
-               listenerRunning = false;
-               
-               if (trace) { log.trace("no listener or buffer is empty, returning"); }
-               
-               return;
-            }
-            
-            theListener = listener;
-            
-            // remove a message from the buffer
-
-            msg = (JBossMessage)buffer.removeFirst();                
-            
-            checkSendChangeRate();
-         }
-         
-         /*
-          * Bug here is as follows:
-          * The next runner gets scheduled BEFORE the on message is executed
-          * so if the onmessage fails on acking it will be put on hold
-          * and failover will kick in, this will clear the executor
-          * so the next queud one disappears at everything grinds to a halt
-          * 
-          * Solution - don't use a session executor - have a sesion thread instead much nicer
-          */
-                                
-         if (msg != null)
-         {
-            try
-            {
-               callOnMessage(sessionDelegate, theListener, consumerID,
-                             false, msg, ackMode, maxDeliveries, null, shouldAck);
-               
-               if (trace) { log.trace("Called callonMessage"); }
-            }
-            catch (Throwable t)
-            {
-               log.error("Failed to deliver message", t);
-            } 
-         }
-         
-         synchronized (mainLock)
-         {
-         	if (!buffer.isEmpty())
-            {
-            	//Queue up the next runner to run
-            	
-            	if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
-            	
-            	queueRunner(this);
-            	
-            	if (trace) { log.trace("Queued next onMessage to run"); }
-            }
-            else
-            {
-            	if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
-            	
-            	listenerRunning  = false;
-            }   
-         }
-                  
-         if (trace) { log.trace("Exiting run()"); }
-      }
-   }   
-}
-
-
-

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -23,27 +23,37 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.exception.MessagingShutdownException;
-import org.jboss.messaging.util.Logger;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.messaging.core.Destination;
 import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.PriorityLinkedList;
+import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
+import org.jboss.messaging.core.remoting.Client;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
-import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.Logger;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
 /**
- * The client-side Consumer delegate class.
+ * The client-side ClientConsumer delegate class.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -54,13 +64,18 @@
  *
  * $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
  */
-public class ClientConsumerImpl extends CommunicationSupport<ClientConsumerImpl> implements Consumer
+public class ClientConsumerImpl extends CommunicationSupport<ClientConsumerImpl> implements ClientConsumer
 {
    // Constants ------------------------------------------------------------------------------------
 
 	private static final long serialVersionUID = 3253922610778321868L;
 
 	private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+	
+	private static final boolean trace = log.isTraceEnabled();
+	
+	private static final int WAIT_TIMEOUT = 30000;
+      
 
    // Attributes -----------------------------------------------------------------------------------
 
@@ -68,33 +83,149 @@
    private int bufferSize;
    private int maxDeliveries;
    private long redeliveryDelay;
-
-   // State attributes -----------------------------------------------------------------------------
-
-   private String consumerID;
    private Destination destination;
    private String selector;
    private String subscriptionName;
    private boolean noLocal;
    private boolean isConnectionConsumer;
-   private ClientConsumer clientConsumer;
    private boolean storingDeliveries;
    
+   private PriorityLinkedList<JBossMessage> buffer = new PriorityLinkedListImpl<JBossMessage>(10);
+   private volatile Thread receiverThread;
+   private MessageListener listener;
+   private int ackMode;
+   private boolean closed;
+   private Object mainLock = new Object();
+   private QueuedExecutor sessionExecutor;
+   private boolean listenerRunning;
+   private long lastDeliveryId = -1;
+   private boolean waitingForLastDelivery;
+   private boolean shouldAck;
+   private boolean paused;      
+   private int consumeCount;
+   private boolean firstTime = true;
+
+   
    // Static ---------------------------------------------------------------------------------------
+   
+   private static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
+         ClientSession del,
+         int maxDeliveries, boolean shouldCancel)
+   {
+      Message msg = jbm.getCoreMessage();
 
-   // Constructors ---------------------------------------------------------------------------------
-   public ClientConsumerImpl(String objectID, int bufferSize, int maxDeliveries, long redeliveryDelay)
-   {
-      super(objectID);
-      this.bufferSize = bufferSize;
-      this.maxDeliveries = maxDeliveries;
-      this.redeliveryDelay = redeliveryDelay;
+      boolean expired = msg.isExpired();
+
+      boolean reachedMaxDeliveries = jbm.getDeliveryCount() == maxDeliveries;
+
+      if (expired || reachedMaxDeliveries)
+      {
+         if (trace)
+         {
+            if (expired)
+            {
+               log.trace(msg + " has expired, cancelling to server");
+            }
+            else
+            {
+               log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
+            }
+         }
+
+         if (shouldCancel)
+         {           
+            final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
+                  expired, reachedMaxDeliveries);          
+            try
+            {
+               del.cancelDelivery(cancel);
+            }
+            catch (JMSException e)
+            {
+               log.error("Failed to cancel delivery", e);
+            }   
+         }
+
+         return true;
+      }
+      else
+      {
+         return false;
+      }
    }
 
-   public ClientConsumerImpl(ClientSession session, String objectID, int bufferSize, int maxDeliveries, long redeliveryDelay,
-         Destination dest,
-         String selector, boolean noLocal, String subscriptionName, String consumerID,
-         boolean isCC)
+   // This is static so it can be called by the asf layer too
+   public static void callOnMessage(ClientSession sess,
+         MessageListener listener,
+         String consumerID,
+         boolean isConnectionConsumer,
+         JBossMessage m,
+         int ackMode,
+         int maxDeliveries,
+         ClientSession connectionConsumerSession,
+         boolean shouldAck)
+   throws JMSException
+   {      
+      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+      {
+         // Message has been cancelled
+         return;
+      }
+
+      DeliveryInfo deliveryInfo =
+         new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
+
+      m.incDeliveryCount();
+
+      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+      // add anything to the tx for this session.
+      if (!isConnectionConsumer)
+      {
+         // We need to call preDeliver, deliver the message then call postDeliver - this is because
+         // it is legal to call session.recover(), or session.rollback() from within the onMessage()
+         // method in which case the last message needs to be delivered so it needs to know about it
+         sess.preDeliver(deliveryInfo);
+      } 
+
+      try
+      {
+         if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+         listener.onMessage(m);
+
+         if (trace) { log.trace("listener's onMessage() finished"); }
+      }
+      catch (RuntimeException e)
+      {
+         log.error("RuntimeException was thrown from onMessage, " + m.getJMSMessageID() + " will be redelivered", e);
+
+         // See JMS 1.1 spec 4.5.2
+
+         if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {              
+            sess.recover();
+         }
+      }   
+
+      // If this is the callback-handler for a connection consumer we don't want to acknowledge
+      //or add anything to the tx for this session
+      if (!isConnectionConsumer)
+      {
+         if (trace) { log.trace("Calling postDeliver"); }
+
+         sess.postDeliver();
+
+         if (trace) { log.trace("Called postDeliver"); }
+      }   
+   }
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ClientConsumerImpl(ClientSession session, String objectID, int bufferSize,
+                             int maxDeliveries, long redeliveryDelay,
+                             Destination dest,
+                             String selector, boolean noLocal, String subscriptionName,
+                             boolean isCC, QueuedExecutor sessionExecutor)
    {
       super(objectID);
       this.session = session;
@@ -105,16 +236,11 @@
       this.selector = selector;
       this.noLocal = noLocal;
       this.subscriptionName = subscriptionName;
-      this.consumerID = consumerID;
       this.isConnectionConsumer = isCC;
+      this.sessionExecutor = sessionExecutor;
+      this.shouldAck = !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);  
    }
 
-   public ClientConsumerImpl()
-   {
-   }
-
-   // DelegateSupport overrides --------------------------------------------------------------------
-
    @Override
    protected byte getVersion()
    {
@@ -145,17 +271,12 @@
 
          // First we call close on the ClientConsumer which waits for onMessage invocations
          // to complete and the last delivery to arrive
-         getClientConsumer().close(lastDeliveryId);
+         close(lastDeliveryId);
 
-         session.removeCallbackHandler(getClientConsumer());
+         PacketDispatcher.client.unregister(id);
 
-         CallbackManager cm = session.getConnection().getRemotingConnection().getCallbackManager();
-         cm.unregisterHandler(getConsumerID());
-
-         PacketDispatcher.client.unregister(getConsumerID());
-
          //And then we cancel any messages still in the message callback handler buffer
-         getClientConsumer().cancelBuffer();
+         cancelBuffer();
 
          return lastDeliveryId;
 
@@ -167,11 +288,10 @@
          if (proxiedException instanceof MessagingShutdownException /* ||
                  (connectionState.getFailoverCommandCenter() == null ) */ )
 
-
          {
-            if (!getClientConsumer().isClosed())
+            if (!this.isClosed())
             {
-               getClientConsumer().close(-1);
+               close(-1);
             }
          }
          JMSException ex = new JMSException(proxiedException.toString());
@@ -187,44 +307,25 @@
       return response.getID();
    }
 
-   // ConsumerDelegate implementation --------------------------------------------------------------
-
+   public boolean isClosed()
+   {
+      return closed;
+   }
+   
    public void changeRate(float newRate) throws JMSException
    {
       sendOneWay(new ChangeRateMessage(newRate));
    }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
+   
    public MessageListener getMessageListener()
    {
-      return getClientConsumer().getMessageListener();
+      return this.listener;
    }
 
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public Message receive(long timeout) throws JMSException
-   {
-      return getClientConsumer().receive(timeout);
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
-   public void setMessageListener(MessageListener listener) throws JMSException
-   {
-      getClientConsumer().setMessageListener(listener);
-   }
-
-   /**
-    * This invocation should either be handled by the client-side interceptor chain or by the
-    * server-side endpoint.
-    */
    public boolean getNoLocal()
    {
       return this.noLocal;
@@ -294,55 +395,690 @@
    	return redeliveryDelay;
    }
    
-   public String getConsumerID()
+   public boolean isConnectionConsumer()
    {
-      return consumerID;
+      return isConnectionConsumer;
    }
+ 
+   public String getSubscriptionName()
+   {
+      return subscriptionName;
+   }
 
-   public boolean isConnectionConsumer()
+   public void setSubscriptionName(String subscriptionName)
    {
-      return isConnectionConsumer;
+      this.subscriptionName = subscriptionName;
    }
 
-   public void setClientConsumer(ClientConsumer handler)
+   public boolean isStoringDeliveries()
    {
-      this.clientConsumer = handler;
+      return storingDeliveries;
    }
+   
+   // Protected ------------------------------------------------------------------------------------
 
-   public ClientConsumer getClientConsumer()
+   // Package Private ------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner Classes --------------------------------------------------------------------------------
+         
+   public void handleMessage(final JBossMessage message) throws Exception
    {
-      return clientConsumer;
+      synchronized (mainLock)
+      {
+         if (closed)
+         {
+            // Sanity - this should never happen - we should always wait for all deliveries to arrive
+            // when closing
+            throw new IllegalStateException(this + " is closed, so ignoring message");
+         }
+
+         message.setSessionDelegate(session, isConnectionConsumer);
+
+         message.doBeforeReceive();
+
+         //Add it to the buffer
+         buffer.addLast(message, message.getJMSPriority());
+
+         lastDeliveryId = message.getDeliveryId();
+
+         if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+         messageAdded();
+      }
    }
 
-   public String getSubscriptionName()
+   public void setMessageListener(MessageListener listener) throws JMSException
+   {     
+      synchronized (mainLock)
+      {
+         if (receiverThread != null)
+         {
+            // Should never happen
+            throw new IllegalStateException("ClientConsumer is currently in receive(..). " +
+               "Cannot set MessageListener");
+         }
+         
+         this.listener = listener;
+                            
+         if (listener != null && !buffer.isEmpty())
+         {  
+            listenerRunning = true;
+            
+            this.queueRunner(new ListenerRunner());
+         }        
+      }   
+   }
+   
+   public void cancelBuffer() throws JMSException
    {
-      return subscriptionName;
+      if (trace) { log.trace("Cancelling buffer: " + buffer.size()); }
+      
+      synchronized (mainLock)
+      {      
+         // Now we cancel anything left in the buffer. The reason we do this now is that otherwise
+         // the deliveries wouldn't get cancelled until session close (since we don't cancel
+         // consumer's deliveries until then), which is too late - since we need to preserve the
+         // order of messages delivered in a session.
+         
+         if (shouldAck && !buffer.isEmpty())
+         {                        
+            // Now we cancel any deliveries that might be waiting in our buffer. This is because
+            // otherwise the messages wouldn't get cancelled until the corresponding session died.
+            // So if another consumer in another session tried to consume from the channel before
+            // that session died it wouldn't receive those messages.
+            // We can't just cancel all the messages in the SCE since some of those messages might
+            // have actually been delivered (unlike these) and we may want to acknowledge them
+            // later, after this consumer has been closed
+   
+            List cancels = new ArrayList();
+   
+            for(Iterator i = buffer.iterator(); i.hasNext();)
+            {
+               JBossMessage mp = (JBossMessage)i.next();
+               
+               CancelImpl cancel =
+                  new CancelImpl(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
+               
+               cancels.add(cancel);
+            }
+                  
+            if (trace) { log.trace("Calling cancelDeliveries"); }
+            session.cancelDeliveries(cancels);
+            if (trace) { log.trace("Done call"); }
+            
+            buffer.clear();
+         }    
+      }
    }
+   
+   public void close(long lastDeliveryId) throws JMSException
+   {     
+      log.trace(this + " close");
+            
+      //Wait for the last delivery to arrive
+      waitForLastDelivery(lastDeliveryId);
+      
+      //Important! We set the listener to null so the next ListenerRunner won't run
+      if (listener != null)
+      {
+         setMessageListener(null);
+      }
+      
+      //Now we wait for any current listener runners to run.
+      waitForOnMessageToComplete();   
+      
+      synchronized (mainLock)
+      {         
+         if (closed)
+         {
+            return;
+         }
+         
+         closed = true;   
+         
+         if (receiverThread != null)
+         {            
+            // Wake up any receive() thread that might be waiting
+            mainLock.notify();
+         }   
+         
+         this.listener = null;
+      }
+                           
+      if (trace) { log.trace(this + " closed"); }
+   }
+     
+   /**
+    * Method used by the client thread to get a Message, if available.
+    *
+    * @param timeout - the timeout value in milliseconds. A zero timeout never expires, and the
+    *        call blocks indefinitely. A -1 timeout means receiveNoWait(): return the next message
+    *        or null if one is not immediately available. Returns null if the consumer is
+    *        concurrently closed.
+    */
+   public JBossMessage receive(long timeout) throws JMSException
+   {                
+      JBossMessage m = null;      
+      
+      synchronized (mainLock)
+      {        
+         if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
+         
+         if (closed)
+         {
+            // If consumer is closed or closing calling receive returns null
+            if (trace) { log.trace(this + " closed, returning null"); }
+            return null;
+         }
+         
+         if (listener != null)
+         {
+            throw new JMSException("The consumer has a MessageListener set, " +
+               "cannot call receive(..)");
+         }
+                       
+         receiverThread = Thread.currentThread();
+               
+         long startTimestamp = System.currentTimeMillis();
+                  
+         try
+         {
+            while(true)
+            {                             
+               if (timeout == 0)
+               {
+                  if (trace) { log.trace(this + ": receive, no timeout"); }
+                  
+                  m = getMessage(0);                     
+                  
+                  if (m == null)
+                  {
+                     return null;
+                  }
+               }
+               else if (timeout == -1)
+               {
+                  //ReceiveNoWait
+                  if (trace) { log.trace(this + ": receive, noWait"); }
+                  
+                  m = getMessage(-1);                     
+                  
+                  if (m == null)
+                  {
+                     if (trace) { log.trace(this + ": no message available"); }
+                     return null;
+                  }
+               }
+               else
+               {
+                  if (trace) { log.trace(this + ": receive, timeout " + timeout + " ms, blocking poll on queue"); }
+                  
+                  m = getMessage(timeout);
+                                    
+                  if (m == null)
+                  {
+                     // timeout expired
+                     if (trace) { log.trace(this + ": " + timeout + " ms timeout expired"); }
+                     
+                     return null;
+                  }
+               }
+                              
+               if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
+                       
+               boolean ignore =
+                  checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries, shouldAck);
+               
+               if (!isConnectionConsumer && !ignore)
+               {
+                  DeliveryInfo info = new DeliveryInfo(m, id, null, shouldAck);
+                                                    
+                  session.preDeliver(info);                  
+                  
+                  //If post deliver didn't succeed and acknowledgement mode is auto_ack
+                  //That means the ref wasn't acked since it couldn't be found.
+                  //In order to maintain at most once semantics we must therefore not return
+                  //the message
+                  
+                  ignore = !session.postDeliver();  
+                  
+                  if (trace)
+                  {
+                     log.trace("Post deliver returned " + !ignore);
+                  }
+                  
+                  if (!ignore)
+                  {
+                     m.incDeliveryCount();                                
+                  }
+               }
+                                             
+               if (!ignore)
+               {
+                  if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
+                  
+                  break;
+               }
+               
+               if (trace)
+               {
+                  log.trace("Discarding message " + m);
+               }
+               
+               // the message expired, so discard the message, adjust timeout and reenter the buffer
+               if (timeout != 0)
+               {
+                  timeout -= System.currentTimeMillis() - startTimestamp;
+                  if (timeout == 0)
+                  {
+                     // As 0 means waitForever, we make it noWait
+                     timeout = -1;
+                  }
 
-   public void setSubscriptionName(String subscriptionName)
+               }
+            }           
+         }
+         finally
+         {
+            receiverThread = null;            
+         }
+      } 
+      
+      if (trace) { log.trace(this + " receive() returning " + m); }
+      
+      return m;
+   } 
+         
+   public void addToFrontOfBuffer(JBossMessage proxy) throws JMSException
    {
-      this.subscriptionName = subscriptionName;
+      synchronized (mainLock)
+      {
+         buffer.addFirst(proxy, proxy.getJMSPriority());
+         
+         consumeCount--;
+         
+         messageAdded();
+      }
    }
 
-   public boolean isStoringDeliveries()
+   public void pause()
    {
-      return storingDeliveries;
+      synchronized (mainLock)
+      {
+         paused = true;
+
+         sendChangeRateMessage(0f);         
+      }
    }
+
+   public void resume()
+   {
+      synchronized (mainLock)
+      {
+         paused = false;
+
+         if (firstTime)
+         {
+            consumeCount = 0;
+
+            firstTime = false;
+         }
+         else
+         {
+            consumeCount = bufferSize / 3 - buffer.size();
+         }
+
+         sendChangeRateMessage(1f);
+      }
+   }
    
    public boolean isShouldAck()
    {
-      //If e are a non durable subscriber to a topic then there is no need
-      //to send acks to the server - we wouldn't have stored them on the server side anyway
+      return this.shouldAck;
+   }
+
+   
+   // Package protected ----------------------------------------------------------------------------
+   
+   // Protected ------------------------------------------------------------------------------------
+            
+   // Private --------------------------------------------------------------------------------------
+
+   private void checkSendChangeRate()
+   {
+      consumeCount++;
       
-      return !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);      
+      if (!paused && consumeCount == bufferSize)
+      {
+         consumeCount = 0;
+
+         sendChangeRateMessage(1.0f);
+      }
    }
+
+   /*
+    * Wait for the last delivery to arrive
+    */
+   private void waitForLastDelivery(long id)
+   {
+      if (trace) { log.trace("Waiting for last delivery id " + id); }
+      
+      if (id == -1)
+      {
+         //No need to wait - nothing to wait for         
+         return;
+      }
+      
+      synchronized (mainLock)
+      {          
+         waitingForLastDelivery = true;
+         try
+         {
+            long wait = WAIT_TIMEOUT;
+            while (lastDeliveryId != id && wait > 0)
+            {
+               long start = System.currentTimeMillis();  
+               try
+               {
+                  mainLock.wait(wait);
+               }
+               catch (InterruptedException e)
+               {               
+               }
+               wait -= (System.currentTimeMillis() - start);
+            }      
+            if (trace && lastDeliveryId == id)
+            {
+               log.trace("Got last delivery");
+            }
+             
+            if (lastDeliveryId != id)
+            {
+               log.warn("Timed out waiting for last delivery " + id + " got " + lastDeliveryId); 
+            }
+         }
+         finally
+         {
+            waitingForLastDelivery = false;
+         }
+      }
+   }
+   
+   private void sendChangeRateMessage(float newRate) 
+   {
+      try
+      {
+         // this invocation will be sent asynchronously to the server; it is DelegateSupport.invoke()'s
+         // job to detect it and turn it into a remoting one-way invocation.
+         changeRate(newRate);
+      }
+      catch (JMSException e)
+      {
+         log.error("Failed to send changeRate message", e);
+      }
+   }
+   
+   private void waitForOnMessageToComplete()
+   {
+      // Wait for any onMessage() executions to complete
+
+      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      {
+         // the current thread is already closing this ClientConsumer (this happens when the
+         // session is closed from within the MessageListener.onMessage(), for example), so no need
+         // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+         return;
+      }
+
+      Future result = new Future();
+      
+      try
+      {
+         sessionExecutor.execute(new Closer(result));
+
+         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+         result.getResult();
+         if (trace) { log.trace(this + " got Closer result"); }
+      }
+      catch (InterruptedException e)
+      {         
+      }
+   }
+
+   private void queueRunner(ListenerRunner runner)
+   {
+      try
+      {
+         this.sessionExecutor.execute(runner);
+      }
+      catch (InterruptedException e)
+      {         
+      }
+   }
+   
+   private void messageAdded()
+   {
+      boolean notified = false;
+      
+      if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning + 
+            " sessionExecutor:" + sessionExecutor); }
+      
+      // If we have a thread waiting on receive() we notify it
+      if (receiverThread != null)
+      {
+         if (trace) { log.trace(this + " notifying receiver/waiter thread"); }   
+         
+         mainLock.notifyAll();
+         
+         notified = true;
+      }     
+      else if (listener != null)
+      { 
+         // We have a message listener
+         if (!listenerRunning)
+         {
+            listenerRunning = true;
+
+            if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+            
+            this.queueRunner(new ListenerRunner());
+         }     
+         
+         //TODO - Execute onMessage on same thread for even better throughput 
+      }
+      
+      // Make sure we notify any thread waiting for last delivery
+      if (waitingForLastDelivery && !notified)
+      {
+         if (trace) { log.trace("Notifying"); }
+         
+         mainLock.notifyAll();
+      }
+   }
+   
+   private long waitOnLock(Object lock, long waitTime) throws InterruptedException
+   {
+      long start = System.currentTimeMillis();
+      
+      // Wait for last message to arrive
+      lock.wait(waitTime);
      
-   // Protected ------------------------------------------------------------------------------------
+      long waited = System.currentTimeMillis() - start;
+      
+      if (waited < waitTime)
+      {
+         waitTime = waitTime - waited;
+         
+         return waitTime;
+      }
+      else
+      {
+         return 0;
+      }     
+   }
+        
+   private JBossMessage getMessage(long timeout)
+   {
+      if (timeout == -1)
+      {
+         // receiveNoWait so don't wait
+      }
+      else
+      {         
+         try
+         {         
+            if (timeout == 0)
+            {
+               // wait for ever potentially
+               while (!closed && buffer.isEmpty())
+               {
+                  if (trace) { log.trace(this + " waiting on main lock, no timeout"); }
 
-   // Package Private ------------------------------------------------------------------------------
+                  mainLock.wait();
 
-   // Private --------------------------------------------------------------------------------------
+                  if (trace) { log.trace(this + " done waiting on main lock"); }
+               }
+            }
+            else
+            {
+               // wait with timeout
+               long toWait = timeout;
+             
+               while (!closed && buffer.isEmpty() && toWait > 0)
+               {
+                  if (trace) { log.trace(this + " waiting on main lock, timeout " + toWait + " ms"); }
 
-   // Inner Classes --------------------------------------------------------------------------------
+                  toWait = waitOnLock(mainLock, toWait);
 
+                  if (trace) { log.trace(this + " done waiting on lock, buffer is " + (buffer.isEmpty() ? "" : "NOT ") + "empty"); }
+               }
+            }
+         }
+         catch (InterruptedException e)
+         {
+            if (trace) { log.trace("InterruptedException, " + this + ".getMessage() returning null"); }
+            return null;
+         } 
+      }
+
+      JBossMessage m = null;
+             
+      if (!closed && !buffer.isEmpty())
+      {
+         m = (JBossMessage)buffer.removeFirst();
+         
+         checkSendChangeRate();
+      }
+
+      return m;
+   }
+   
+   // Inner classes --------------------------------------------------------------------------------
+         
+   /*
+    * This class is used to put on the listener executor to wait for onMessage
+    * invocations to complete when closing
+    */
+   private class Closer implements Runnable
+   {
+      Future result;
+      
+      Closer(Future result)
+      {
+         this.result = result;
+      }
+      
+      public void run()
+      {
+         if (trace) { log.trace("Closer starts running"); }
+
+         result.setResult(null);
+
+         if (trace) { log.trace("Closer finished run"); }
+      }
+   }
+   
+   /*
+    * This class handles the execution of onMessage methods
+    */
+   private class ListenerRunner implements Runnable
+   {
+      public void run()
+      {         
+         JBossMessage msg = null;
+         
+         MessageListener theListener = null;
+         
+         synchronized (mainLock)
+         {
+            if (listener == null || buffer.isEmpty())
+            {
+               listenerRunning = false;
+               
+               if (trace) { log.trace("no listener or buffer is empty, returning"); }
+               
+               return;
+            }
+            
+            theListener = listener;
+            
+            // remove a message from the buffer
+
+            msg = (JBossMessage)buffer.removeFirst();                
+            
+            checkSendChangeRate();
+         }
+         
+         /*
+          * Bug here is as follows:
+          * The next runner gets scheduled BEFORE the on message is executed
+          * so if the onmessage fails on acking it will be put on hold
+          * and failover will kick in, this will clear the executor
+          * so the next queued one disappears and everything grinds to a halt
+          * 
+          * Solution - don't use a session executor - have a session thread instead, which is much nicer
+          */
+                                
+         if (msg != null)
+         {
+            try
+            {
+               callOnMessage(session, theListener, id,
+                             false, msg, ackMode, maxDeliveries, null, shouldAck);
+               
+               if (trace) { log.trace("Called callonMessage"); }
+            }
+            catch (Throwable t)
+            {
+               log.error("Failed to deliver message", t);
+            } 
+         }
+         
+         synchronized (mainLock)
+         {
+            if (!buffer.isEmpty())
+            {
+               //Queue up the next runner to run
+               
+               if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+               
+               queueRunner(this);
+               
+               if (trace) { log.trace("Queued next onMessage to run"); }
+            }
+            else
+            {
+               if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+               
+               listenerRunning  = false;
+            }   
+         }
+                  
+         if (trace) { log.trace("Exiting run()"); }
+      }
+   }   
+   
+
 }

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,5 +1,6 @@
 package org.jboss.jms.client.impl;
 
+import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
@@ -15,23 +16,18 @@
  */
 public class ClientConsumerPacketHandler implements PacketHandler
 {
-   /**
-    * 
-    */
-   private final ClientConsumer messageHandler;
-   /**
-    * 
-    */
+   private final ClientConsumer clientConsumer;
+
    private final String consumerID;
 
    /**
     * @param messageHandler
     * @param consumerID
     */
-   public ClientConsumerPacketHandler(ClientConsumer messageHandler,
+   public ClientConsumerPacketHandler(ClientConsumer clientConsumer,
          String consumerID)
    {
-      this.messageHandler = messageHandler;
+      this.clientConsumer = clientConsumer;
       this.consumerID = consumerID;
    }
 
@@ -53,7 +49,7 @@
             
             msg.doBeforeReceive();
             
-            messageHandler.handleMessage(msg);
+            clientConsumer.handleMessage(msg);
          }
       } catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,7 +29,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.IllegalStateException;
@@ -43,10 +42,9 @@
 import org.jboss.jms.client.SelectorTranslator;
 import org.jboss.jms.client.api.ClientBrowser;
 import org.jboss.jms.client.api.ClientConnection;
+import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.client.api.Consumer;
-import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
@@ -60,7 +58,6 @@
 import org.jboss.jms.tx.MessagingXAResource;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.messaging.core.Destination;
-import org.jboss.messaging.core.DestinationType;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.remoting.Client;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
@@ -83,9 +80,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
 import org.jboss.messaging.util.ClearableQueuedExecutor;
-import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessageQueueNameHelper;
 import org.jboss.messaging.util.ProxyFactory;
 
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -120,13 +115,12 @@
    
    private ClientConnection connection;
    
-   // Attributes that used to live on SessionState -------------------------------------------------
+   protected Map<String, Closeable> children = new ConcurrentHashMap<String, Closeable>();
    
-   protected Set<Closeable> children = new ConcurrentHashSet<Closeable>();
-
-   
    private int acknowledgeMode;
+   
    private boolean transacted;
+   
    private boolean xa;
 
    private MessagingXAResource xaResource;
@@ -141,7 +135,7 @@
    private List<Ack> clientAckList;
 
    private DeliveryInfo autoAckInfo;
-   private Map callbackHandlers = new ConcurrentHashMap();
+   //private Map callbackHandlers = new ConcurrentHashMap();
    
    private LinkedList asfMessages = new LinkedList();
    
@@ -159,11 +153,6 @@
    
    // Constructors ---------------------------------------------------------------------------------
    
-
-   // Static ---------------------------------------------------------------------------------------
-
-   // Constructors ---------------------------------------------------------------------------------
-
    public ClientSessionImpl(ClientConnection connection, String objectID, int dupsOKBatchSize)
    {
       super(objectID);
@@ -204,10 +193,6 @@
    {
    }
 
-   // DelegateSupport overrides --------------------------------------------------------------------
-
-   // Closeable implementation ---------------------------------------------------------------------
-
    public void close() throws JMSException
    {
       sendBlocking(new CloseMessage());
@@ -223,7 +208,6 @@
       // We must explicitly shutdown the executor
 
       getExecutor().shutdownNow();
-
    }
 
    private long invokeClosing(long sequence) throws JMSException
@@ -236,17 +220,19 @@
    
    private void closeChildren() throws JMSException
    {
-      for (Closeable child: children)
+      for (Closeable child: children.values())
       {
          child.closing(-1);
          child.close();
       }
+      
+      children.clear();
    }
 
    public long closing(long sequence) throws JMSException
    {
       if (trace) { log.trace("handleClosing()"); }
-
+      
       closeChildren();
       
       //Sanity check
@@ -426,7 +412,7 @@
       CreateBrowserResponse response = (CreateBrowserResponse) sendBlocking(request);
       ClientBrowserImpl delegate = new ClientBrowserImpl(this, response.getBrowserID(), queue, messageSelector);
       ClientBrowser proxy = (ClientBrowser)ProxyFactory.proxy(delegate, ClientBrowser.class);
-      children.add(proxy);
+      children.put(delegate.getID(), proxy);
       return proxy;
    }
 
@@ -441,7 +427,7 @@
    }
 
 
-   public Consumer createConsumerDelegate(Destination destination, String selector,
+   public ClientConsumer createConsumerDelegate(Destination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
                                                   boolean isCC) throws JMSException
    {
@@ -451,46 +437,19 @@
       
       CreateConsumerResponse response = (CreateConsumerResponse) sendBlocking(request);
 
-      ClientConsumerImpl consumerDelegate = new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(), response.getMaxDeliveries(), response.getRedeliveryDelay(),
+      ClientConsumerImpl consumerDelegate =
+         new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),
+               response.getMaxDeliveries(), response.getRedeliveryDelay(),
             destination,
-            selector, noLocal, subscriptionName, response.getConsumerID(),isCC);      
+            selector, noLocal, subscriptionName,
+            isCC, this.getExecutor());
 
-      Consumer proxy = (Consumer)ProxyFactory.proxy(consumerDelegate, Consumer.class);
+      ClientConsumer proxy = (ClientConsumer)ProxyFactory.proxy(consumerDelegate, ClientConsumer.class);
       
-      children.add(proxy);
+      children.put(consumerDelegate.getID(), proxy);
 
-      //We need the queue name for recovering any deliveries after failover
-      String queueName = null;
-      if (subscriptionName != null)
-      {
-         // I have to use the clientID from connectionDelegate instead of connectionState...
-         // this is because when a pre configured CF is used we need to get the clientID from
-         // server side.
-         // This was a condition verified by the TCK and it was fixed as part of
-         // http://jira.jboss.com/jira/browse/JBMESSAGING-939
-         queueName = MessageQueueNameHelper.
-            createSubscriptionName(this.getID(),subscriptionName);
-      }
-      else if (destination.getType() == DestinationType.QUEUE)
-      {
-         queueName = destination.getName();
-      }
+      PacketDispatcher.client.register(new ClientConsumerPacketHandler(consumerDelegate, consumerDelegate.getID()));
 
-      final ClientConsumer messageHandler =
-         new ClientConsumer(isCC, this.getAcknowledgeMode(),
-                            this, consumerDelegate, consumerDelegate.getID(), queueName,
-                            consumerDelegate.getBufferSize(), this.getExecutor(), consumerDelegate.getMaxDeliveries(), consumerDelegate.isShouldAck(),
-                            consumerDelegate.getRedeliveryDelay());
-
-      this.addCallbackHandler(messageHandler);
-
-      PacketDispatcher.client.register(new ClientConsumerPacketHandler(messageHandler, consumerDelegate.getID()));
-
-      CallbackManager cm = connection.getRemotingConnection().getCallbackManager();
-      cm.registerHandler(consumerDelegate.getID(), messageHandler);
-
-      consumerDelegate.setClientConsumer(messageHandler);
-
       //Now we have finished creating the client consumer, we can tell the SCD
       //we are ready
       consumerDelegate.changeRate(1);
@@ -550,7 +509,7 @@
 
       ClientProducerImpl producerDelegate = new ClientProducerImpl(connection, this, destination );
       ClientProducer proxy = (ClientProducer) ProxyFactory.proxy(producerDelegate, ClientProducer.class);
-      children.add(proxy);
+      children.put(producerDelegate.getID(), proxy);
       return proxy;
    }
 
@@ -837,7 +796,7 @@
          DeliveryInfo info = (DeliveryInfo)toRedeliver.get(i);
          JBossMessage msg = info.getMessage();
 
-         ClientConsumer handler = getCallbackHandler(info.getConsumerId());
+         ClientConsumer handler = (ClientConsumer)children.get(info.getConsumerId());
 
          if (handler == null)
          {
@@ -861,23 +820,6 @@
 
    }
    
-   public ClientConsumer getCallbackHandler(String consumerID)
-   {
-      return (ClientConsumer)callbackHandlers.get(consumerID);
-   }
-
-   public void addCallbackHandler(ClientConsumer handler)
-   {
-      callbackHandlers.put(handler.getConsumerId(), handler);
-   }
-
-   public void removeCallbackHandler(ClientConsumer handler)
-   {
-      callbackHandlers.remove(handler.getConsumerId());
-   }
-
-   
-
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
@@ -925,7 +867,7 @@
 
          if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
 
-         ClientConsumer.callOnMessage(this, getDistinguishedListener(), holder.consumerID,
+         ClientConsumerImpl.callOnMessage(this, getDistinguishedListener(), holder.consumerID,
                                       false,
                                       holder.msg, ackMode, holder.maxDeliveries,
                                       holder.connectionConsumerDelegate, holder.shouldAck);
@@ -1294,16 +1236,16 @@
       this.autoAckInfo = autoAckInfo;
    }
 
-   public Map getCallbackHandlers()
-   {
-      return callbackHandlers;
-   }
+//   public Map getCallbackHandlers()
+//   {
+//      return callbackHandlers;
+//   }
+//
+//   public void setCallbackHandlers(Map callbackHandlers)
+//   {
+//      this.callbackHandlers = callbackHandlers;
+//   }
 
-   public void setCallbackHandlers(Map callbackHandlers)
-   {
-      this.callbackHandlers = callbackHandlers;
-   }
-
    public LinkedList getAsfMessages()
    {
       return asfMessages;

Modified: trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/impl/CommunicationSupport.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -58,9 +58,7 @@
    {
       this("NO_ID_SET");
    }
-   
-   
-   
+         
    // Streamable implementation --------------------------------------------------------------------
 
    public void read(DataInputStream in) throws Exception

Deleted: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -1,95 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.client.remoting;
-
-import java.util.Map;
-
-import org.jboss.jms.client.impl.ClientConsumer;
-import org.jboss.messaging.util.Logger;
-
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-/**
- * The CallbackManager is an InvocationHandler used for handling callbacks to message consumers.
- * The callback is received and dispatched off to the relevant consumer.
- * 
- * There is one instance of this class per remoting connection - which is to a unique server -
- * therefore there is no need to add the server id to the key when doing look ups.
- * 
- * TODO this class should be merged with use of PacketDispatcher.client instance and 
- * ClientConsumerPacketHandler should wrap ClientConsumer class
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class CallbackManager
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(CallbackManager.class);
-
-   public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
-
-   // Static ---------------------------------------------------------------------------------------
-
-   private static boolean trace = log.isTraceEnabled();
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   protected Map<String, ClientConsumer> callbackHandlers;
-
-   // Constructors ---------------------------------------------------------------------------------
-
-   public CallbackManager()
-   {
-      callbackHandlers = new ConcurrentReaderHashMap();
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   public void registerHandler(String consumerID, ClientConsumer handler)
-   {
-      callbackHandlers.put(consumerID, handler);
-   }
-
-   public ClientConsumer unregisterHandler(String consumerID)
-   { 
-      return callbackHandlers.remove(consumerID);
-   }
-
-   public String toString()
-   {
-      return "CallbackManager[" + Integer.toHexString(hashCode()) + "]";
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-
-   // Inner classes --------------------------------------------------------------------------------
-
-}

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -56,7 +56,7 @@
 
    private Client client;
 
-   private CallbackManager callbackManager;
+   //private CallbackManager callbackManager;
 
    // When a failover is performed, this flag is set to true
    protected boolean failed = false;
@@ -80,7 +80,7 @@
    {
       if (log.isTraceEnabled()) { log.trace(this + " created client"); }
 
-      callbackManager = new CallbackManager();
+      //callbackManager = new CallbackManager();
 
       NIOConnector connector = REGISTRY.getConnector(serverLocator);
       client = new ClientImpl(connector, serverLocator);
@@ -118,12 +118,12 @@
       return client;
    }
 
-   public CallbackManager getCallbackManager()
-   {
-      return callbackManager;
-   }
+//   public CallbackManager getCallbackManager()
+//   {
+//      return callbackManager;
+//   }
+//
 
-
     public synchronized boolean isFailed()
    {
       return failed;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -55,7 +55,7 @@
 import org.jboss.messaging.util.Logger;
 
 /**
- * Concrete implementation of a Consumer. 
+ * Concrete implementation of a ClientConsumer. 
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -51,12 +51,11 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.api.ClientBrowser;
-import org.jboss.jms.client.api.Consumer;
 import org.jboss.jms.client.impl.Ack;
+import org.jboss.jms.client.impl.AckImpl;
 import org.jboss.jms.client.impl.Cancel;
 import org.jboss.jms.client.impl.ClientBrowserImpl;
 import org.jboss.jms.client.impl.ClientConsumerImpl;
-import org.jboss.jms.client.impl.AckImpl;
 import org.jboss.jms.client.impl.DeliveryInfo;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
@@ -65,7 +64,6 @@
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.container.SecurityAspect;
 import org.jboss.jms.server.security.CheckType;
-import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.core.Binding;
 import org.jboss.messaging.core.Condition;
 import org.jboss.messaging.core.Destination;
@@ -106,6 +104,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
 import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessageQueueNameHelper;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
@@ -234,11 +233,11 @@
       }
    }
 
-   public Consumer createConsumerDelegate(Destination destination,
-                                                  String filterString,
-                                                  boolean noLocal,
-                                                  String subscriptionName,
-                                                  boolean isCC) throws JMSException
+   public CreateConsumerResponse createConsumerDelegate(Destination destination,
+                                                        String filterString,
+                                                        boolean noLocal,
+                                                        String subscriptionName,
+                                                        boolean isCC) throws JMSException
    {
       
       checkSecurityCreateConsumerDelegate(destination, subscriptionName);
@@ -1199,10 +1198,10 @@
       return false;      
    }
       
-   private Consumer createConsumerDelegateInternal(Destination destination,
-                                                  String filterString,
-                                                  boolean noLocal,
-                                                  String subscriptionName)
+   private CreateConsumerResponse createConsumerDelegateInternal(Destination destination,
+                                                                 String filterString,
+                                                                 boolean noLocal,
+                                                                 String subscriptionName)
       throws Exception
    {
       if (closed)
@@ -1381,7 +1380,7 @@
       }
       else
       {
-         // Consumer on a jms queue
+         // ClientConsumer on a jms queue
          
       	List<Binding> bindings = postOffice.getBindingsForQueueName(destination.getName());
          
@@ -1425,10 +1424,10 @@
 //      	rep.put(queue.getName(), DUR_SUB_STATE_CONSUMERS);
 //      }
       connectionEndpoint.getMessagingServer().getMinaService().getDispatcher().register(ep.newHandler());
+        
+      CreateConsumerResponse response = new CreateConsumerResponse(consumerID, prefetchSize,
+                                                                   maxDeliveryAttemptsToUse, redeliveryDelayToUse );
       
-      ClientConsumerImpl stub =
-         new ClientConsumerImpl(consumerID, prefetchSize, maxDeliveryAttemptsToUse, redeliveryDelayToUse);
-      
       synchronized (consumers)
       {
          consumers.put(consumerID, ep);
@@ -1436,7 +1435,7 @@
          
       log.trace(this + " created and registered " + ep);
       
-      return stub;
+      return response;
    }   
 
    private ClientBrowser createBrowserDelegateInternal(Destination destination,
@@ -1664,14 +1663,10 @@
             } else if (type == REQ_CREATECONSUMER)
             {
                CreateConsumerRequest request = (CreateConsumerRequest) packet;
-               ClientConsumerImpl consumer = (ClientConsumerImpl) createConsumerDelegate(
-                     request.getDestination(), request.getSelector(), request
+               response = createConsumerDelegate(
+                                request.getDestination(), request.getSelector(), request
                            .isNoLocal(), request.getSubscriptionName(), request
                            .isConnectionConsumer());
-
-               response = new CreateConsumerResponse(consumer.getID(), consumer
-                     .getBufferSize(), consumer.getMaxDeliveries(), consumer
-                     .getRedeliveryDelay());
             } else if (type == REQ_CREATEDESTINATION)
             {
                CreateDestinationRequest request = (CreateDestinationRequest) packet;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -28,13 +28,13 @@
 
 import org.jboss.jms.client.Closeable;
 import org.jboss.jms.client.api.ClientBrowser;
-import org.jboss.jms.client.api.Consumer;
 import org.jboss.jms.client.impl.Ack;
 import org.jboss.jms.client.impl.Cancel;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
 import org.jboss.messaging.core.Destination;
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -47,7 +47,7 @@
  */
 public interface SessionEndpoint extends Closeable
 {
-   Consumer createConsumerDelegate(Destination destination, String selector,
+   CreateConsumerResponse createConsumerDelegate(Destination destination, String selector,
                                            boolean noLocal, String subscriptionName,
                                            boolean connectionConsumer) throws JMSException;
    

Modified: trunk/src/main/org/jboss/messaging/core/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Consumer.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/Consumer.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -24,7 +24,7 @@
 
 /**
  * 
- * A Consumer
+ * A ClientConsumer
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *

Modified: trunk/src/main/org/jboss/messaging/core/HandleStatus.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/HandleStatus.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/HandleStatus.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -29,7 +29,7 @@
  * 
  * NO_MATCH means the MessageReference was rejected by a Filter
  * 
- * BUSY means the MessageReference was rejected since the Consumer was busy
+ * BUSY means the MessageReference was rejected since the ClientConsumer was busy
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *

Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -516,7 +516,7 @@
          
          if (status == null)
          {
-            throw new IllegalStateException("Consumer.handle() should never return null");
+            throw new IllegalStateException("ClientConsumer.handle() should never return null");
          }
          
          if (status == HandleStatus.HANDLED)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -56,7 +56,7 @@
    MSG_CANCELDELIVERIES           ((byte)56),
    MSG_UNSUBSCRIBE                ((byte)57),
    
-   // Consumer 
+   // ClientConsumer 
    MSG_CHANGERATE                 ((byte)70),
    
    // Browser

Modified: trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/JBMBaseTestCase.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -43,7 +43,7 @@
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossSession;
 import org.jboss.jms.client.JBossMessageConsumer;
-import org.jboss.jms.client.api.Consumer;
+import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.client.impl.ClientConnectionImpl;
 import org.jboss.jms.client.impl.ClientConsumerImpl;
 import org.jboss.jms.client.impl.ClientSessionImpl;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -365,7 +365,7 @@
          // OK
       }
 
-      // Consumer
+      // ClientConsumer
 
       try
       {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -2708,7 +2708,7 @@
          assertEquals(Session.AUTO_ACKNOWLEDGE, sess3.getAcknowledgeMode());
          MessageConsumer consumer3 = sess3.createConsumer(topic1, null, true);
 
-         //Consumer 1 should not get the message but consumers 2 and 3 should
+         //ClientConsumer 1 should not get the message but consumers 2 and 3 should
 
          conn1.start();
          conn2.start();

Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java	2008-01-21 19:19:28 UTC (rev 3605)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java	2008-01-21 21:01:36 UTC (rev 3606)
@@ -154,7 +154,7 @@
                      else
                      if (finished.getWorker() instanceof SeveralClientsStressTest.Consumer)
                      {
-                        if (info) log.info("Scheduling new Consumer " + numberOfConsumers);
+                        if (info) log.info("Scheduling new ClientConsumer " + numberOfConsumers);
                         SeveralClientsStressTest.Consumer consumer = new SeveralClientsStressTest.Consumer(numberOfConsumers++, testChannel);
                         threads.add(consumer);
                         consumer.start();
@@ -378,7 +378,7 @@
    {
       public Consumer(int consumerId, LinkedQueue messageQueue)
       {
-         super("Consumer:" + consumerId, consumerId, messageQueue);
+         super("ClientConsumer:" + consumerId, consumerId, messageQueue);
       }
 
       public void run()
@@ -395,7 +395,7 @@
             Connection conn = cf.createConnection();
             Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
             MessageConsumer consumer = sess.createConsumer(queue);
-            if (info) log.info("Consumer was created");
+            if (info) log.info("ClientConsumer was created");
 
             conn.start();
 




More information about the jboss-cvs-commits mailing list