[jboss-cvs] JBoss Messaging SVN: r5128 - in trunk: src/main/org/jboss/messaging/core/client/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 17 06:14:47 EDT 2008


Author: timfox
Date: 2008-10-17 06:14:46 -0400 (Fri, 17 Oct 2008)
New Revision: 5128

Added:
   trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
Log:
Reverted andy's last browser changes


Copied: trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientBrowser.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.client;
+
+import org.jboss.messaging.core.exception.MessagingException;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ClientBrowser
+{
+   long getID();
+   
+   void reset() throws MessagingException;
+
+   ClientMessage nextMessage() throws MessagingException;
+   
+   boolean hasNextMessage() throws MessagingException;
+      
+   void close() throws MessagingException;
+   
+   boolean isClosed();
+
+   void cleanUp();
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -27,7 +27,7 @@
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  */
 public interface ClientConsumer
 {      
@@ -45,13 +45,5 @@
    
    boolean isClosed();   
    
-   boolean isDirect();
-
-   boolean awaitMessage(long timeOut) throws Exception;
-
-   void stop() throws MessagingException;
-
-   void start() throws MessagingException;
-
-   void restart() throws MessagingException;
+   boolean isDirect();   
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,21 +22,21 @@
 
 package org.jboss.messaging.core.client;
 
+import javax.transaction.xa.XAResource;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.transaction.xa.XAResource;
-
 /*
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * 
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  */
 public interface ClientSession extends XAResource
 {
@@ -64,12 +64,11 @@
                                  SimpleString filterString,
                                  boolean direct,
                                  int windowSize,
-                                 int maxRate,
-                                 boolean isBrowser) throws MessagingException;
+                                 int maxRate) throws MessagingException;
 
-   ClientConsumer createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
+   ClientBrowser createBrowser(SimpleString queueName, SimpleString filterString) throws MessagingException;
 
-   ClientConsumer createBrowser(SimpleString queueName) throws MessagingException;
+   ClientBrowser createBrowser(SimpleString queueName) throws MessagingException;
 
    ClientProducer createProducer(SimpleString address) throws MessagingException;
 

Copied: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,140 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3602 $</tt> $Id: ClientBrowserImpl.java 3602 2008-01-21 17:48:32Z timfox $
+ */
+public class ClientBrowserImpl implements ClientBrowser
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private final long id;
+
+   private final ClientSessionInternal session;
+
+   private final Channel channel;
+
+   private volatile boolean closed;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ClientBrowserImpl(final ClientSessionInternal session, final long id, final Channel channel)
+   {
+      this.id = id;
+
+      this.session = session;
+
+      this.channel = channel;
+   }
+
+   // ClientBrowser implementation -----------------------------------------------------------------
+
+   public long getID()
+   {
+      return id;
+   }
+
+   public synchronized void close() throws MessagingException
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      try
+      {
+         channel.sendBlocking(new SessionBrowserCloseMessage(id));
+      }
+      finally
+      {
+         session.removeBrowser(this);
+
+         closed = true;
+      }
+   }
+
+   public synchronized void cleanUp()
+   {
+      session.removeBrowser(this);
+
+      closed = true;
+   }
+
+   public boolean isClosed()
+   {
+      return closed;
+   }
+
+   public void reset() throws MessagingException
+   {
+      checkClosed();
+
+      channel.sendBlocking(new SessionBrowserResetMessage(id));
+   }
+
+   public boolean hasNextMessage() throws MessagingException
+   {
+      checkClosed();
+
+      SessionBrowserHasNextMessageResponseMessage response = (SessionBrowserHasNextMessageResponseMessage)channel.sendBlocking(new SessionBrowserHasNextMessageMessage(id));
+
+      return response.hasNext();
+   }
+
+   public ClientMessage nextMessage() throws MessagingException
+   {
+      checkClosed();
+
+      SessionBrowseMessage response = (SessionBrowseMessage)channel.sendBlocking(new SessionBrowserNextMessageMessage(id));
+
+      return response.getClientMessage();
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Package Private ------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   private void checkClosed() throws MessagingException
+   {
+      if (closed)
+      {
+         throw new MessagingException(MessagingException.OBJECT_CLOSED, "Browser is closed");
+      }
+   }
+
+   // Inner Classes --------------------------------------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -12,6 +12,10 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -19,20 +23,14 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
 import org.jboss.messaging.util.Future;
 
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @version <tt>$Revision: 3603 $</tt> $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
  */
 public class ClientConsumerImpl implements ClientConsumerInternal
@@ -65,8 +63,6 @@
 
    private final Runner runner = new Runner();
 
-   private final boolean isBrowser;
-
    private volatile Thread receiverThread;
 
    private volatile Thread onMessageThread;
@@ -77,8 +73,6 @@
 
    private volatile int creditsToSend;
 
-   private boolean messagesWaiting = true;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -87,8 +81,7 @@
                              final int clientWindowSize,
                              final boolean direct,
                              final Executor executor,
-                             final Channel channel,
-                             final boolean isBrowser)
+                             final Channel channel)
    {
       this.id = id;
 
@@ -101,8 +94,6 @@
       this.clientWindowSize = clientWindowSize;
 
       this.direct = direct;
-
-      this.isBrowser = isBrowser;
    }
 
    // ClientConsumer implementation
@@ -209,11 +200,6 @@
    {
       checkClosed();
 
-      if (isBrowser)
-      {
-         throw new MessagingException(MessagingException.ILLEGAL_STATE,
-                                      "Cannot set MessageHandler - consumer is in browser mode");
-      }
       if (receiverThread != null)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
@@ -251,15 +237,6 @@
       }
    }
 
-   public void deliveryComplete()
-   {
-      synchronized (this)
-      {
-         messagesWaiting = false;
-         notify();
-      }
-   }
-
    public boolean isClosed()
    {
       return closed;
@@ -270,82 +247,6 @@
       return direct;
    }
 
-   /**
-    * if there are messages in the buffer then we just return true. If we have received all of the messages being sent we
-    * return false. If there are no messages in the buffer and still some in transit then we wait until they have been delivered.
-    * @return
-    * @throws Exception
-    */
-   public boolean awaitMessage(long timeOut) throws Exception
-   {
-      if (!buffer.isEmpty())
-      {
-         return true;
-      }
-      else
-      {
-         // we only need to syncronize if the buffer is empty
-         synchronized (this)
-         {
-            if (!buffer.isEmpty())
-            {
-               return true;
-            }
-            if (messagesWaiting)
-            {
-               wait(timeOut);
-            }
-            return !buffer.isEmpty();
-         }
-      }
-   }
-
-   public void stop() throws MessagingException
-   {
-      if (!isBrowser)
-      {
-         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot stop Consumer in non browser mode");
-      }
-      synchronized (this)
-      {
-         // if there are still messages in transit tell the server to stop and wait
-         if (messagesWaiting)
-         {
-            // tell the server to stop
-            channel.send(new SessionConsumerStopMessage(id));
-            do
-            {
-               try
-               {
-                  wait();
-               }
-               catch (InterruptedException e)
-               {
-                  throw new IllegalStateException(e.getMessage());
-               }
-            }
-            while (messagesWaiting);
-         }
-         buffer.clear();
-      }
-   }
-
-   public void start() throws MessagingException
-   {
-      if (!isBrowser)
-      {
-         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot stop Consumer in non browser mode");
-      }
-      messagesWaiting = true;
-      channel.send(new SessionConsumerStartMessage(id));
-   }
-
-   public void restart() throws MessagingException
-   {
-      stop();
-      start();
-   }
-
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
 
@@ -422,7 +323,7 @@
    {
       return creditsToSend;
    }
-
+   
    // Public
    // ---------------------------------------------------------------------------------------
 
@@ -553,14 +454,9 @@
          synchronized (this)
          {
             if (receiverThread != null)
-            {
+            {              
                // Wake up any receive() thread that might be waiting
                notify();
-
-               messagesWaiting = false;
-               
-               // Wake up any receive() thread that might be waiting
-               notify();
             }
 
             handler = null;

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -30,7 +30,6 @@
  * A ClientConsumerInternal
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface ClientConsumerInternal extends ClientConsumer
@@ -48,6 +47,4 @@
    int getCreditsToSend();
 
    void cleanUp() throws Exception;
-
-   void deliveryComplete();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -21,6 +21,20 @@
  */
 package org.jboss.messaging.core.client.impl;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -42,6 +56,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
@@ -74,18 +89,6 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -95,7 +98,7 @@
  * 
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * 
  * @version <tt>$Revision: 3603 $</tt> $Id: ClientSessionImpl.java 3603 2008-01-21 18:49:20Z timfox $
  * 
@@ -128,6 +131,8 @@
 
    private volatile RemotingConnection remotingConnection;
 
+   private final Map<Long, ClientBrowser> browsers = new ConcurrentHashMap<Long, ClientBrowser>();
+
    private final Map<Long, ClientProducerInternal> producers = new ConcurrentHashMap<Long, ClientProducerInternal>();
 
    private final Map<Long, ClientConsumerInternal> consumers = new ConcurrentHashMap<Long, ClientConsumerInternal>();
@@ -296,24 +301,21 @@
                             filterString,
                             direct,
                             connectionFactory.getConsumerWindowSize(),
-                            connectionFactory.getConsumerMaxRate(),
-                            false);
+                            connectionFactory.getConsumerMaxRate());
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName,
                                         final SimpleString filterString,
                                         final boolean direct,
                                         final int windowSize,
-                                        final int maxRate,
-                                        final boolean isBrowser) throws MessagingException
+                                        final int maxRate) throws MessagingException
    {
       checkClosed();
       
       SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
                                                                               filterString,
                                                                               windowSize,
-                                                                              maxRate,
-                                                                              isBrowser);
+                                                                              maxRate);
 
       SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)channel.sendBlocking(request);
 
@@ -351,8 +353,7 @@
                                                                clientWindowSize,
                                                                direct,
                                                                executor,
-                                                               channel,
-                                                               isBrowser);
+                                                               channel);
 
       addConsumer(consumer);
 
@@ -365,19 +366,24 @@
       return consumer;
    }
 
-   public ClientConsumer createBrowser(final SimpleString queueName) throws MessagingException
+   public ClientBrowser createBrowser(final SimpleString queueName) throws MessagingException
    {
       return createBrowser(queueName, null);
    }
 
-   public ClientConsumer createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
+   public ClientBrowser createBrowser(final SimpleString queueName, final SimpleString filterString) throws MessagingException
    {
-      return createConsumer(queueName,
-                            filterString,
-                            false,
-                            connectionFactory.getConsumerWindowSize(),
-                            connectionFactory.getConsumerMaxRate(),
-                            true);
+      checkClosed();
+
+      SessionCreateBrowserMessage request = new SessionCreateBrowserMessage(queueName, filterString);
+
+      channel.sendBlocking(request);
+
+      ClientBrowser browser = new ClientBrowserImpl(this, idGenerator.generateID(), channel);
+
+      addBrowser(browser);
+
+      return browser;
    }
 
    public ClientProducer createProducer(final SimpleString address) throws MessagingException
@@ -629,6 +635,10 @@
       producers.put(producer.getID(), producer);
    }
 
+   public void addBrowser(final ClientBrowser browser)
+   {
+      browsers.put(browser.getID(), browser);
+   }
 
    public void removeConsumer(final ClientConsumerInternal consumer) throws MessagingException
    {
@@ -645,6 +655,11 @@
       }
    }
 
+   public void removeBrowser(final ClientBrowser browser)
+   {
+      browsers.remove(browser.getID());
+   }
+
    public Set<ClientProducerInternal> getProducers()
    {
       return new HashSet<ClientProducerInternal>(producers.values());
@@ -655,6 +670,10 @@
       return new HashSet<ClientConsumerInternal>(consumers.values());
    }
 
+   public Set<ClientBrowser> getBrowsers()
+   {
+      return new HashSet<ClientBrowser>(browsers.values());
+   }
 
    public Map<SimpleString, ClientProducerInternal> getProducerCache()
    {
@@ -671,16 +690,6 @@
       }
    }
 
-   public void deliveryComplete(long consumerID)
-   {
-      ClientConsumerInternal consumer = consumers.get(consumerID);
-
-      if (consumer != null)
-      {
-         consumer.deliveryComplete();
-      }
-   }
-
    public void receiveProducerCredits(final long producerID, final int credits) throws Exception
    {
       ClientProducerInternal producer = producers.get(producerID);
@@ -1144,6 +1153,13 @@
       {
          producer.cleanUp();
       }
+
+      Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers.values());
+
+      for (ClientBrowser browser : browsersClone)
+      {
+         browser.cleanUp();
+      }
    }
 
    private void closeChildren() throws MessagingException
@@ -1161,6 +1177,13 @@
       {
          producer.close();
       }
+
+      Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers.values());
+
+      for (ClientBrowser browser : browsersClone)
+      {
+         browser.close();
+      }
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -12,20 +12,20 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.util.SimpleString;
 
-import java.util.Map;
-import java.util.Set;
-
 /**
  * A ClientSessionInternal
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public interface ClientSessionInternal extends ClientSession
 {
@@ -37,14 +37,20 @@
 
    void addProducer(ClientProducerInternal producer);
 
+   void addBrowser(ClientBrowser browser);
+
    void removeConsumer(ClientConsumerInternal consumer) throws MessagingException;
 
    void removeProducer(ClientProducerInternal producer);
 
+   void removeBrowser(ClientBrowser browser);
+
    Set<ClientProducerInternal> getProducers();
 
    Set<ClientConsumerInternal> getConsumers();
 
+   Set<ClientBrowser> getBrowsers();
+
    Map<SimpleString, ClientProducerInternal> getProducerCache();
 
    //void cleanUp() throws Exception;
@@ -54,6 +60,4 @@
    void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
 
    boolean handleFailover(final RemotingConnection backupConnection);
-
-   void deliveryComplete(long consumerID);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,15 +22,15 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELIVERY_COMPLETE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 
@@ -39,7 +39,6 @@
  * A ClientSessionPacketHandler
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public class ClientSessionPacketHandler implements ChannelHandler
@@ -77,14 +76,6 @@
                
                break;
             }
-            case SESS_DELIVERY_COMPLETE:
-            {
-               SessionDeliveryCompleteMessage message = (SessionDeliveryCompleteMessage) packet;
-
-               clientSession.deliveryComplete(message.getConsumerID());
-
-               break;
-            }
             case EXCEPTION:
             {
                //TODO - we can provide a means for async exceptions to get back to to client

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -15,7 +15,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.INITIAL_BUFFER_SIZE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
@@ -27,18 +26,21 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELIVERY_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
@@ -110,18 +112,22 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
@@ -153,8 +159,6 @@
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * 
  * @version <tt>$Revision$</tt> $Id$
  */
 public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
@@ -517,7 +521,7 @@
 
    private void doWrite(final Packet packet)
    {
-      final MessagingBuffer buffer = transportConnection.createBuffer(INITIAL_BUFFER_SIZE);
+      final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
       packet.encode(buffer);
 
@@ -539,7 +543,7 @@
          }
          case PONG:
          {
-            packet = new PacketImpl(PONG);
+            packet = new PacketImpl(PacketImpl.PONG);
             break;
          }
          case EXCEPTION:
@@ -607,16 +611,11 @@
             packet = new SessionCreateProducerResponseMessage();
             break;
          }
-         case SESS_CONSUMER_STOP:
+         case SESS_CREATEBROWSER:
          {
-            packet = new SessionConsumerStopMessage();
+            packet = new SessionCreateBrowserMessage();
             break;
          }
-         case SESS_CONSUMER_START:
-         {
-            packet = new SessionConsumerStartMessage();
-            break;
-         }
          case SESS_ACKNOWLEDGE:
          {
             packet = new SessionAcknowledgeMessage();
@@ -624,12 +623,12 @@
          }
          case SESS_COMMIT:
          {
-            packet = new PacketImpl(SESS_COMMIT);
+            packet = new PacketImpl(PacketImpl.SESS_COMMIT);
             break;
          }
          case SESS_ROLLBACK:
          {
-            packet = new PacketImpl(SESS_ROLLBACK);
+            packet = new PacketImpl(PacketImpl.SESS_ROLLBACK);
             break;
          }
          case SESS_QUEUEQUERY:
@@ -672,6 +671,31 @@
             packet = new SessionBindingQueryResponseMessage();
             break;
          }
+         case PacketImpl.SESS_BROWSER_MESSAGE:
+         {
+            packet = new SessionBrowseMessage();
+            break;
+         }
+         case SESS_BROWSER_RESET:
+         {
+            packet = new SessionBrowserResetMessage();
+            break;
+         }
+         case SESS_BROWSER_HASNEXTMESSAGE:
+         {
+            packet = new SessionBrowserHasNextMessageMessage();
+            break;
+         }
+         case SESS_BROWSER_HASNEXTMESSAGE_RESP:
+         {
+            packet = new SessionBrowserHasNextMessageResponseMessage();
+            break;
+         }
+         case SESS_BROWSER_NEXTMESSAGE:
+         {
+            packet = new SessionBrowserNextMessageMessage();
+            break;
+         }
          case SESS_XA_START:
          {
             packet = new SessionXAStartMessage();
@@ -709,7 +733,7 @@
          }
          case SESS_XA_SUSPEND:
          {
-            packet = new PacketImpl(SESS_XA_SUSPEND);
+            packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
             break;
          }
          case SESS_XA_RESUME:
@@ -724,7 +748,7 @@
          }
          case SESS_XA_INDOUBT_XIDS:
          {
-            packet = new PacketImpl(SESS_XA_INDOUBT_XIDS);
+            packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
             break;
          }
          case SESS_XA_INDOUBT_XIDS_RESP:
@@ -744,7 +768,7 @@
          }
          case SESS_XA_GET_TIMEOUT:
          {
-            packet = new PacketImpl(SESS_XA_GET_TIMEOUT);
+            packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
             break;
          }
          case SESS_XA_GET_TIMEOUT_RESP:
@@ -754,12 +778,12 @@
          }
          case SESS_START:
          {
-            packet = new PacketImpl(SESS_START);
+            packet = new PacketImpl(PacketImpl.SESS_START);
             break;
          }
          case SESS_STOP:
          {
-            packet = new PacketImpl(SESS_STOP);
+            packet = new PacketImpl(PacketImpl.SESS_STOP);
             break;
          }
          case SESS_FLOWTOKEN:
@@ -792,9 +816,9 @@
             packet = new SessionProducerCloseMessage();
             break;
          }
-         case SESS_DELIVERY_COMPLETE:
+         case SESS_BROWSER_CLOSE:
          {
-            packet = new SessionDeliveryCompleteMessage();
+            packet = new SessionBrowserCloseMessage();
             break;
          }
          case SESS_SCHEDULED_SEND:
@@ -1043,7 +1067,7 @@
                                             "Timed out waiting for response when sending packet " + packet.getType());
             }
 
-            if (response.getType() == EXCEPTION)
+            if (response.getType() == PacketImpl.EXCEPTION)
             {
                final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
 
@@ -1287,6 +1311,7 @@
             else if (handler != null)
             {              
                checkConfirmation(packet);
+               
                handler.handlePacket(packet);                               
             }
             else

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -20,7 +20,6 @@
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version <tt>$Revision$</tt>
  */
 public class PacketImpl implements Packet
@@ -70,9 +69,9 @@
 
    public static final byte SESS_CREATEPRODUCER_RESP = 43;
 
-   public static final byte SESS_CONSUMER_STOP = 44;
+   public static final byte SESS_CREATEBROWSER = 44;
 
-   public static final byte SESS_CONSUMER_START = 45;
+   public static final byte SESS_CREATEBROWSER_RESP = 45;
 
    public static final byte SESS_ACKNOWLEDGE = 46;
 
@@ -96,6 +95,16 @@
 
    public static final byte SESS_BINDINGQUERY_RESP = 56;
 
+   public static final byte SESS_BROWSER_MESSAGE = 57;
+
+   public static final byte SESS_BROWSER_RESET = 58;
+
+   public static final byte SESS_BROWSER_HASNEXTMESSAGE = 59;
+
+   public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 60;
+
+   public static final byte SESS_BROWSER_NEXTMESSAGE = 61;
+
    public static final byte SESS_XA_START = 62;
 
    public static final byte SESS_XA_END = 63;
@@ -144,6 +153,8 @@
 
    public static final byte SESS_PRODUCER_CLOSE = 85;
 
+   public static final byte SESS_BROWSER_CLOSE = 86;
+
    public static final byte SESS_RECEIVE_MSG = 87;
 
    public static final byte SESS_MANAGEMENT_SEND = 88;
@@ -154,8 +165,6 @@
    
    public static final byte SESS_REPLICATE_DELIVERY = 91;
 
-   public static final byte SESS_DELIVERY_COMPLETE = 92;
-
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowseMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionBrowseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
+
+   // Attributes ----------------------------------------------------
+
+   private ClientMessage clientMessage;
+   
+   private ServerMessage serverMessage;
+
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   
+   public SessionBrowseMessage(final ServerMessage message)
+   {
+      super(SESS_BROWSER_MESSAGE);
+      
+      this.serverMessage = message;
+      
+      this.clientMessage = null;
+   }
+   
+   public SessionBrowseMessage()
+   {
+      super(SESS_BROWSER_MESSAGE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public boolean isResponse()
+   {
+      return true;
+   }
+   
+   public ClientMessage getClientMessage()
+   {
+      return clientMessage;
+   }
+   
+   public ServerMessage getServerMessage()
+   {
+      return serverMessage;
+   }
+
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      serverMessage.encode(buffer);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      //TODO can be optimised
+      
+      clientMessage = new ClientMessageImpl();
+      
+      clientMessage.decode(buffer);
+      
+      clientMessage.getBody().flip();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionBrowserCloseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long browserID;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionBrowserCloseMessage(final long browserID)
+   {
+      super(SESS_BROWSER_CLOSE);
+
+      this.browserID = browserID;
+   }
+   
+   public SessionBrowserCloseMessage()
+   {
+      super(SESS_BROWSER_CLOSE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getBrowserID()
+   {
+      return browserID;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(browserID);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      browserID = buffer.getLong();
+   }
+   
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", browserID=" + browserID + "]";
+   }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionBrowserCloseMessage == false)
+      {
+         return false;
+      }
+            
+      SessionBrowserCloseMessage r = (SessionBrowserCloseMessage)other;
+      
+      return super.equals(other) && this.browserID == r.browserID;
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class SessionBrowserHasNextMessageMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long browserID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionBrowserHasNextMessageMessage(final long browserID)
+   {
+      super(SESS_BROWSER_HASNEXTMESSAGE);
+
+      this.browserID = browserID;
+   }
+   
+   public SessionBrowserHasNextMessageMessage()
+   {
+      super(SESS_BROWSER_HASNEXTMESSAGE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getBrowserID()
+   {
+      return browserID;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(browserID);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      browserID = buffer.getLong();       
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", browserID=" + browserID + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java (from rev 5120, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class SessionBrowserHasNextMessageResponseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private boolean hasNext;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionBrowserHasNextMessageResponseMessage(final boolean hasNext)
+   {
+      super(SESS_BROWSER_HASNEXTMESSAGE_RESP);
+
+      this.hasNext = hasNext;
+   }
+   
+   public SessionBrowserHasNextMessageResponseMessage()
+   {
+      super(SESS_BROWSER_HASNEXTMESSAGE_RESP);
+   }
+
+   // Public --------------------------------------------------------
+
+   public boolean isResponse()
+   {
+      return true;
+   }
+   
+   public boolean hasNext()
+   {
+      return hasNext;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putBoolean(hasNext);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      hasNext = buffer.getBoolean();       
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", hasNext=" + hasNext + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class SessionBrowserNextMessageMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long browserID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionBrowserNextMessageMessage(final long browserID)
+   {
+      super(SESS_BROWSER_NEXTMESSAGE);
+
+      this.browserID = browserID;
+   }
+   
+   public SessionBrowserNextMessageMessage()
+   {
+      super(SESS_BROWSER_NEXTMESSAGE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getBrowserID()
+   {
+      return browserID;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(browserID);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      browserID = buffer.getLong();       
+   }
+   
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", browserID=" + browserID + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class SessionBrowserResetMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long browserID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionBrowserResetMessage(final long browserID)
+   {
+      super(SESS_BROWSER_RESET);
+
+      this.browserID = browserID;
+   }
+   
+   public SessionBrowserResetMessage()
+   {
+      super(SESS_BROWSER_RESET);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getBrowserID()
+   {
+      return browserID;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(browserID);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      browserID = buffer.getLong();       
+   }
+   
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", browserID=" + browserID + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStartMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -1,58 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SessionConsumerStartMessage extends PacketImpl
-{
-   private long consumerId;
-
-   public SessionConsumerStartMessage(long consumerId)
-   {
-      super(SESS_CONSUMER_START);
-      this.consumerId = consumerId;
-   }
-
-   public SessionConsumerStartMessage()
-   {
-      super(SESS_CONSUMER_START);
-   }
-
-   public long getConsumerId()
-   {
-      return consumerId;
-   }
-
-   public void encodeBody(MessagingBuffer buffer)
-   {
-      buffer.putLong(consumerId);
-   }
-
-   public void decodeBody(MessagingBuffer buffer)
-   {
-      consumerId = buffer.getLong();
-   }
-}
\ No newline at end of file

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerStopMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -1,58 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SessionConsumerStopMessage extends PacketImpl
-{
-   private long consumerId;
-
-   public SessionConsumerStopMessage(long consumerId)
-   {
-      super(SESS_CONSUMER_STOP);
-      this.consumerId = consumerId;
-   }
-
-   public SessionConsumerStopMessage()
-   {
-      super(SESS_CONSUMER_STOP);
-   }
-
-   public long getConsumerId()
-   {
-      return consumerId;
-   }
-
-   public void encodeBody(MessagingBuffer buffer)
-   {
-      buffer.putLong(consumerId);
-   }
-
-   public void decodeBody(MessagingBuffer buffer)
-   {
-      consumerId = buffer.getLong();
-   }
-}

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class SessionCreateBrowserMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private SimpleString queueName;
+   
+   private SimpleString filterString;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCreateBrowserMessage(final SimpleString queueName, final SimpleString filterString)
+   {
+      super(SESS_CREATEBROWSER);
+
+      this.queueName = queueName;
+      this.filterString = filterString;
+   }
+   
+   public SessionCreateBrowserMessage()
+   {
+      super(SESS_CREATEBROWSER);
+   }
+   
+   // Public --------------------------------------------------------
+
+   public SimpleString getQueueName()
+   {
+      return queueName;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+   
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putSimpleString(queueName);
+      buffer.putNullableSimpleString(filterString);
+   }
+   
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      queueName = buffer.getSimpleString();
+      filterString = buffer.getNullableSimpleString();
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", queueName=" + queueName + ", filterString="
+            + filterString + "]";
+   }
+   
+   public boolean equals(Object other)
+   {
+      if (other instanceof SessionCreateBrowserMessage == false)
+      {
+         return false;
+      }
+            
+      SessionCreateBrowserMessage r = (SessionCreateBrowserMessage)other;
+      
+      return super.equals(other) && this.queueName.equals(r.queueName) &&
+             this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -28,7 +28,6 @@
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  * @version <tt>$Revision$</tt>
  */
@@ -45,15 +44,13 @@
    private int windowSize;
    
    private int maxRate;
-
-   private boolean isBrowser;
-
+      
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    public SessionCreateConsumerMessage(final SimpleString queueName, final SimpleString filterString,   		                              
-   		                              final int windowSize, final int maxRate, final boolean isBrowser)
+   		                              final int windowSize, final int maxRate)
    {
       super(SESS_CREATECONSUMER);
 
@@ -61,7 +58,6 @@
       this.filterString = filterString;
       this.windowSize = windowSize;
       this.maxRate = maxRate;
-      this.isBrowser = isBrowser;
    }
    
    public SessionCreateConsumerMessage()
@@ -102,19 +98,13 @@
    {
    	return maxRate;
    }
-
-   public boolean isBrowser()
-   {
-      return isBrowser;
-   }
-
+   
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putSimpleString(queueName);
       buffer.putNullableSimpleString(filterString);
       buffer.putInt(windowSize);
       buffer.putInt(maxRate);
-      buffer.putBoolean(isBrowser);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
@@ -123,7 +113,6 @@
       filterString = buffer.getNullableSimpleString();
       windowSize = buffer.getInt();
       maxRate = buffer.getInt();
-      isBrowser = buffer.getBoolean();
    }
 
    public boolean equals(Object other)

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeliveryCompleteMessage.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -1,58 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SessionDeliveryCompleteMessage extends PacketImpl
-{
-   private long consumerID;
-
-   public SessionDeliveryCompleteMessage(long consumerID)
-   {
-      super(SESS_DELIVERY_COMPLETE);
-      this.consumerID = consumerID;
-   }
-
-   public SessionDeliveryCompleteMessage()
-   {
-      super(SESS_DELIVERY_COMPLETE);
-   }
-
-   public void encodeBody(MessagingBuffer buffer)
-   {
-      buffer.putLong(consumerID);
-   }
-
-   public void decodeBody(MessagingBuffer buffer)
-   {
-      consumerID = buffer.getLong();
-   }
-
-   public long getConsumerID()
-   {
-      return consumerID;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -24,16 +24,12 @@
 
 
 import java.util.List;
-import java.util.concurrent.Executor;
 
-
-
 /**
  * 
  * A ServerConsumer
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface ServerConsumer extends Consumer
@@ -55,10 +51,4 @@
 	void failedOver();
 	
 	void deliverReplicated(final long messageID) throws Exception;
-
-   void deliver(Executor executor);
-
-   void stop() throws Exception;
-
-   void start();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,6 +22,10 @@
 
 package org.jboss.messaging.core.server;
 
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
@@ -29,11 +33,9 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.transaction.xa.Xid;
-import java.util.List;
-
 /**
  *
  * A ServerSession
@@ -50,6 +52,8 @@
 
    String getPassword();
 
+   void removeBrowser(ServerBrowserImpl browser) throws Exception;
+
    void removeConsumer(ServerConsumer consumer) throws Exception;
 
    void removeProducer(ServerProducer producer) throws Exception;
@@ -109,8 +113,7 @@
    SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
                                                        SimpleString filterString,
                                                        int windowSize,
-                                                       int maxRate,
-                                                       boolean isBrowser) throws Exception;
+                                                       int maxRate) throws Exception;
 
    SessionCreateProducerResponseMessage createProducer(SimpleString address,
                                                        int windowSize,
@@ -121,16 +124,26 @@
 
    SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
 
+   void createBrowser(SimpleString queueName, SimpleString filterString) throws Exception;
+
    void closeConsumer(long consumerID) throws Exception;
 
    void closeProducer(long producerID) throws Exception;
 
+   void closeBrowser(long browserID) throws Exception;
+
    void receiveConsumerCredits(long consumerID, int credits) throws Exception;
 
    void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
 
    void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
 
+   boolean browserHasNextMessage(long browserID) throws Exception;
+
+   ServerMessage browserNextMessage(long browserID) throws Exception;
+
+   void browserReset(long browserID) throws Exception;
+
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
 
    void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
@@ -138,10 +151,4 @@
    void failedOver() throws Exception;
    
    void handleReplicatedDelivery(long consumerID, long messageID) throws Exception;   
-
-   void promptDelivery(ServerConsumer browser);
-
-   void resetConsumer(long consumerID) throws Exception;
-
-   void reStartConsumer(long consumerId);
 }

Copied: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java (from rev 5121, trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Concrete implementation of BrowserEndpoint.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @version <tt>$Revision: 3778 $</tt>
+ *
+ * $Id: ServerBrowserImpl.java 3778 2008-02-24 12:15:29Z timfox $
+ */
+public class ServerBrowserImpl
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ServerBrowserImpl.class);
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private final long id;
+   private final ServerSession session;
+   private final Queue destination;
+   private final Filter filter;
+   private Iterator<ServerMessage> iterator;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ServerBrowserImpl(final long id, final ServerSession session,                            
+                            final Queue destination, final String messageFilter) throws MessagingException
+   {
+      this.id = id;
+      
+      this.session = session;
+
+      this.destination = destination;
+
+		if (messageFilter != null)
+		{
+		   filter = new FilterImpl(new SimpleString(messageFilter));
+		}
+		else
+		{
+		   filter = null;
+		}
+   }
+
+   // BrowserEndpoint implementation ---------------------------------------------------------------
+
+   public long getID()
+   {
+   	return id;
+   }
+
+   public void reset() throws Exception
+   {
+      iterator = createIterator();
+   }
+
+   public boolean hasNextMessage() throws Exception
+   {
+      if (iterator == null)
+      {
+         iterator = createIterator();
+      }
+
+      boolean has = iterator.hasNext();
+
+      return has;
+   }
+
+   public ServerMessage nextMessage() throws Exception
+   {
+      if (iterator == null)
+      {
+         iterator = createIterator();
+      }
+
+      ServerMessage r = iterator.next();
+
+      return r;
+   }
+
+   public Message[] nextMessageBlock(int maxMessages) throws Exception
+   {
+      if (maxMessages < 2)
+      {
+         throw new IllegalArgumentException("maxMessages must be >=2 otherwise use nextMessage");
+      }
+
+      if (iterator == null)
+      {
+         iterator = createIterator();
+      }
+
+      List<ServerMessage> messages = new ArrayList<ServerMessage>(maxMessages);
+      int i = 0;
+      while (i < maxMessages)
+      {
+         if (iterator.hasNext())
+         {
+            ServerMessage m = iterator.next();
+            messages.add(m);
+            i++;
+         }
+         else
+         {
+            break;
+         }
+      }
+		return messages.toArray(new Message[messages.size()]);
+   }
+
+   public void close() throws Exception
+   {
+      iterator = null;
+
+      session.removeBrowser(this);
+
+      log.trace(this + " closed");
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "BrowserEndpoint[" + id + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   private Iterator<ServerMessage> createIterator()
+   {
+      List<MessageReference> refs = destination.list(filter);
+
+      List<ServerMessage> msgs = new ArrayList<ServerMessage>();
+
+      for (MessageReference ref: refs)
+      {
+         msgs.add(ref.getMessage());
+      }
+
+      return msgs.iterator();
+   }
+
+   // Inner classes --------------------------------------------------------------------------------
+   
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,11 +22,8 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.filter.Filter;
@@ -35,7 +32,6 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.DelayedResult;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeliveryCompleteMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -52,7 +48,6 @@
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * 
  * @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783 2008-02-25 12:15:14Z timfox $
  */
@@ -75,13 +70,13 @@
 
    private final Queue messageQueue;
 
-   protected final Filter filter;
+   private final Filter filter;
 
-   protected final ServerSession session;
+   private final ServerSession session;
 
    private final Object startStopLock = new Object();
 
-   protected final AtomicInteger availableCredits;
+   private final AtomicInteger availableCredits;
 
    private boolean started;
 
@@ -93,18 +88,8 @@
 
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
 
-   protected final Channel channel;
+   private final Channel channel;
 
-   private boolean browseOnly;
-
-   private Iterator<MessageReference> iterator;
-
-   private DeliveryRunner deliveryRunner = new DeliveryRunner();
-
-   private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
-
-   private boolean delivering = false;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -118,13 +103,10 @@
                              final StorageManager storageManager,
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
-                             final Channel channel,
-                             final boolean browseOnly)
+                             final Channel channel)
    {
       this.id = id;
-
-      this.browseOnly = browseOnly;
-
+      
       this.messageQueue = messageQueue;
 
       this.filter = filter;
@@ -150,10 +132,7 @@
 
       this.channel = channel;
 
-      if (!browseOnly)
-      {
-         messageQueue.addConsumer(this);
-      }
+      messageQueue.addConsumer(this);
    }
 
    // ServerConsumer implementation
@@ -342,26 +321,6 @@
       }
    }
 
-   public void stop() throws Exception
-   {
-      delivering = false;
-   }
-
-   public void start()
-   {
-      iterator = getQueue().list(filter).iterator();
-      delivering = true;
-      promptDelivery();
-   }
-
-   public void deliver(Executor executor)
-   {
-      if (delivering && waitingToDeliver.compareAndSet(false, true))
-      {
-         executor.execute(deliveryRunner);
-      }
-   }
-
    // Public
    // -----------------------------------------------------------------------------
 
@@ -370,44 +329,10 @@
 
    private void promptDelivery()
    {
-      if (browseOnly)
-      {
-         session.promptDelivery(this);
-      }
-      else
-      {
-         session.promptDelivery(messageQueue);
-      }
+      session.promptDelivery(messageQueue);
    }
 
    // Inner classes
    // ------------------------------------------------------------------------
-   private class DeliveryRunner implements Runnable
-   {
-      public void run()
-      {
 
-         waitingToDeliver.set(false);
-
-         synchronized (ServerConsumerImpl.this)
-         {
-            while (delivering && iterator.hasNext() && !(availableCredits != null && availableCredits.get() <= 0))
-            {
-               MessageReference ref = iterator.next();
-               if (availableCredits != null)
-               {
-                  availableCredits.addAndGet(-ref.getMessage().getEncodeSize());
-               }
-               channel.send(new SessionReceiveMessage(id, ref.getMessage(), 1));
-            }
-            // inform the client there are no more messages
-            if (!iterator.hasNext() || !delivering)
-            {
-               channel.send(new SessionDeliveryCompleteMessage(id));
-               iterator = null;
-            }
-         }
-
-      }
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -104,6 +104,8 @@
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
+   private final Map<Long, ServerBrowserImpl> browsers = new ConcurrentHashMap<Long, ServerBrowserImpl>();
+
    private final Map<Long, ServerProducer> producers = new ConcurrentHashMap<Long, ServerProducer>();
 
    private final Executor executor;
@@ -218,6 +220,14 @@
       return id;
    }
 
+   public void removeBrowser(final ServerBrowserImpl browser) throws Exception
+   {
+      if (browsers.remove(browser.getID()) == null)
+      {
+         throw new IllegalStateException("Cannot find browser with id " + browser.getID() + " to remove");
+      }
+   }
+
    public void removeConsumer(final ServerConsumer consumer) throws Exception
    {
       if (consumers.remove(consumer.getID()) == null)
@@ -269,6 +279,15 @@
 
       consumers.clear();
 
+      Set<ServerBrowserImpl> browsersClone = new HashSet<ServerBrowserImpl>(browsers.values());
+
+      for (ServerBrowserImpl browser : browsersClone)
+      {
+         browser.close();
+      }
+
+      browsers.clear();
+
       Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers.values());
 
       for (ServerProducer producer : producersClone)
@@ -286,21 +305,6 @@
       queue.deliverAsync(executor);
    }
 
-   public void promptDelivery(ServerConsumer consumer)
-   {
-      consumer.deliver(executor);
-   }
-
-   public void resetConsumer(long consumerID) throws Exception
-   {
-      consumers.get(consumerID).stop();
-   }
-
-   public void reStartConsumer(long consumerID)
-   {
-      consumers.get(consumerID).start();
-   }
-
    public void send(final ServerMessage msg) throws Exception
    {
       // check the user has write access to this address.
@@ -862,8 +866,7 @@
    public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
                                                               final SimpleString filterString,
                                                               int windowSize,
-                                                              int maxRate,
-                                                              final boolean isBrowser) throws Exception
+                                                              int maxRate) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -904,8 +907,7 @@
                                                        storageManager,
                                                        queueSettingsRepository,
                                                        postOffice,
-                                                       channel,
-                                                       isBrowser);
+                                                       channel);
 
       SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
 
@@ -971,6 +973,25 @@
       return new SessionBindingQueryResponseMessage(exists, queueNames);
    }
 
+   public void createBrowser(final SimpleString queueName, final SimpleString filterString) throws Exception
+   {
+      Binding binding = postOffice.getBinding(queueName);
+
+      if (binding == null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+      }
+
+      securityStore.check(binding.getAddress(), CheckType.READ, this);
+
+      ServerBrowserImpl browser = new ServerBrowserImpl(idGenerator.generateID(),
+                                                        this,
+                                                        binding.getQueue(),
+                                                        filterString == null ? null : filterString.toString());
+
+      browsers.put(browser.getID(), browser);
+   }
+
    /**
     * Create a producer for the specified address
     *
@@ -1022,6 +1043,26 @@
       return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
    }
 
+   public boolean browserHasNextMessage(final long browserID) throws Exception
+   {
+      return browsers.get(browserID).hasNextMessage();
+   }
+
+   public ServerMessage browserNextMessage(final long browserID) throws Exception
+   {
+      return browsers.get(browserID).nextMessage();
+   }
+
+   public void browserReset(final long browserID) throws Exception
+   {
+      browsers.get(browserID).reset();
+   }
+
+   public void closeBrowser(final long browserID) throws Exception
+   {
+      browsers.get(browserID).close();
+   }
+
    public void closeConsumer(final long consumerID) throws Exception
    {
       consumers.get(consumerID).close();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -16,11 +16,14 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
@@ -31,7 +34,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -63,13 +65,19 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerStopMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
@@ -163,8 +171,7 @@
                response = session.createConsumer(request.getQueueName(),
                                                  request.getFilterString(),
                                                  request.getWindowSize(),
-                                                 request.getMaxRate(),
-                                                 request.isBrowser());
+                                                 request.getMaxRate());
                break;
             }
             case SESS_CREATEQUEUE:
@@ -197,6 +204,13 @@
                response = session.executeBindingQuery(request.getAddress());
                break;
             }
+            case SESS_CREATEBROWSER:
+            {
+               SessionCreateBrowserMessage request = (SessionCreateBrowserMessage)packet;
+               session.createBrowser(request.getQueueName(), request.getFilterString());
+               response = new NullResponseMessage();
+               break;
+            }
             case SESS_CREATEPRODUCER:
             {
                SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
@@ -206,20 +220,6 @@
                                                  request.isAutoGroupId());
                break;
             }
-            case SESS_CONSUMER_STOP:
-            {
-               SessionConsumerStopMessage request = (SessionConsumerStopMessage) packet;
-               session.resetConsumer(request.getConsumerId());
-               response = new NullResponseMessage();
-               break;
-            }
-            case SESS_CONSUMER_START:
-            {
-               SessionConsumerStartMessage request = (SessionConsumerStartMessage) packet;
-               session.reStartConsumer(request.getConsumerId());
-               response = new NullResponseMessage();
-               break;
-            }
             case SESS_ACKNOWLEDGE:
             {
                SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
@@ -362,6 +362,13 @@
                response = new NullResponseMessage();
                break;
             }
+            case SESS_BROWSER_CLOSE:
+            {
+               SessionBrowserCloseMessage message = (SessionBrowserCloseMessage)packet;
+               session.closeBrowser(message.getBrowserID());
+               response = new NullResponseMessage();
+               break;
+            }
             case SESS_FLOWTOKEN:
             {
                SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
@@ -390,13 +397,33 @@
                }
                break;
             }
+            case SESS_BROWSER_HASNEXTMESSAGE:
+            {
+               SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;
+               response = new SessionBrowserHasNextMessageResponseMessage(session.browserHasNextMessage(message.getBrowserID()));
+               break;
+            }
+            case SESS_BROWSER_NEXTMESSAGE:
+            {
+               SessionBrowserNextMessageMessage message = (SessionBrowserNextMessageMessage)packet;
+               ServerMessage smsg = session.browserNextMessage(message.getBrowserID());
+               response = new SessionBrowseMessage(smsg);
+               break;
+            }
+            case SESS_BROWSER_RESET:
+            {
+               SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
+               session.browserReset(message.getBrowserID());
+               response = new NullResponseMessage();
+               break;
+            }
             case SESS_MANAGEMENT_SEND:
             {
                SessionSendManagementMessage message = (SessionSendManagementMessage)packet;
                session.handleManagementMessage(message);
                break;
             }
-            case SESS_REPLICATE_DELIVERY:
+            case PacketImpl.SESS_REPLICATE_DELIVERY:
             {
                SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;
                session.handleReplicatedDelivery(message.getConsumerID(), message.getMessageID());
@@ -433,7 +460,7 @@
          
          if (result == null)
          {
-            //Not clustered - just send now
+            // Not clustered - just send now
             channel.send(response);              
             
             if (closeChannel)

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossQueueBrowser.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,20 +22,19 @@
 
 package org.jboss.messaging.jms.client;
 
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
+import java.util.Enumeration;
 
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
-import java.util.Enumeration;
-import java.util.NoSuchElementException;
 
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  * $Id$
  */
@@ -45,22 +44,19 @@
 
    private static final Logger log = Logger.getLogger(JBossQueueBrowser.class);
 
-   private static final long NEXT_MESSAGE_TIMEOUT = 5000;
-
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private ClientConsumer consumer;
+   private ClientBrowser browser;
    private Queue queue;
    private String messageSelector;
-   private boolean firstTime = true;
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public JBossQueueBrowser(Queue queue, String messageSelector, ClientConsumer consumer)
+   public JBossQueueBrowser(Queue queue, String messageSelector, ClientBrowser browser)
    {
-      this.consumer = consumer;
+      this.browser = browser;
       this.queue = queue;
       this.messageSelector = messageSelector;
    }
@@ -71,7 +67,7 @@
    {
       try
       {
-         consumer.close();
+         browser.close();
       }
       catch (MessagingException e)
       {
@@ -83,20 +79,12 @@
    {
       try
       {
-         if(firstTime)
-         {
-            consumer.start();
-            firstTime = false;
-         }
-         else
-         {
-            consumer.restart();
-         }
+         browser.reset();
          return new BrowserEnumeration();
       }
       catch (MessagingException e)
       {
-         throw JMSExceptionHelper.convertFromMessagingException(e);
+         throw JMSExceptionHelper.convertFromMessagingException(e);     
       }
    }
 
@@ -114,7 +102,7 @@
 
    public String toString()
    {
-      return "JBossQueueBrowser->" + consumer;
+      return "JBossQueueBrowser->" + browser;
    }
 
    // Package protected ----------------------------------------------------------------------------
@@ -131,9 +119,9 @@
       {
          try
          {            
-            return consumer.awaitMessage(NEXT_MESSAGE_TIMEOUT);
+            return browser.hasNextMessage();
          }
-         catch (Exception e)
+         catch (MessagingException e)
          {
             throw new IllegalStateException(e.getMessage());
          }
@@ -143,18 +131,8 @@
       {
          try
          {
-            if(!hasMoreElements())
-            {
-               throw new NoSuchElementException();  
-            }
+            ClientMessage message = browser.nextMessage();
 
-            ClientMessage message = consumer.receiveImmediate();
-
-            if(message == null)
-            {
-               throw new NoSuchElementException();
-            }
-
             JBossMessage jbm = JBossMessage.createMessage(message, null);
             
             jbm.doBeforeReceive();                        

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,19 +22,10 @@
 
 package org.jboss.messaging.jms.client;
 
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.jms.JBossQueue;
-import org.jboss.messaging.jms.JBossTemporaryQueue;
-import org.jboss.messaging.jms.JBossTemporaryTopic;
-import org.jboss.messaging.jms.JBossTopic;
-import org.jboss.messaging.util.SimpleString;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -67,11 +58,22 @@
 import javax.jms.XASession;
 import javax.jms.XATopicSession;
 import javax.transaction.xa.XAResource;
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
 
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.util.SimpleString;
+
 /**
  * 
  * Note that we *do not* support JMS ASF (Application Server Facilities) optional
@@ -79,7 +81,7 @@
  * 
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * 
  * @version <tt>$Revision$</tt>
  * 
@@ -638,7 +640,7 @@
       {
          String coreSelector = SelectorTranslator.convertToJBMFilterString(filterString);
 
-         ClientConsumer browser = session.createBrowser(jbq.getSimpleAddress(),
+         ClientBrowser browser = session.createBrowser(jbq.getSimpleAddress(),
                                                        coreSelector == null ? null : new SimpleString(coreSelector));
 
          return new JBossQueueBrowser(queue, filterString, browser);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -79,137 +79,237 @@
 
    // Public --------------------------------------------------------
 
+   class RunnableTestA extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestA(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestB extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestB(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestC extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestC(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestD extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestD(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestE extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestE(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestF extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestF(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestG extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestG(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestH extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestH(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestI extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestI(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestJ extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestJ(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestK extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestK(sf, threadNum);
+      }
+   }
+   
+   class RunnableTestL extends RunnableTest
+   {
+      public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+      {
+         doTestL(sf);
+      }
+   }
+   
+   public static void main(String[] args)
+   {
+      log.info("Starting tests");
+      
+      try
+      {
+         MultiThreadRandomFailoverTest test = new MultiThreadRandomFailoverTest(); 
+         
+         log.info("starting a");
+         test.setUp();
+         test.testA();
+         test.tearDown();
+         
+         log.info("starting b");
+         test.setUp();
+         test.testB();
+         test.tearDown();
+         
+         log.info("starting c");
+         test.setUp();
+         test.testC();
+         test.tearDown();
+         
+         log.info("starting d");
+         test.setUp();
+         test.testD();
+         test.tearDown();
+         
+         log.info("starting e");
+         test.setUp();
+         test.testE();
+         test.tearDown();
+         
+         log.info("starting f");
+         test.setUp();
+         test.testF();
+         test.tearDown();
+         
+         log.info("starting g");
+         test.setUp();
+         test.testG();
+         test.tearDown();
+         
+         log.info("starting h");
+         test.setUp();
+         test.testH();
+         test.tearDown();
+         
+         log.info("starting i");
+         test.setUp();
+         test.testI();
+         test.tearDown();
+         
+         log.info("starting j");
+         test.setUp();
+         test.testJ();
+         test.tearDown();
+         
+         log.info("starting k");
+         test.setUp();
+         test.testK();
+         test.tearDown();
+         
+         log.info("starting l");
+         test.setUp();
+         test.testL();
+         test.tearDown();
+         
+         log.info("done");
+      }
+      catch (Throwable t)
+      {
+         log.error("Tests failed", t);
+      }
+   }
 
+
    public void testA() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestA(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestA(), NUM_THREADS);
    }
    
    public void testB() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestB(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestB(), NUM_THREADS);
    }
    
    public void testC() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestC(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestC(), NUM_THREADS);
    }
    
    public void testD() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestD(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestD(), NUM_THREADS);
    }
-
+   
    public void testE() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestE(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestE(), NUM_THREADS);
    }
-
+   
    public void testF() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestF(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestF(), NUM_THREADS);
    }
-
+   
    public void testG() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestG(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestG(), NUM_THREADS);
    }
    
    public void testH() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestH(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestH(), NUM_THREADS);
    }
    
    public void testI() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestI(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestI(), NUM_THREADS);
    }
    
    public void testJ() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestJ(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestJ(), NUM_THREADS);
    }
    
    public void testK() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestK(sf, threadNum);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestK(), NUM_THREADS);
    }
    
    public void testL() throws Exception
    {
-      runTestMultipleThreads(new RunnableTest()
-      {
-         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-         {
-            doTestL(sf);
-         }
-      }, NUM_THREADS);
+      runTestMultipleThreads(new RunnableTestL(), NUM_THREADS);
    }
    
    // Package protected ---------------------------------------------
@@ -1277,7 +1377,7 @@
 
    protected int getNumIterations()
    {
-      return 20;
+      return 2;
    }
    
    protected void setUp() throws Exception

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java (from rev 5121, trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.timing.core.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageTest extends UnitTestCase
+{
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
+
+   private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
+
+   private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
+
+   private SimpleString atestq = new SimpleString("ascheduledtestq");
+
+   private MessagingService messagingService;
+
+   private ConfigurationImpl configuration;
+
+   protected void setUp() throws Exception
+   {
+      File file = new File(journalDir);
+      File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      deleteDirectory(file2);
+      file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
+      configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (messagingService != null)
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e)
+         {
+            //ignore
+         }
+      }
+      new File(journalDir).delete();
+      new File(bindingsDir).delete();
+      new File(pageDir).delete();
+   }
+
+   public void testRecoveredMessageDeliveredCorrectly() throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.roll(Calendar.SECOND, 10);
+      producer.send(message, cal.getTimeInMillis());
+
+      producer.close();
+      session.close();
+      messagingService.stop();
+      messagingService = null;
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService.start();
+
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+      session = sessionFactory.createSession(false, true, true, false);
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
+   {
+       Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(true, false, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      session.start(xid,  XAResource.TMNOFLAGS);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.roll(Calendar.SECOND, 10);
+      producer.send(message, cal.getTimeInMillis());
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      producer.close();
+      session.close();
+      messagingService.stop();
+      messagingService = null;
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService.start();
+
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+      session = sessionFactory.createSession(true, false, false, false);
+      session.commit(xid, true);
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   public void testPagedMessageDeliveredCorrectly() throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      configuration.setPagingMaxGlobalSizeBytes(0);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.roll(Calendar.SECOND, 10);
+      producer.send(message, cal.getTimeInMillis());
+
+      producer.close();
+
+      
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossQueueBrowserTest.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,26 +22,29 @@
 
 package org.jboss.messaging.tests.unit.jms.client;
 
-import junit.framework.TestCase;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
-import org.jboss.messaging.core.client.ClientConsumer;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientBrowser;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.jms.client.JBossMessage;
 import org.jboss.messaging.jms.client.JBossQueueBrowser;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import java.util.Enumeration;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -64,7 +67,7 @@
    {
       String messageSelector = "color = 'green'";
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       replay(clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, messageSelector,
@@ -77,7 +80,7 @@
    public void testGetQueue() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       replay(clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -90,7 +93,7 @@
    public void testClose() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       clientBrowser.close();
       replay(clientBrowser);
 
@@ -105,7 +108,7 @@
    public void testCloseThrowsException() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       clientBrowser.close();
       expectLastCall().andThrow(new MessagingException());
 
@@ -128,8 +131,8 @@
    public void testGetEnumeration() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
-      clientBrowser.start();
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+      clientBrowser.reset();
       replay(clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -141,18 +144,40 @@
       verify(clientBrowser);
    }
 
+   public void testGetEnumerationThrowsException() throws Exception
+   {
+      Queue queue = new JBossQueue(randomString());
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+      clientBrowser.reset();
+      expectLastCall().andThrow(new MessagingException());
+      replay(clientBrowser);
+
+      JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
+            clientBrowser);
+
+      try
+      {
+         browser.getEnumeration();
+         fail("JMSException");
+      } catch (JMSException e)
+      {
+      }
+
+      verify(clientBrowser);
+   }
+
    public void testGetEnumerationWithOneMessage() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       ClientMessage clientMessage = createStrictMock(ClientMessage.class);
       MessagingBuffer buffer = createStrictMock(MessagingBuffer.class);
-      clientBrowser.start();
-      expect(clientBrowser.awaitMessage(5000)).andStubReturn(true);
+      clientBrowser.reset();
+      expect(clientBrowser.hasNextMessage()).andReturn(true);
       expect(clientMessage.getType()).andReturn(JBossMessage.TYPE);
       expect(clientMessage.getBody()).andStubReturn(buffer);
-      expect(clientBrowser.receiveImmediate()).andReturn(clientMessage);
-      expect(clientBrowser.awaitMessage(5000)).andReturn(false);
+      expect(clientBrowser.nextMessage()).andReturn(clientMessage);
+      expect(clientBrowser.hasNextMessage()).andReturn(false);
       replay(clientMessage, clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -171,9 +196,9 @@
          throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
-      clientBrowser.start();
-      expect(clientBrowser.awaitMessage(5000)).andThrow(new MessagingException());
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+      clientBrowser.reset();
+      expect(clientBrowser.hasNextMessage()).andThrow(new MessagingException());
       replay(clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,
@@ -196,10 +221,10 @@
    public void testGetEnumerationWithNextThrowsException() throws Exception
    {
       Queue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
-      clientBrowser.start();
-      expect(clientBrowser.awaitMessage(5000)).andStubReturn(true);
-      expect(clientBrowser.receiveImmediate()).andThrow(new MessagingException());
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
+      clientBrowser.reset();
+      expect(clientBrowser.hasNextMessage()).andReturn(true);
+      expect(clientBrowser.nextMessage()).andThrow(new MessagingException());
       replay(clientBrowser);
 
       JBossQueueBrowser browser = new JBossQueueBrowser(queue, null,

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java	2008-10-16 22:34:53 UTC (rev 5127)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java	2008-10-17 10:14:46 UTC (rev 5128)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.tests.unit.jms.client;
 
-import junit.framework.TestCase;
-import org.easymock.EasyMock;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -32,28 +30,13 @@
 import static org.easymock.EasyMock.isNull;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientMessageImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.jms.JBossQueue;
-import org.jboss.messaging.jms.JBossTemporaryQueue;
-import org.jboss.messaging.jms.JBossTemporaryTopic;
-import org.jboss.messaging.jms.JBossTopic;
-import org.jboss.messaging.jms.client.JBossConnection;
-import org.jboss.messaging.jms.client.JBossMessage;
-import org.jboss.messaging.jms.client.JBossSession;
 import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-import org.jboss.messaging.util.SimpleString;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -80,10 +63,31 @@
 import javax.jms.TopicSession;
 import javax.jms.TransactionInProgressException;
 import javax.transaction.xa.XAResource;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
+import junit.framework.TestCase;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.JBossTemporaryQueue;
+import org.jboss.messaging.jms.JBossTemporaryTopic;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.jms.client.JBossConnection;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.util.SimpleString;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -1215,7 +1219,7 @@
    public void testCreateBrowser() throws Exception
    {
       JBossQueue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
             .andReturn(clientBrowser);
 
@@ -1236,7 +1240,7 @@
    public void testCreateBrowserThrowsException() throws Exception
    {
       JBossQueue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
             .andThrow(new MessagingException());
 
@@ -1262,7 +1266,7 @@
    public void testCreateBrowserWithEmptyFilter() throws Exception
    {
       JBossQueue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       expect(mockClientSession.createBrowser(queue.getSimpleAddress(), null))
             .andReturn(clientBrowser);
 
@@ -1284,7 +1288,7 @@
    {
       String filter = "color = 'red'";
       JBossQueue queue = new JBossQueue(randomString());
-      ClientConsumer clientBrowser = createStrictMock(ClientConsumer.class);
+      ClientBrowser clientBrowser = createStrictMock(ClientBrowser.class);
       expect(
             mockClientSession.createBrowser(queue.getSimpleAddress(),
                   new SimpleString(filter))).andReturn(clientBrowser);




More information about the jboss-cvs-commits mailing list