[jboss-cvs] JBoss Messaging SVN: r3682 - in trunk: src/main/org/jboss/jms/client/api and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 8 09:04:33 EST 2008


Author: timfox
Date: 2008-02-08 09:04:33 -0500 (Fri, 08 Feb 2008)
New Revision: 3682

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java
   trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
   trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
   trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java
Log:
Proper token based flow control and some tweaks


Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,6 +21,8 @@
   */
 package org.jboss.jms.client;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.ConnectionConsumer;
 import javax.jms.JMSException;
 import javax.jms.ServerSessionPool;
@@ -28,8 +30,6 @@
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.messaging.util.Logger;
 
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
 /**
  * This class implements javax.jms.ConnectionConsumer
  * 
@@ -80,7 +80,7 @@
    private int id;
    
    /** The thread id generator */
-   private static SynchronizedInt threadId = new SynchronizedInt(0);
+   private static AtomicInteger threadId = new AtomicInteger(0);
    
    private int maxDeliveries;
    

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -492,7 +492,7 @@
                throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
             }
             
-            consumer = session.createConsumer(dest.getAddress(), coreFilterString, noLocal, false);
+            consumer = session.createConsumer(dest.getAddress(), coreFilterString, noLocal, false, false);
          }
          else
          {
@@ -513,7 +513,7 @@
                
                session.createQueue(dest.getAddress(), queueName, coreFilterString, false, false);
                
-               consumer = session.createConsumer(queueName, null, noLocal, true);
+               consumer = session.createConsumer(queueName, null, noLocal, true, false);
             }
             else
             {
@@ -575,7 +575,7 @@
                   }                          
                }
                
-               consumer = session.createConsumer(queueName, null, noLocal, false);
+               consumer = session.createConsumer(queueName, null, noLocal, false, false);
             }         
          }
          

Modified: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -24,8 +24,6 @@
 
    void setMessageHandler(MessageHandler handler) throws MessagingException;
    
-   String getQueueName();
-        
    void close() throws MessagingException;
    
    boolean isClosed();      

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -33,8 +33,8 @@
    
    SessionBindingQueryResponseMessage bindingQuery(String address) throws MessagingException;
    
-   ClientConsumer createConsumer(String queueName, String filterString,
-                                 boolean noLocal, boolean autoDeleteQueue) throws MessagingException;
+   ClientConsumer createConsumer(String queueName, String filterString, boolean noLocal,
+                                 boolean autoDeleteQueue, boolean direct) throws MessagingException;
    
    ClientBrowser createBrowser(String queueName, String messageSelector) throws MessagingException;
    

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -25,7 +25,6 @@
 
 import org.jboss.jms.client.api.ClientConnection;
 import org.jboss.jms.client.api.ClientConnectionFactory;
-import org.jboss.jms.client.plugin.LoadBalancingFactory;
 import org.jboss.jms.client.remoting.MessagingRemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingConfiguration;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,6 +21,8 @@
  */
 package org.jboss.jms.client.impl;
 
+import java.util.concurrent.ExecutorService;
+
 import org.jboss.jms.client.api.MessageHandler;
 import org.jboss.jms.client.remoting.MessagingRemotingConnection;
 import org.jboss.messaging.core.Message;
@@ -28,14 +30,12 @@
 import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -59,23 +59,35 @@
    // -----------------------------------------------------------------------------------
 
    private String id;
+   
    private ClientSessionInternal session;
-   private int bufferSize;
-   private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(
-         10);
+   
+   private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(10);
+   
    private volatile Thread receiverThread;
+   
    private MessageHandler handler;
+   
    private volatile boolean closed;
+   
    private Object mainLock = new Object();
-   private QueuedExecutor sessionExecutor;
+   
+   private ExecutorService sessionExecutor;
+   
    private boolean listenerRunning;
-   private int consumeCount;
+   
    private MessagingRemotingConnection remotingConnection;
-   private String queueName;
+   
    private long ignoreDeliveryMark = -1;
-
-   // FIXME - revisit closed and closing flags
-
+   
+   private boolean direct;
+   
+   private Thread onMessageThread;
+   
+   private int tokensToSend;
+   
+   private int tokenBatchSize;
+   
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -83,15 +95,16 @@
    // ---------------------------------------------------------------------------------
 
    public ClientConsumerImpl(ClientSessionInternal session, String id,
-         int bufferSize, QueuedExecutor sessionExecutor,
-         MessagingRemotingConnection remotingConnection, String queueName)
+                             ExecutorService sessionExecutor,
+                             MessagingRemotingConnection remotingConnection,
+                             boolean direct, int tokenBatchSize)
    {
       this.id = id;
       this.session = session;
-      this.bufferSize = bufferSize;
       this.sessionExecutor = sessionExecutor;
       this.remotingConnection = remotingConnection;
-      this.queueName = queueName;
+      this.direct = direct;
+      this.tokenBatchSize = tokenBatchSize;
    }
 
    // ClientConsumer implementation
@@ -101,16 +114,11 @@
    {
       checkClosed();
 
-      DeliverMessage m = null;
-
       synchronized (mainLock)
       {
-         if (closed) { return null; }
-
          if (handler != null)
          {
-            throw new MessagingException(
-               MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
+            throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
          }
 
          receiverThread = Thread.currentThread();
@@ -148,13 +156,13 @@
                        
                if (!closed && !buffer.isEmpty())
                {                              
-                  m = buffer.removeFirst();
+                  DeliverMessage m = buffer.removeFirst();
                   
                   boolean expired = m.getMessage().isExpired();
                   
                   session.delivered(m.getDeliveryID(), expired);
                   
-                  checkSendChangeRate();
+                  flowControl();
                                     
                   if (expired)
                   {
@@ -198,47 +206,40 @@
    public void setMessageHandler(MessageHandler handler) throws MessagingException
    {
       checkClosed();
-
-      synchronized (mainLock)
+      
+      if (receiverThread != null)
       {
-         if (receiverThread != null) { throw new MessagingException(
-               MessagingException.ILLEGAL_STATE,
-               "Cannot set MessageHandler - consumer is in receive(...)"); }
+         throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
+      }
 
+      synchronized (mainLock)
+      {         
          this.handler = handler;
 
          if (handler != null && !buffer.isEmpty())
          {
             listenerRunning = true;
 
-            this.queueRunner(new ListenerRunner());
+            queueRunner();
          }
       }
    }
 
-   public String getQueueName()
+   public void close() throws MessagingException
    {
-      return queueName;
-   }
+      if (closed)
+      {
+         return;
+      }
 
-   public synchronized void close() throws MessagingException
-   {
-      if (closed) { return; }
-
       try
       {
-         // Important! We set the handler to null so the next ListenerRunner
-         // won't run
-         if (handler != null)
-         {
-            setMessageHandler(null);
-         }
+         // We set the handler to null so the next ListenerRunner won't run
+         handler = null;
 
          // Now we wait for any current handler runners to run.
          waitForOnMessageToComplete();
 
-         // TODO sort out these close and closing flags
-
          synchronized (mainLock)
          {
             closed = true;
@@ -248,25 +249,16 @@
                // Wake up any receive() thread that might be waiting
                mainLock.notify();
             }
-
-            this.handler = null;
          }
 
          remotingConnection.send(id, new CloseMessage());
 
          PacketDispatcher.client.unregister(id);
-
-         if (trace)
-         {
-            log.trace(this + " closed");
-         }
-
       }
       finally
       {
          session.removeConsumer(this);
       }
-
    }
 
    public boolean isClosed()
@@ -282,13 +274,13 @@
       return id;
    }
 
-   public void changeRate(float newRate) throws MessagingException
-   {
-      checkClosed();
+//   public void changeRate(float newRate) throws MessagingException
+//   {
+//      checkClosed();
+//
+//      remotingConnection.send(id, new ConsumerFlowTokenMessage(newRate), true);
+//   }
 
-      remotingConnection.send(id, new ConsumerChangeRateMessage(newRate), true);
-   }
-
    public void handleMessage(final DeliverMessage message) throws Exception
    {
       synchronized (mainLock)
@@ -324,13 +316,34 @@
 
          buffer.addLast(message, coreMessage.getPriority());
 
-         if (trace)
+         if (receiverThread != null)
          {
-            log.trace(this + " added message(s) to the buffer are now "
-                  + buffer.size() + " messages");
+            mainLock.notify();
          }
+         else if (handler != null)
+         {
+            if (direct)
+            {
+               //Dispatch it directly on remoting thread
+               
+               boolean expired = message.getMessage().isExpired();
 
-         messageAdded();
+               session.delivered(message.getDeliveryID(), expired);
+               
+               flowControl();
+
+               if (!expired)
+               {
+                  handler.onMessage(message.getMessage());
+               }
+            }
+            else if (!listenerRunning)
+            {
+               listenerRunning = true;
+
+               queueRunner();
+            }
+         }
       }
    }
 
@@ -356,118 +369,107 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private void checkSendChangeRate() throws MessagingException
+   private void flowControl() throws MessagingException
    {
-      consumeCount++;
-
-      if (consumeCount == bufferSize)
+      if (tokenBatchSize > 0)
       {
-         consumeCount = 0;
-
-         changeRate(1.0f);
+         tokensToSend++;
+   
+         if (tokensToSend == tokenBatchSize)
+         {
+            tokensToSend = 0;
+            
+            remotingConnection.send(id, new ConsumerFlowTokenMessage(tokenBatchSize), true);                  
+         }
       }
    }
-
+   
    private void waitForOnMessageToComplete()
    {
       // Wait for any onMessage() executions to complete
 
-      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      if (Thread.currentThread() == onMessageThread)
       {
-         // the current thread already closing this ClientConsumer (this happens
-         // when the
-         // session is closed from within the MessageListener.onMessage(), for
-         // example), so no need
-         // to register another Closer (see
-         // http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+         // If called from inside onMessage then return immediately - otherwise would block forever
          return;
       }
 
       Future result = new Future();
 
-      try
-      {
-         sessionExecutor.execute(new Closer(result));
+      sessionExecutor.execute(new Closer(result));
 
-         result.getResult();
-      }
-      catch (InterruptedException e)
-      {
-      }
+      result.getResult();
    }
 
-   private void queueRunner(ListenerRunner runner)
+   private void queueRunner()
    {
-      try
-      {
-         this.sessionExecutor.execute(runner);
-      }
-      catch (InterruptedException e)
-      {
-      }
+      sessionExecutor.execute(new ListenerRunner());
    }
 
-   private void messageAdded()
+   private void checkClosed() throws MessagingException
    {
-      boolean notified = false;
-
-      if (trace)
+      if (closed)
       {
-         log.trace("Receiver thread:" + receiverThread + " handler:" + handler
-               + " listenerRunning:" + listenerRunning + " sessionExecutor:"
-               + sessionExecutor);
+         throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
       }
-
-      // If we have a thread waiting on receive() we notify it
-      if (receiverThread != null)
+   }
+   
+   private void onMessageLoop()
+   {
+      try
       {
-         if (trace)
-         {
-            log.trace(this + " notifying receiver/waiter thread");
-         }
+         onMessageThread = Thread.currentThread();
 
-         mainLock.notifyAll();
+         DeliverMessage msg = null;
 
-         notified = true;
-      }
-      else if (handler != null)
-      {
-         // We have a message handler
-         if (!listenerRunning)
+         MessageHandler theListener = null;
+                  
+         synchronized (mainLock)
          {
-            listenerRunning = true;
-
-            if (trace)
+            if (handler == null || buffer.isEmpty())
             {
-               log.trace(this + " scheduled a new ListenerRunner");
+               listenerRunning = false;
+
+               return;
             }
 
-            this.queueRunner(new ListenerRunner());
+            theListener = handler;
+
+            msg = buffer.removeFirst();              
          }
 
-         // TODO - Execute onMessage on same thread for even better throughput
-      }
-
-      // Make sure we notify any thread waiting for last delivery
-      if (!notified)
-      {
-         if (trace)
+         if (msg != null)
          {
-            log.trace("Notifying");
+            boolean expired = msg.getMessage().isExpired();
+
+            session.delivered(msg.getDeliveryID(), expired);
+            
+            flowControl();
+
+            if (!expired)
+            {
+               theListener.onMessage(msg.getMessage());
+            }
          }
 
-         mainLock.notifyAll();
+         synchronized (mainLock)
+         {
+            if (!buffer.isEmpty())
+            {
+               queueRunner();
+            }
+            else
+            {
+               listenerRunning = false;
+            }
+         }
       }
+      catch (MessagingException e)
+      {
+         log.error("Failure in ListenerRunner", e);
+      }
    }
-
-  
-
-   private void checkClosed() throws MessagingException
-   {
-      if (closed) { throw new MessagingException(
-            MessagingException.OBJECT_CLOSED, "Consumer is closed"); }
-   }
-
+   
    // Inner classes
    // --------------------------------------------------------------------------------
 
@@ -489,107 +491,12 @@
          result.setResult(null);
       }
    }
-
-   /*
-    * This class handles the execution of onMessage methods
-    */
+     
    private class ListenerRunner implements Runnable
    {
       public void run()
       {
-         try
-         {
-            DeliverMessage msg = null;
-
-            MessageHandler theListener = null;
-
-            synchronized (mainLock)
-            {
-               if (handler == null || buffer.isEmpty())
-               {
-                  listenerRunning = false;
-
-                  if (trace)
-                  {
-                     log.trace("no handler or buffer is empty, returning");
-                  }
-
-                  return;
-               }
-
-               theListener = handler;
-
-               // remove a message from the buffer
-
-               msg = buffer.removeFirst();
-
-               checkSendChangeRate();
-            }
-
-            /*
-             * Bug here is as follows: The next runner gets scheduled BEFORE the
-             * on message is executed so if the onmessage fails on acking it
-             * will be put on hold and failover will kick in, this will clear
-             * the executor so the next queud one disappears at everything
-             * grinds to a halt
-             * 
-             * Solution - don't use a session executor - have a session thread
-             * instead much nicer
-             */
-
-            if (msg != null)
-            {
-               boolean expired = msg.getMessage().isExpired();
-
-               session.delivered(msg.getDeliveryID(), expired);
-
-               if (!expired)
-               {
-                  theListener.onMessage(msg.getMessage());
-               }
-            }
-
-            synchronized (mainLock)
-            {
-               if (!buffer.isEmpty())
-               {
-                  // Queue up the next runner to run
-
-                  if (trace)
-                  {
-                     log
-                           .trace("More messages in buffer so queueing next onMessage to run");
-                  }
-
-                  queueRunner(this);
-
-                  if (trace)
-                  {
-                     log.trace("Queued next onMessage to run");
-                  }
-               }
-               else
-               {
-                  if (trace)
-                  {
-                     log
-                           .trace("no more messages in buffer, marking handler as not running");
-                  }
-
-                  listenerRunning = false;
-               }
-            }
-
-            if (trace)
-            {
-               log.trace("Exiting run()");
-            }
-         }
-         catch (MessagingException e)
-         {
-            log.error("Failure in ListenerRunner", e);
-         }
+         onMessageLoop();
       }
    }
-
 }

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,8 +21,6 @@
 {   
    String getID();
    
-   void changeRate(float newRate) throws MessagingException;
-
    void handleMessage(DeliverMessage message) throws Exception;
    
    void recover(long lastDeliveryID) throws MessagingException;

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -39,9 +41,14 @@
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
@@ -50,10 +57,6 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
@@ -73,12 +76,9 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXASuspendMessage;
-import org.jboss.messaging.util.ClearableQueuedExecutor;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -117,8 +117,7 @@
    
    private boolean deliveryExpired;   
 
-   // Executor used for executing onMessage methods
-   private ClearableQueuedExecutor executor;
+   private ExecutorService executor;
 
    private MessagingRemotingConnection remotingConnection;
          
@@ -144,7 +143,7 @@
       
       this.remotingConnection = connection.getRemotingConnection();
       
-      executor = new ClearableQueuedExecutor(new LinkedQueue());
+      executor = Executors.newSingleThreadExecutor();
       
       this.lazyAckBatchSize = lazyAckBatchSize;   
    }
@@ -209,7 +208,7 @@
    }
    
    public ClientConsumer createConsumer(String queueName, String filterString, boolean noLocal,
-                                        boolean autoDeleteQueue) throws MessagingException
+                                        boolean autoDeleteQueue, boolean direct) throws MessagingException
    {
       checkClosed();
     
@@ -218,17 +217,31 @@
       
       SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.send(id, request);
       
+      int prefetchSize = response.getPrefetchSize();
+            
       ClientConsumerInternal consumer =
-         new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),             
-                                executor, remotingConnection, queueName);
+         new ClientConsumerImpl(this, response.getConsumerID(),             
+                                executor, remotingConnection, direct, response.getPrefetchSize());
 
       consumers.put(response.getConsumerID(), consumer);
 
       PacketDispatcher.client.register(new ClientConsumerPacketHandler(consumer, response.getConsumerID()));
 
-      //Now we have finished creating the client consumer, we can tell the SCD
-      //we are ready
-      consumer.changeRate(1);
+      if (prefetchSize > 0) // 0 ==> flow control is disabled
+      {
+         //Now give the server consumer some initial tokens (1.5 * prefetchSize)
+         
+         int initialTokens = prefetchSize + prefetchSize >>> 1;
+         
+         remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(initialTokens), true);
+      }
+      else
+      {
+         //FIXME
+         //FIXME - for now we need to send a flow control token to ensure the return packet sender gets set
+         //FIXME
+         remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(1), true);
+      }
       
       return consumer;
    }
@@ -676,6 +689,7 @@
       }
       
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(lastID, !broken);
+      
       remotingConnection.send(id, message, !block);
       
       acked = true;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -70,7 +70,6 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private String id;
-   private boolean closed;
    private ServerSessionEndpoint session;
    private Queue destination;
    private Filter filter;
@@ -102,40 +101,23 @@
 
    public void reset() throws Exception
    {
-      if (closed)
-      {
-         throw new IllegalStateException("Browser is closed");
-      }
-
-      log.trace(this + " is being resetted");
-
       iterator = createIterator();
    }
 
    public boolean hasNextMessage() throws Exception
    {
-      if (closed)
-      {
-         throw new IllegalStateException("Browser is closed");
-      }
-
       if (iterator == null)
       {
          iterator = createIterator();
       }
 
       boolean has = iterator.hasNext();
-      if (trace) { log.trace(this + (has ? " has": " DOESN'T have") + " a next message"); }
+
       return has;
    }
    
    public Message nextMessage() throws Exception
    {
-      if (closed)
-      {
-         throw new IllegalStateException("Browser is closed");
-      }
-
       if (iterator == null)
       {
          iterator = createIterator();
@@ -143,20 +125,11 @@
 
       Message r = (Message)iterator.next();
 
-      if (trace) { log.trace(this + " returning " + r); }
-      
       return r;
    }
 
    public Message[] nextMessageBlock(int maxMessages) throws Exception
    {
-      if (trace) { log.trace(this + " returning next message block of " + maxMessages); }
-
-      if (closed)
-      {
-         throw new IllegalStateException("Browser is closed");
-      }
-      
       if (maxMessages < 2)
       {
          throw new IllegalArgumentException("maxMessages must be >=2 otherwise use nextMessage");
@@ -184,8 +157,12 @@
    
    public void close() throws Exception
    {
-      localClose();
+      iterator = null;
+      
+      session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
+
       session.removeBrowser(id);
+      
       log.trace(this + " closed");
    }
            
@@ -198,20 +175,6 @@
 
    // Package protected ----------------------------------------------------------------------------
    
-   void localClose() throws Exception
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Browser is already closed");
-      }
-      
-      iterator = null;
-      
-      session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-      
-      closed = true;
-   }
-
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -22,9 +22,9 @@
 package org.jboss.jms.server.endpoint;
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,8 +75,6 @@
 
    private String id;
 
-   private volatile boolean closed;
-   
    private volatile boolean started;
 
    private String username;
@@ -147,8 +145,6 @@
          sessions.put(sessionID, ep);
       }
 
-      messagingServer.addSession(sessionID, ep);
-
       messagingServer.getRemotingService().getDispatcher().register(ep.newHandler());
       
       return new ConnectionCreateSessionResponseMessage(sessionID);
@@ -156,38 +152,21 @@
    
    public void start() throws Exception
    {
-      if (closed)
-      {
-         throw new IllegalStateException("Connection is closed");
-      }
-      
       setStarted(true);
    }
 
    public synchronized void stop() throws Exception
    {
-      if (closed)
-      {
-         throw new IllegalStateException("Connection is closed");
-      }
-
       setStarted(false);
    }
 
    public void close() throws Exception
    {
-      if (closed)
-      {
-         log.warn("Connection is already closed");
-         return;
-      }
-
-      //We clone to avoid deadlock http://jira.jboss.org/jira/browse/JBMESSAGING-836
       Map<String, ServerSessionEndpoint> sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
       
-      for(ServerSessionEndpoint session: sessionsClone.values())
+      for (ServerSessionEndpoint session: sessionsClone.values())
       {
-         session.localClose();
+         session.close();
       }
 
       sessions.clear();
@@ -213,8 +192,6 @@
       cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
 
       messagingServer.getRemotingService().getDispatcher().unregister(id);
-
-      closed = true;
    }
 
    // Public ---------------------------------------------------------------------------------------
@@ -295,7 +272,6 @@
    
    private void setStarted(boolean started) throws Exception
    {
-      //We clone to avoid deadlock http://jira.jboss.org/jira/browse/JBMESSAGING-836
       Map<String, ServerSessionEndpoint> sessionsClone = null;
       
       sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,9 +21,11 @@
  */
 package org.jboss.jms.server.endpoint;
 
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.Consumer;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.HandleStatus;
@@ -34,7 +36,7 @@
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.Packet;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -66,44 +68,33 @@
 
    private boolean trace = log.isTraceEnabled();
 
-   private String id;
+   private final String id;
 
-   private Queue messageQueue;
+   private final Queue messageQueue;
 
-   private ServerSessionEndpoint sessionEndpoint;
+   private final ServerSessionEndpoint sessionEndpoint;
 
-   private boolean noLocal;
+   private final boolean noLocal;
 
-   private Filter filter;
+   private final Filter filter;
 
    private boolean started;
 
    // This lock protects starting and stopping
-   private Object startStopLock;
+   private final Object startStopLock;
 
-   // Must be volatile
-   private volatile boolean clientAccepting;
-
-   private int prefetchSize;
+   private final AtomicInteger availableTokens = new AtomicInteger(0);
    
-   private volatile int sendCount;
+   private final boolean autoDeleteQueue;
    
-   private boolean firstTime = true;
-   
-   private boolean autoDeleteQueue;
+   private final boolean enableFlowControl;
 
    // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(MessagingServer sp, String id, Queue messageQueue,                          
 					           ServerSessionEndpoint sessionEndpoint, Filter filter,
-					           boolean noLocal, 
-					           int prefetchSize, boolean autoDeleteQueue)
+					           boolean noLocal, boolean autoDeleteQueue, boolean enableFlowControl)
    {
-      if (trace)
-      {
-         log.trace("constructing consumer endpoint " + id);
-      }
-
       this.id = id;
 
       this.messageQueue = messageQueue;
@@ -112,39 +103,30 @@
 
       this.noLocal = noLocal;
 
-      // Always start as false - wait for consumer to initiate.
-      this.clientAccepting = false;
-      
       this.startStopLock = new Object();
 
-      this.prefetchSize = prefetchSize;
-      
       this.filter = filter;
                 
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
       
       this.autoDeleteQueue = autoDeleteQueue;
       
+      log.info("Enable flow control is " + enableFlowControl);
+      
+      this.enableFlowControl = enableFlowControl;
+      
       // adding the consumer to the queue
       messageQueue.addConsumer(this);
       
       messageQueue.deliver();
-
-      log.trace(this + " constructed");
    }
 
    // Receiver implementation ----------------------------------------------------------------------
 
    public HandleStatus handle(MessageReference ref) throws Exception
    {
-      if (trace)
+      if (enableFlowControl && availableTokens.get() == 0)
       {
-         log.trace(this + " receives " + ref + " for delivery");
-      }
-      
-      // This is ok to have outside lock - is volatile
-      if (!clientAccepting)
-      {
          if (trace) { log.trace(this + " is NOT accepting messages!"); }
 
          return HandleStatus.BUSY;
@@ -163,16 +145,12 @@
          // queue for delivery later.
          if (!started)
          {
-            if (trace) { log.trace(this + " NOT started"); }
-
             return HandleStatus.BUSY;
          }
          
-         if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
-
          Message message = ref.getMessage();
          
-         if (!accept(message))
+         if (filter != null && !filter.match(message))
          {
             return HandleStatus.NO_MATCH;
          }
@@ -181,12 +159,8 @@
          {
             String conId = message.getConnectionID();
 
-            if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
-
             if (sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId))
             {
-            	if (trace) { log.trace("Message from local connection so rejecting"); }
-            	
             	PersistenceManager pm = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
             	            	            	
             	ref.acknowledge(pm);
@@ -194,23 +168,11 @@
              	return HandleStatus.HANDLED;
             }            
          }
-                  
-         sendCount++;
-         
-         int num = prefetchSize;
-         
-         if (firstTime)
+                         
+         if (enableFlowControl)
          {
-            //We make sure we have a little extra buffer on the client side
-            num = num + num / 3 ;
+            availableTokens.decrementAndGet();
          }
-         
-         if (sendCount == num)
-         {
-            clientAccepting = false;
-            
-            firstTime = false;
-         }          
                    
          try
          {
@@ -220,31 +182,13 @@
          {
          	log.error("Failed to handle delivery", e);
          	
-         	this.started = false; // DO NOT return null or the message might get delivered more than once
+         	started = false; // DO NOT return null or the message might get delivered more than once
          }
                           
          return HandleStatus.HANDLED;
       }
    }
    
-   // Filter implementation ------------------------------------------------------------------------
-
-   public boolean accept(Message msg)
-   {
-      if (filter != null)
-      {
-         boolean accept = filter.match(msg);
-
-         if (trace) { log.trace("message filter " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
-         
-         return accept;
-      }
-      else
-      {
-         return true;
-      }
-   }
-
    // Closeable implementation ---------------------------------------------------------------------
 
    public void close() throws Exception
@@ -254,38 +198,37 @@
          log.trace(this + " close");
       }
       
-      stop();
+      setStarted(false);
 
-      localClose();
-
-      sessionEndpoint.removeConsumer(id);
-           
+      messageQueue.removeConsumer(this);
+      
+      sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);     
+      
+      if (autoDeleteQueue)
+      {
+         if (messageQueue.getConsumerCount() == 0)
+         {
+            MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+            
+            server.getPostOffice().removeBinding(messageQueue.getName());
+            
+            if (messageQueue.isDurable())
+            {
+               server.getPersistenceManager().deleteAllReferences(messageQueue);
+            }
+         }
+      }
+      
+      sessionEndpoint.removeConsumer(id);           
    }
 
    // ConsumerEndpoint implementation --------------------------------------------------------------
 
-   public void changeRate(float newRate) throws Exception
+   public void receiveTokens(int tokens) throws Exception
    {
-      if (trace)
-      {
-         log.trace(this + " changing rate to " + newRate);
-      }
+      availableTokens.addAndGet(tokens);
 
-      if (newRate > 0)
-      {
-         sendCount = 0;
-         
-         clientAccepting = true;
-      }
-      else
-      {
-         clientAccepting = false;
-      }
-
-      if (clientAccepting)
-      {
-         promptDelivery();
-      }
+      promptDelivery();      
    }
 
    // Public ---------------------------------------------------------------------------------------
@@ -309,62 +252,45 @@
 
    void setStarted(boolean started)
    {
-      //No need to lock since caller already has the lock
-      this.started = started;      
-   }
-    
-   void localClose() throws Exception
-   {
-      if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
-
-      messageQueue.removeConsumer(this);
+      boolean useStarted;
       
-      sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);     
-      
-      if (autoDeleteQueue)
-      {
-         if (messageQueue.getConsumerCount() == 0)
-         {
-            MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
-            
-            server.getPostOffice().removeBinding(messageQueue.getName());
-            
-            if (messageQueue.isDurable())
-            {
-               server.getPersistenceManager().deleteAllReferences(messageQueue);
-            }
-         }
-      }
-   }
-
-   void start()
-   {
       synchronized (startStopLock)
       {
-         if (started)
-         {
-            return;
-         }
-
-         started = true;
+         this.started = started;   
+         
+         useStarted = started;         
       }
-
-      // Prompt delivery
-      promptDelivery();
-   }
-
-   void stop() throws Exception
-   {
-      synchronized (startStopLock)
+      
+      //Outside the lock
+      if (useStarted)
       {
-         if (!started)
-         {
-            return;
-         }
-
-         started = false;         
+         promptDelivery();
       }
    }
+    
+//   void localClose() throws Exception
+//   {
+//      if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
+//
+//      messageQueue.removeConsumer(this);
+//      
+//      sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);     
+//      
+//      if (autoDeleteQueue)
+//      {
+//         if (messageQueue.getConsumerCount() == 0)
+//         {
+//            MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+//            
+//            server.getPostOffice().removeBinding(messageQueue.getName());
+//            
+//            if (messageQueue.isDurable())
+//            {
+//               server.getPersistenceManager().deleteAllReferences(messageQueue);
+//            }
+//         }
+//      }
+//   }
 
    // Protected ------------------------------------------------------------------------------------
 
@@ -400,13 +326,13 @@
 
          PacketType type = packet.getType();
          
-         if (type == CONS_CHANGERATE)
+         if (type == CONS_FLOWTOKEN)
          {
             setReplier(sender);
 
-            ConsumerChangeRateMessage message = (ConsumerChangeRateMessage) packet;
+            ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
             
-            changeRate(message.getRate());
+            receiveTokens(message.getTokens());
          }
          else if (type == CLOSE)
          {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,30 +21,30 @@
  */
 package org.jboss.jms.server.endpoint;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,6 +53,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -75,22 +78,22 @@
 import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
@@ -109,9 +112,6 @@
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * Session implementation
  * 
@@ -131,8 +131,7 @@
    // Constants
    // ------------------------------------------------------------------------------------
 
-   private static final Logger log = Logger
-         .getLogger(ServerSessionEndpoint.class);
+   private static final Logger log = Logger.getLogger(ServerSessionEndpoint.class);
 
    // Static
    // ---------------------------------------------------------------------------------------
@@ -146,15 +145,13 @@
 
    private String id;
 
-   private volatile boolean closed;
-
    private ServerConnectionEndpoint connectionEndpoint;
 
    private MessagingServer sp;
 
-   private Map<String, ServerConsumerEndpoint> consumers = new HashMap<String, ServerConsumerEndpoint>();
+   private Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
 
-   private Map<String, ServerBrowserEndpoint> browsers = new HashMap<String, ServerBrowserEndpoint>();
+   private Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
 
    private PostOffice postOffice;
 
@@ -162,8 +159,7 @@
 
    private long deliveryIDSequence = 0;
 
-   // Temporary until we have our own NIO transport
-   QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
+   ExecutorService executor = Executors.newSingleThreadExecutor();
 
    private Transaction tx;
 
@@ -173,8 +169,6 @@
 
    private ResourceManager resourceManager;
 
-   private boolean strict;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -201,8 +195,6 @@
       this.autoCommitAcks = autoCommitAcks;
 
       this.resourceManager = resourceManager;
-
-      strict = sp.getConfiguration().isStrictTck();
    }
 
    // Public
@@ -225,27 +217,15 @@
    {
       Queue expiryQueue = ref.getQueue().getExpiryQueue();
 
-      if (trace)
-      {
-         log.trace(this + " detected expired message " + ref);
-      }
-
       if (expiryQueue != null)
       {
-         if (trace)
-         {
-            log.trace(this + " sending expired message to expiry queue "
-                  + expiryQueue);
-         }
-
          Message copy = makeCopyForDLQOrExpiry(true, ref);
 
          moveInTransaction(copy, ref, expiryQueue, true);
       }
       else
       {
-         log.warn("No expiry queue has been configured so removing expired "
-               + ref);
+         log.warn("No expiry queue has been configured so removing expired " + ref);
 
          // TODO - tidy up these references - ugly
          ref.acknowledge(this.getConnectionEndpoint().getMessagingServer()
@@ -257,82 +237,54 @@
 
    void removeBrowser(String browserId) throws Exception
    {
-      synchronized (browsers)
+      if (browsers.remove(browserId) == null)
       {
-         if (browsers.remove(browserId) == null) { throw new IllegalStateException(
-               "Cannot find browser with id " + browserId + " to remove"); }
-      }
+         throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
+      }      
    }
 
    void removeConsumer(String consumerId) throws Exception
    {
-      synchronized (consumers)
+      if (consumers.remove(consumerId) == null)
       {
-         if (consumers.remove(consumerId) == null) { throw new IllegalStateException(
-               "Cannot find consumer with id " + consumerId + " to remove"); }
-      }
+         throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+      }      
    }
 
-   void localClose() throws Exception
-   {
-      if (closed) { throw new IllegalStateException("Session is already closed"); }
+//   void localClose() throws Exception
+//   {
+//      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+//      
+//      for (ServerConsumerEndpoint consumer: consumersClone.values())
+//      {
+//         consumer.close();
+//      }
+//
+//      consumers.clear();
+//
+//      Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
+//      
+//      for (ServerBrowserEndpoint browser: browsersClone.values())
+//      {
+//         browser.close();
+//      }
+//
+//      consumers.clear();
+//
+//      browsers.clear();
+//
+//      rollback();
+//
+//      executor.shutdown();
+//
+//      deliveries.clear();
+//
+//      sp.removeSession(id);
+//
+//      closed = true;
+//   }
 
-      if (trace) log.trace(this + " close()");
-
-      // We clone to avoid deadlock
-      // http://jira.jboss.org/jira/browse/JBMESSAGING-836
-      Map consumersClone;
-      synchronized (consumers)
-      {
-         consumersClone = new HashMap(consumers);
-      }
-
-      for (Iterator i = consumersClone.values().iterator(); i.hasNext();)
-      {
-         ((ServerConsumerEndpoint) i.next()).localClose();
-      }
-
-      consumers.clear();
-
-      // We clone to avoid deadlock
-      // http://jira.jboss.org/jira/browse/JBMESSAGING-836
-      Map browsersClone;
-      synchronized (browsers)
-      {
-         browsersClone = new HashMap(browsers);
-      }
-
-      for (Iterator i = browsersClone.values().iterator(); i.hasNext();)
-      {
-         ((ServerBrowserEndpoint) i.next()).localClose();
-      }
-
-      browsers.clear();
-
-      rollback();
-
-      // Close down the executor
-
-      // Note we need to wait for ALL tasks to complete NOT just one otherwise
-      // we can end up with the following situation
-      // prompter is queued and starts to execute
-      // prompter almost finishes executing then a message is cancelled due to
-      // this session closing
-      // this causes another prompter to be queued
-      // shutdownAfterProcessingCurrentTask is then called
-      // this means the second prompter never runs and the cancelled message
-      // doesn't get redelivered
-      executor.shutdownAfterProcessingCurrentlyQueuedTasks();
-
-      deliveries.clear();
-
-      sp.removeSession(id);
-
-      closed = true;
-   }
-
-   synchronized void handleDelivery(MessageReference ref,
-         ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
+   synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
    {
       // FIXME - we shouldn't have to pass in the packet Sender - this should be
       // creatable
@@ -345,62 +297,56 @@
       delivery.deliver();
    }
 
-   /**
-    * Starts this session's Consumers
-    */
    void setStarted(boolean s) throws Exception
    {
-      // We clone to prevent deadlock
-      // http://jira.jboss.org/jira/browse/JBMESSAGING-836
-      Map consumersClone;
-      synchronized (consumers)
+      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+      
+      for (ServerConsumerEndpoint consumer: consumersClone.values())
       {
-         consumersClone = new HashMap(consumers);
+         consumer.setStarted(s);
       }
+   }
 
-      for (Iterator i = consumersClone.values().iterator(); i.hasNext();)
+   void promptDelivery(final Queue queue)
+   {
+      // TODO - do we really need to prompt on a different thread?
+      executor.execute(new Runnable()
       {
-         ServerConsumerEndpoint sce = (ServerConsumerEndpoint) i.next();
-         if (s)
+         public void run()
          {
-            sce.start();
+            queue.deliver();
          }
-         else
-         {
-            sce.stop();
-         }
-      }
+      });
    }
 
-   void promptDelivery(final Queue queue)
+   public void close() throws Exception
    {
-      if (trace)
+      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+      
+      for (ServerConsumerEndpoint consumer: consumersClone.values())
       {
-         log.trace("Prompting delivery on " + queue);
+         consumer.close();
       }
 
-      try
-      {
-         // TODO - do we really need to prompt on a different thread?
-         this.executor.execute(new Runnable()
-         {
-            public void run()
-            {
-               queue.deliver();
-            }
-         });
+      consumers.clear();
 
-      }
-      catch (Throwable t)
+      Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
+      
+      for (ServerBrowserEndpoint browser: browsersClone.values())
       {
-         log.error("Failed to prompt delivery", t);
+         browser.close();
       }
-   }
 
-   private void close() throws Exception
-   {
-      localClose();
+      consumers.clear();
 
+      browsers.clear();
+
+      rollback();
+
+      executor.shutdown();
+
+      deliveries.clear();
+
       connectionEndpoint.removeSession(id);
 
       connectionEndpoint.getMessagingServer().getRemotingService()
@@ -1119,7 +1065,7 @@
       }
 
       ServerConsumerEndpoint ep = new ServerConsumerEndpoint(sp, consumerID,
-            binding.getQueue(), this, filter, noLocal, prefetchSize, autoDeleteQueue);
+            binding.getQueue(), this, filter, noLocal, autoDeleteQueue, prefetchSize > 0);
 
       connectionEndpoint.getMessagingServer().getRemotingService()
             .getDispatcher().register(ep.newHandler());
@@ -1195,8 +1141,6 @@
    private SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector)
          throws Exception
    {
-      if (closed) { throw new IllegalStateException("Session is closed"); }
-
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding == null) { throw new MessagingException(

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -65,15 +65,7 @@
    void setRemotingService(RemotingService remotingService);
    
    RemotingService getRemotingService();
-
-   ServerSessionEndpoint getSession(String sessionID);
-
-   Collection getSessions();
-
-   void addSession(String id, ServerSessionEndpoint session);
-
-   void removeSession(String id);
-
+  
    SecurityStore getSecurityManager();
 
    ConnectionManager getConnectionManager();

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -305,13 +305,7 @@
          return false;
       }
       
-      long overtime = System.currentTimeMillis() - expiration;
-      
-      if (overtime >= 0)
-      {
-         return true;
-      }
-      return false;
+      return System.currentTimeMillis() - expiration >= 0;
    }
    
    public MessageReference createReference(Queue queue)

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,11 +23,8 @@
 
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.destination.JBossDestination;
@@ -38,7 +35,6 @@
 import org.jboss.jms.server.SecurityStore;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
-import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.jms.server.plugin.NullUserManager;
 import org.jboss.jms.server.plugin.contract.JMSUserManager;
 import org.jboss.jms.server.security.NullAuthenticationManager;
@@ -98,12 +94,8 @@
 
    private Version version;
 
-   private boolean started;
+   private volatile boolean started;
 
-   //private boolean supportsFailover = true;
-
-   private Map<String, ServerSessionEndpoint> sessions;
-
    // wired components
 
    private SecurityMetadataStore securityStore;
@@ -140,8 +132,6 @@
 
       version = Version.instance();
 
-      sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
-
       started = false;
    }
 
@@ -311,29 +301,7 @@
       return remotingService;
    }
 
-   public ServerSessionEndpoint getSession(String sessionID)
-   {
-      return (ServerSessionEndpoint) sessions.get(sessionID);
-   }
-
-   public Collection<ServerSessionEndpoint> getSessions()
-   {
-      return sessions.values();
-   }
-
-   public void addSession(String id, ServerSessionEndpoint session)
-   {
-      sessions.put(id, session);
-   }
-
-   public void removeSession(String id)
-   {
-      if (sessions.remove(id) == null)
-      {
-         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
-      }
-   }
-
+  
    public void enableMessageCounters()
    {
       messageCounterManager.start();

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,65 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
-
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class ConsumerChangeRateMessageCodec extends
-      AbstractPacketCodec<ConsumerChangeRateMessage>
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ConsumerChangeRateMessageCodec()
-   {
-      super(CONS_CHANGERATE);
-   }
-
-   // Public --------------------------------------------------------
-
-   // AbstractPacketCodec overrides ---------------------------------
-
-   @Override
-   protected void encodeBody(ConsumerChangeRateMessage message, RemotingBuffer out) throws Exception
-   {
-      out.putInt(FLOAT_LENGTH);
-      out.putFloat(message.getRate());
-   }
-
-   @Override
-   protected ConsumerChangeRateMessage decodeBody(RemotingBuffer in)
-         throws Exception
-   {
-      int bodyLength = in.getInt();
-      if (in.remaining() < bodyLength)
-      {
-         return null;
-      }
-
-      float rate = in.getFloat();
-
-      return new ConsumerChangeRateMessage(rate);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private ----------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java (from rev 3674, trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ConsumerFlowTokenMessageCodec extends AbstractPacketCodec<ConsumerFlowTokenMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ConsumerFlowTokenMessageCodec()
+   {
+      super(CONS_FLOWTOKEN);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(ConsumerFlowTokenMessage message, RemotingBuffer out) throws Exception
+   {
+      out.putInt(INT_LENGTH);
+      out.putFloat(message.getTokens());
+   }
+
+   @Override
+   protected ConsumerFlowTokenMessage decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null;
+      }
+
+      return new ConsumerFlowTokenMessage(in.getInt());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private ----------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -38,13 +38,13 @@
          RemotingBuffer out) throws Exception
    {
       String consumerID = response.getConsumerID();
-      int bufferSize = response.getBufferSize();
+      int prefetchSize = response.getPrefetchSize();
 
       int bodyLength = sizeof(consumerID) + INT_LENGTH;
        
       out.putInt(bodyLength);
       out.putNullableString(consumerID);
-      out.putInt(bufferSize);
+      out.putInt(prefetchSize);
    }
 
    @Override
@@ -58,9 +58,9 @@
       }
 
       String consumerID = in.getNullableString();
-      int bufferSize = in.getInt();
+      int prefetchSize = in.getInt();
  
-      return new SessionCreateConsumerResponseMessage(consumerID, bufferSize);
+      return new SessionCreateConsumerResponseMessage(consumerID, prefetchSize);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -15,7 +15,7 @@
 import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
 import org.jboss.messaging.core.remoting.codec.ConnectionCreateSessionMessageCodec;
 import org.jboss.messaging.core.remoting.codec.ConnectionCreateSessionResponseMessageCodec;
-import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ConsumerFlowTokenMessageCodec;
 import org.jboss.messaging.core.remoting.codec.CreateConnectionMessageCodec;
 import org.jboss.messaging.core.remoting.codec.CreateConnectionResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
@@ -62,7 +62,7 @@
 import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ConnectionStartMessage;
 import org.jboss.messaging.core.remoting.wireformat.ConnectionStopMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -172,7 +172,7 @@
       addCodecForEmptyPacket(PacketType.CONN_STOP,
             ConnectionStopMessage.class);
 
-      addCodec(ConsumerChangeRateMessage.class, ConsumerChangeRateMessageCodec.class);
+      addCodec(ConsumerFlowTokenMessage.class, ConsumerFlowTokenMessageCodec.class);
 
       addCodec(DeliverMessage.class, DeliverMessageCodec.class);
 

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.wireformat;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class ConsumerChangeRateMessage extends AbstractPacket
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private final float rate;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ConsumerChangeRateMessage(float rate)
-   {
-      super(CONS_CHANGERATE);
-
-      this.rate = rate;
-   }
-
-   // Public --------------------------------------------------------
-
-   public float getRate()
-   {
-      return rate;
-   }
-
-   @Override
-   public String toString()
-   {
-      return getParentString() + ", rate=" + rate + "]";
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java (from rev 3674, trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class ConsumerFlowTokenMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final int tokens;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ConsumerFlowTokenMessage(int tokens)
+   {
+      super(CONS_FLOWTOKEN);
+
+      this.tokens = tokens;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getTokens()
+   {
+      return tokens;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", tokens=" + tokens + "]";
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -80,7 +80,7 @@
    SESS_XA_GET_TIMEOUT_RESP            ((byte)82),
        
    // Consumer 
-   CONS_CHANGERATE                     ((byte)90);
+   CONS_FLOWTOKEN                      ((byte)90);
    
 
    private final byte type;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,20 +23,20 @@
    // Attributes ----------------------------------------------------
 
    private final String consumerID;
-   private final int bufferSize;
+   private final int prefetchSize;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateConsumerResponseMessage(String consumerID, int bufferSize)
+   public SessionCreateConsumerResponseMessage(String consumerID, int prefetchSize)
    {
       super(SESS_CREATECONSUMER_RESP);
 
       Assert.assertValidID(consumerID);
 
       this.consumerID = consumerID;
-      this.bufferSize = bufferSize;
+      this.prefetchSize = prefetchSize;
    }
 
    // Public --------------------------------------------------------
@@ -46,9 +46,9 @@
       return consumerID;
    }
 
-   public int getBufferSize()
+   public int getPrefetchSize()
    {
-      return bufferSize;
+      return prefetchSize;
    }
 
    @Override
@@ -56,7 +56,7 @@
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", consumerID=" + consumerID);
-      buf.append(", bufferSize=" + bufferSize);
+      buf.append(", prefetchSize=" + prefetchSize);
       buf.append("]");
       return buf.toString();
    }

Modified: trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -33,7 +33,7 @@
  */
 public abstract class AbstractHashSet<Key> extends AbstractSet<Key>
 {
-   private Map theMap;
+   private Map<Key, Object> theMap;
 
    private static Object dummy = new Object();
 
@@ -42,7 +42,7 @@
       theMap = buildInternalHashMap();
    }
 
-   protected abstract Map buildInternalHashMap();
+   protected abstract Map<Key, Object> buildInternalHashMap();
 
    public int size()
    {

Deleted: trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import EDU.oswego.cs.dl.util.concurrent.Channel;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- * A ClearableQueuedExecutor
- * 
- * This class extends the QueuedExector with a method to clear all but the currently
- * executing task without shutting it down.
- * 
- * We need this functionality when failing over a session.
- * 
- * In that case we need to clear all tasks apart from the currently executing one.
- * 
- * We can't just shutdownAfterProcessingCurrentTask then use another instance
- * after failover since when failover resumes the current task and the next delivery
- * will be executed on different threads and smack into each other.
- * 
- * http://jira.jboss.org/jira/browse/JBMESSAGING-904
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ClearableQueuedExecutor extends QueuedExecutor
-{
-   public ClearableQueuedExecutor()
-   {
-   }
-
-   public ClearableQueuedExecutor(Channel channel)
-   {
-      super(channel);
-   }
-
-   public void clearAllExceptCurrentTask()
-   {
-      try
-      { 
-        while (queue_.poll(0) != null);
-      }
-      catch (InterruptedException ex)
-      {
-        Thread.currentThread().interrupt();
-      }
-   }
-   
-}
-

Deleted: trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,52 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.util;
-
-import java.util.Map;
-
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-/**
- * 
- * A ConcurrentReaderHashSet.
- * 
- * Offers same concurrency as ConcurrentHashMap but for a Set
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="clebert.suconic at jboss.com">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ConcurrentReaderHashSet<Key> extends AbstractHashSet<Key>
-{
-   public ConcurrentReaderHashSet()
-   {
-      super();
-   }
-
-   protected Map buildInternalHashMap()
-   {
-      return new ConcurrentReaderHashMap();
-   }
-
-}

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -7,21 +7,21 @@
 
 package org.jboss.messaging.core.remoting.impl.integration;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.Packet;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
 public class DummyInterceptor implements Interceptor
 {
    protected Logger log = Logger.getLogger(DummyInterceptor.class);
 
    boolean sendException = false;
    boolean changeMessage = false;
-   SynchronizedInt syncCounter = new SynchronizedInt(0);
+   AtomicInteger syncCounter = new AtomicInteger(0);
    
    public int getCounter()
    {
@@ -36,7 +36,7 @@
    public void intercept(Packet packet) throws MessagingException
    {
       log.info("DummyFilter packet = " + packet.getClass().getName());
-      syncCounter.add(1);
+      syncCounter.addAndGet(1);
       if (sendException)
       {
          throw new MessagingException(MessagingException.INTERNAL_ERROR);

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -7,19 +7,19 @@
 
 package org.jboss.messaging.core.remoting.impl.integration;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.wireformat.Packet;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
 public class DummyInterceptorB implements Interceptor
 {
 
    protected Logger log = Logger.getLogger(DummyInterceptorB.class);
 
-   static SynchronizedInt syncCounter = new SynchronizedInt(0);
+   static AtomicInteger syncCounter = new AtomicInteger(0);
    
    public static int getCounter()
    {
@@ -33,7 +33,7 @@
    
    public void intercept(Packet packet) throws MessagingException
    {
-      syncCounter.add(1);
+      syncCounter.addAndGet(1);
       log.info("DummyFilter packet = " + packet);
    }
 

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,7 +23,7 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ADD_ADDRESS;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELIVER;
@@ -102,7 +102,7 @@
 import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
 import org.jboss.messaging.core.remoting.codec.CreateConnectionMessageCodec;
 import org.jboss.messaging.core.remoting.codec.CreateConnectionResponseMessageCodec;
-import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ConsumerFlowTokenMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
@@ -149,7 +149,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionBrowserResetMessage;
 import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
@@ -610,7 +610,7 @@
       AbstractPacketCodec codec = new SessionCreateConsumerResponseMessageCodec();
       SimpleRemotingBuffer buffer = encode(response, codec);
       checkHeader(buffer, response);
-      checkBody(buffer, response.getConsumerID(), response.getBufferSize());
+      checkBody(buffer, response.getConsumerID(), response.getPrefetchSize());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -618,7 +618,7 @@
       assertTrue(decodedPacket instanceof SessionCreateConsumerResponseMessage);
       SessionCreateConsumerResponseMessage decodedResponse = (SessionCreateConsumerResponseMessage) decodedPacket;
       assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
-      assertEquals(response.getBufferSize(), decodedResponse.getBufferSize());
+      assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
    }
 
    public void testStartConnectionMessage() throws Exception
@@ -657,19 +657,19 @@
 
    public void testChangeRateMessage() throws Exception
    {
-      ConsumerChangeRateMessage message = new ConsumerChangeRateMessage(0.63f);
-      AbstractPacketCodec codec = new ConsumerChangeRateMessageCodec();
+      ConsumerFlowTokenMessage message = new ConsumerFlowTokenMessage(10);
+      AbstractPacketCodec codec = new ConsumerFlowTokenMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
-      checkBody(buffer, message.getRate());
+      checkBody(buffer, message.getTokens());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
 
-      assertTrue(decodedPacket instanceof ConsumerChangeRateMessage);
-      ConsumerChangeRateMessage decodedMessage = (ConsumerChangeRateMessage) decodedPacket;
-      assertEquals(CONS_CHANGERATE, decodedMessage.getType());
-      assertEquals(message.getRate(), decodedMessage.getRate());
+      assertTrue(decodedPacket instanceof ConsumerFlowTokenMessage);
+      ConsumerFlowTokenMessage decodedMessage = (ConsumerFlowTokenMessage) decodedPacket;
+      assertEquals(CONS_FLOWTOKEN, decodedMessage.getType());
+      assertEquals(message.getTokens(), decodedMessage.getTokens());
    }
 
    public void testDeliverMessage() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java	2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java	2008-02-08 14:04:33 UTC (rev 3682)
@@ -68,10 +68,8 @@
    		
    		Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		
-   		log.info("** creating temp topic");
 	      TemporaryTopic tempTopic = producerSession.createTemporaryTopic();
-	      log.info("** created temp topic");
-	
+
 	      MessageProducer producer = producerSession.createProducer(tempTopic);
 	
 	      MessageConsumer consumer = consumerSession.createConsumer(tempTopic);




More information about the jboss-cvs-commits mailing list