[jboss-cvs] JBoss Messaging SVN: r3778 - in trunk: src/main/org/jboss/jms/server and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Feb 24 07:15:31 EST 2008


Author: timfox
Date: 2008-02-24 07:15:29 -0500 (Sun, 24 Feb 2008)
New Revision: 3778

Added:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
Modified:
   trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
   trunk/src/main/org/jboss/jms/server/ConnectionManager.java
   trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/container/SecurityManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
Refactor and create interfaces for endpoints


Modified: trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -33,24 +33,28 @@
  */
 public class AsfMessageHolder
 {
-   private JBossMessage msg;
+   private final JBossMessage msg;
    
-   private String consumerID;
+   private final String consumerID;
    
-   private String queueName;
+   private final String queueName;
    
-   private int maxDeliveries;
+   private final int maxDeliveries;
    
-   private ClientSession connectionConsumerSession;
+   private final ClientSession connectionConsumerSession;
    
-   public AsfMessageHolder(JBossMessage msg, String consumerID,
-         String queueName, int maxDeliveries,
-         ClientSession connectionConsumerSession)
+   public AsfMessageHolder(final JBossMessage msg, final String consumerID,
+                           final String queueName, final int maxDeliveries,
+                           final ClientSession connectionConsumerSession)
    {
       this.msg = msg;
+      
       this.consumerID = consumerID;
+      
       this.queueName = queueName;
+      
       this.maxDeliveries = maxDeliveries;
+      
       this.connectionConsumerSession = connectionConsumerSession;
    }
 
@@ -59,48 +63,23 @@
       return msg;
    }
 
-   public void setMsg(JBossMessage msg)
-   {
-      this.msg = msg;
-   }
-
    public String getConsumerID()
    {
       return consumerID;
    }
 
-   public void setConsumerID(String consumerID)
-   {
-      this.consumerID = consumerID;
-   }
-
    public String getQueueName()
    {
       return queueName;
    }
 
-   public void setQueueName(String queueName)
-   {
-      this.queueName = queueName;
-   }
-
    public int getMaxDeliveries()
    {
       return maxDeliveries;
    }
 
-   public void setMaxDeliveries(int maxDeliveries)
-   {
-      this.maxDeliveries = maxDeliveries;
-   }
-
    public ClientSession getConnectionConsumerSession()
    {
       return connectionConsumerSession;
    }
-
-   public void setConnectionConsumerSession(ClientSession connectionConsumerSession)
-   {
-      this.connectionConsumerSession = connectionConsumerSession;
-   }
 }

Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -23,7 +23,7 @@
 
 import java.util.List;
 
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
 import org.jboss.messaging.core.MessagingComponent;
 
 
@@ -42,20 +42,20 @@
 public interface ConnectionManager extends MessagingComponent
 {
    void registerConnection(String clientVMID,
-                           String remotingClientSessionID, ServerConnectionEndpoint endpoint);
+                           String remotingClientSessionID, ServerConnection endpoint);
 
    /**
-    * @param serverConnectionEndpoint 
+    * @param ServerConnection 
     * @return null if there is no such connection.
     */
-   ServerConnectionEndpoint unregisterConnection(String remotingClientSessionID, ServerConnectionEndpoint serverConnectionEndpoint);
+   ServerConnection unregisterConnection(String remotingClientSessionID, ServerConnection ServerConnection);
    
    /**
     * Returns a list of active connection endpoints currently maintained by an instance of this
     * manager. The implementation should make a copy of the list to avoid
     * ConcurrentModificationException. The list could be empty, but never null.
     *
-    * @return List<ServerConnectionEndpoint>
+    * @return List<ServerConnection>
     */
-   List<ServerConnectionEndpoint> getActiveConnections();
+   List<ServerConnection> getActiveConnections();
 }

Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,38 +21,38 @@
    */
 package org.jboss.jms.server;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.jms.Message;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingException;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.endpoint.ServerConnection;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessagingServerManagement;
 import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
 import org.jboss.messaging.core.impl.server.SubscriptionInfo;
-import org.jboss.jms.server.MessageStatistics;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.deployers.Deployer;
 import org.jboss.messaging.deployers.DeploymentManager;
 import org.jboss.messaging.util.JNDIUtil;
 import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-import javax.jms.Message;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-
 /**
  * A Deployer used to create and add to JNDI queues, topics and connection factories. Typically this would only be used
  * in an app server env.
@@ -502,13 +502,13 @@
    public List<ClientInfo> getClients() throws Exception
    {
       List<ClientInfo> clientInfos = new ArrayList<ClientInfo>();
-      List<ServerConnectionEndpoint> endpoints = messagingServerManagement.getActiveConnections();
-      for (ServerConnectionEndpoint endpoint : endpoints)
+      List<ServerConnection> endpoints = messagingServerManagement.getActiveConnections();
+      for (ServerConnection endpoint : endpoints)
       {
          clientInfos.add(new ClientInfo(endpoint.getUsername(),
                  endpoint.getClientAddress(),
                  endpoint.isStarted(),
-                 endpoint.getCreated()));
+                 endpoint.getCreatedTime()));
       }
       return clientInfos;
    }

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -32,7 +32,7 @@
 import org.jboss.jms.client.api.FailureListener;
 import org.jboss.jms.client.impl.JMSClientVMIdentifier;
 import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 import org.jboss.messaging.util.RemotingException;
@@ -57,9 +57,9 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private Map<String /* remoting session ID */, List<ServerConnectionEndpoint>> endpoints;
+   private Map<String /* remoting session ID */, List<ServerConnection>> endpoints;
 
-   private Set<ServerConnectionEndpoint> activeServerConnectionEndpoints;
+   private Set<ServerConnection> activeServerConnections;
 
    // the clients maps is for information only: to better identify the clients of
    // jboss messaging using their VM ID
@@ -69,27 +69,27 @@
 
    public SimpleConnectionManager()
    {
-      endpoints = new HashMap<String, List<ServerConnectionEndpoint>>();
-      activeServerConnectionEndpoints = new HashSet<ServerConnectionEndpoint>();
+      endpoints = new HashMap<String, List<ServerConnection>>();
+      activeServerConnections = new HashSet<ServerConnection>();
       clients = new HashMap<String, String>();
    }
 
    // ConnectionManager implementation -------------------------------------------------------------
 
    public synchronized void registerConnection(String clientVMID, String remotingClientSessionID,
-         ServerConnectionEndpoint endpoint)
+         ServerConnection endpoint)
    {    
-      List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
 
       if (connectionEndpoints == null)
       {
-         connectionEndpoints = new ArrayList<ServerConnectionEndpoint>();
+         connectionEndpoints = new ArrayList<ServerConnection>();
          endpoints.put(remotingClientSessionID, connectionEndpoints);
       }
 
       connectionEndpoints.add(endpoint);
 
-      activeServerConnectionEndpoints.add(endpoint);
+      activeServerConnections.add(endpoint);
 
       clients.put(remotingClientSessionID, clientVMID);
 
@@ -97,10 +97,10 @@
             Util.guidToString(remotingClientSessionID));
    }
    
-   public synchronized ServerConnectionEndpoint unregisterConnection(String remotingClientSessionID,
-         ServerConnectionEndpoint endpoint)
+   public synchronized ServerConnection unregisterConnection(String remotingClientSessionID,
+         ServerConnection endpoint)
    {
-      List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
 
       if (connectionEndpoints != null)
       {
@@ -108,7 +108,7 @@
 
          if (removed)
          {
-            activeServerConnectionEndpoints.remove(endpoint);
+            activeServerConnections.remove(endpoint);
          }
 
          log.debug("unregistered connection " + endpoint + " with remoting session ID " + remotingClientSessionID);
@@ -124,11 +124,11 @@
       return null;
    }
    
-   public synchronized List<ServerConnectionEndpoint> getActiveConnections()
+   public synchronized List<ServerConnection> getActiveConnections()
    {
       // I will make a copy to avoid ConcurrentModification
-      List<ServerConnectionEndpoint> list = new ArrayList<ServerConnectionEndpoint>();
-      list.addAll(activeServerConnectionEndpoints);
+      List<ServerConnection> list = new ArrayList<ServerConnection>();
+      list.addAll(activeServerConnections);
       return list;
    }      
       
@@ -199,15 +199,15 @@
    {
       assert remotingClientSessionID != null;
       
-      List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+      List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
       // the connection endpoints are copied in a new list to avoid concurrent modification exception
-      List<ServerConnectionEndpoint> copy;
+      List<ServerConnection> copy;
       if (connectionEndpoints != null)
-         copy = new ArrayList<ServerConnectionEndpoint>(connectionEndpoints);
+         copy = new ArrayList<ServerConnection>(connectionEndpoints);
       else
-         copy = new ArrayList<ServerConnectionEndpoint>();
+         copy = new ArrayList<ServerConnection>();
          
-      for (ServerConnectionEndpoint sce : copy)
+      for (ServerConnection sce : copy)
       {
          try
          {
@@ -244,11 +244,11 @@
          {
             buff.append("    No registered endpoints\n");
          }
-         for (Entry<String, List<ServerConnectionEndpoint>> entry : endpoints.entrySet())
+         for (Entry<String, List<ServerConnection>> entry : endpoints.entrySet())
          {
-            List<ServerConnectionEndpoint> connectionEndpoints = entry.getValue();
+            List<ServerConnection> connectionEndpoints = entry.getValue();
             buff.append("    "  + entry.getKey() + "----->\n");
-            for (ServerConnectionEndpoint sce : connectionEndpoints)
+            for (ServerConnection sce : connectionEndpoints)
             {
                buff.append("        " + sce + " (" + System.identityHashCode(sce) + ")\n");
             }

Modified: trunk/src/main/org/jboss/jms/server/container/SecurityManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/container/SecurityManager.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/container/SecurityManager.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.jboss.jms.server.SecurityStore;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
 import org.jboss.jms.server.security.CheckType;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
@@ -134,7 +134,7 @@
       return granted;
    }
 
-   public void check(String address, CheckType checkType, ServerConnectionEndpoint conn)
+   public void check(String address, CheckType checkType, ServerConnection conn)
       throws MessagingException
    {
       if (trace) { log.trace("checking access permissions to " + address); }
@@ -145,7 +145,7 @@
          return;
       }
 
-      SecurityStore sm = conn.getSecurityManager();
+      SecurityStore sm = conn.getSecurityStore();
 
       // Authenticate. Successful autentication will place a new SubjectContext on thread local,
       // which will be used in the authorization process. However, we need to make sure we clean up

Modified: trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -24,8 +24,13 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CREATECONNECTION;
 
 import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.SecurityStore;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.ResourceManager;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
@@ -44,11 +49,34 @@
 {
    private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
    
-   private MessagingServer messagingServer;
+   private final PacketDispatcher dispatcher;
+   
+   private final ResourceManager resourceManager;
+   
+   private final PersistenceManager persistenceManager;
+   
+   private final PostOffice postOffice;
+   
+   private final SecurityStore securityStore;
+   
+   private final ConnectionManager connectionManager;
 
-   public MessagingServerPacketHandler(MessagingServer messagingServer)
+   public MessagingServerPacketHandler(final PacketDispatcher dispatcher, final ResourceManager resourceManager,
+   		                              final PersistenceManager persistenceManager,
+   		                              final PostOffice postOffice, final SecurityStore securityStore,
+   		                              final ConnectionManager connectionManager)
    {
-      this.messagingServer = messagingServer;
+      this.dispatcher = dispatcher;
+      
+      this.resourceManager = resourceManager;
+      
+      this.persistenceManager = persistenceManager;
+      
+      this.postOffice = postOffice;
+      
+      this.securityStore = securityStore;
+      
+      this.connectionManager = connectionManager;
    }
    
    /*
@@ -64,7 +92,7 @@
       return ClientConnectionFactoryImpl.id;
    }
 
-   public Packet doHandle(Packet packet, PacketSender sender) throws Exception
+   public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
    {
       Packet response = null;
      
@@ -87,11 +115,9 @@
       return response;
    }
 
-   private CreateConnectionResponse
-      createConnection(String username,
-                              String password,
-                              String remotingSessionID, String clientVMID, int prefetchSize,
-                              String address)
+   private CreateConnectionResponse createConnection(final String username, final String password,
+                              final String remotingClientSessionID, final String clientVMID, final int prefetchSize,
+                              final String clientAddress)
       throws Exception
    {
       log.trace("creating a new connection for user " + username);
@@ -101,39 +127,19 @@
       // up thread local immediately after we used the information, otherwise some other people
       // security my be screwed up, on account of thread local security stack being corrupted.
 
-      messagingServer.getSecurityManager().authenticate(username, password);
+      securityStore.authenticate(username, password);
 
       // We don't need the SubjectContext on thread local anymore, clean it up
       SecurityActions.popSubjectContext();
 
-      //Client ID is a JMS concept and does not belong on the server
-      
-//      String clientIDUsed = clientID;
-//
-//      // see if there is a preconfigured client id for the user
-//      if (username != null)
-//      {
-//         String preconfClientID =
-//            messagingServer.getJmsUserManagerInstance().getPreConfiguredClientID(username);
-//
-//         if (preconfClientID != null)
-//         {
-//            clientIDUsed = preconfClientID;
-//         }
-//      }
+      final ServerConnection connection =
+         new ServerConnectionEndpoint(username, password,
+                                      remotingClientSessionID, clientVMID, clientAddress,
+                                      prefetchSize, dispatcher, resourceManager, persistenceManager,
+                                      postOffice, securityStore, connectionManager);
 
-      // create the corresponding "server-side" connection endpoint and register it with the
-      // server peer's ClientManager
-      final ServerConnectionEndpoint endpoint =
-         new ServerConnectionEndpoint(messagingServer, username, password, prefetchSize,
-                                      remotingSessionID, clientVMID, address);
+      dispatcher.register(new ServerConnectionPacketHandler(connection));
 
-      String connectionID = endpoint.getConnectionID();
-
-      messagingServer.getRemotingService().getDispatcher().register(endpoint.newHandler());
-
-      log.trace("created and registered " + endpoint);
-
-      return new CreateConnectionResponse(connectionID);
+      return new CreateConnectionResponse(connection.getID());
    }
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Message;
@@ -67,18 +68,19 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private final String id;
-   private final ServerSessionEndpoint session;
+   private final ServerSession session;
    private final Queue destination;
    private final Filter filter;
    private Iterator iterator;
 
    // Constructors ---------------------------------------------------------------------------------
 
-   ServerBrowserEndpoint(ServerSessionEndpoint session, String id,
+   ServerBrowserEndpoint(ServerSession session,
                          Queue destination, String messageFilter) throws Exception
    {     
       this.session = session;
-      this.id = id;
+      id = UUID.randomUUID().toString();
+      
       this.destination = destination;
 
 		if (messageFilter != null)
@@ -93,6 +95,11 @@
 
    // BrowserEndpoint implementation ---------------------------------------------------------------
 
+   public String getID()
+   {
+   	return id;
+   }
+   
    public void reset() throws Exception
    {
       iterator = createIterator();
@@ -153,8 +160,6 @@
    {
       iterator = null;
       
-      session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-
       session.removeBrowser(id);
       
       log.trace(this + " closed");

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,66 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
+
+/**
+ * 
+ * A ServerConnection
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerConnection
+{
+	String getID();
+	
+	ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
+                                                        PacketSender sender) throws Exception;
+	
+	void start() throws Exception;
+	
+	void stop() throws Exception;
+	
+	void close() throws Exception;
+	
+	SecurityStore getSecurityStore();
+	
+	String getUsername();
+	
+	String getPassword();
+		
+	void removeSession(String sessionID) throws Exception;
+	
+	void addTemporaryQueue(Queue queue);
+	
+	void removeTemporaryQueue(Queue queue);
+	
+	boolean isStarted();
+	
+	long getCreatedTime();
+	
+	String getClientAddress();		
+}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,11 +21,6 @@
   */
 package org.jboss.jms.server.endpoint;
 
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -37,19 +32,16 @@
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.SecurityStore;
 import org.jboss.messaging.core.Binding;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
 import org.jboss.messaging.core.PostOffice;
 import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.ResourceManager;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionMessage;
 import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessagingException;
 
 /**
  * Concrete implementation of ConnectionEndpoint.
@@ -61,7 +53,7 @@
  *
  * $Id$
  */
-public class ServerConnectionEndpoint
+public class ServerConnectionEndpoint implements ServerConnection
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -75,87 +67,102 @@
 
    private final String id;
 
-   private volatile boolean started;
-
    private final String username;
    
    private final String password;
 
    private final String remotingClientSessionID;
    
-   private final String jmsClientVMID;
+   private final String clientAddress;
+      
+   private final int prefetchSize;
 
-   private final MessagingServer messagingServer;
-
+   private final PacketDispatcher dispatcher;
+   
+   private final ResourceManager resourceManager;
+   
+   private final PersistenceManager persistenceManager;   
+   
    private final PostOffice postOffice;
    
-   private final SecurityStore sm;
+   private final SecurityStore securityStore;
    
-   private final ConnectionManager cm;
+   private final ConnectionManager connectionManager;
 
-   private final ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+   private final long createdTime;
+         
+   private final ConcurrentMap<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
    private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
-
-   private final int prefetchSize;
-
-   private String clientAddress;
-   private long created;
-
+      
+   private volatile boolean started;
+   
+  
    // Constructors ---------------------------------------------------------------------------------
-
-   public ServerConnectionEndpoint(MessagingServer messagingServer,
-                                   String username, String password, int prefetchSize,
-                                   String remotingSessionID,
-                                   String clientVMID,
-                                   String clientAddress) throws Exception
+      
+   public ServerConnectionEndpoint(final String username, final String password,
+   		                          final String remotingClientSessionID, final String jmsClientVMID,
+   		                          final String clientAddress,
+   		                          final int prefetchSize, final PacketDispatcher dispatcher,
+   		                          final ResourceManager resourceManager,
+   		                          final PersistenceManager persistenceManager,
+   		                          final PostOffice postOffice, final SecurityStore securityStore,
+   		                          final ConnectionManager connectionManager)
    {
-      this.messagingServer = messagingServer;
-
-      sm = messagingServer.getSecurityManager();
-      cm = messagingServer.getConnectionManager();
-      postOffice = messagingServer.getPostOffice();
-
-      started = false;
-
-      this.id = UUID.randomUUID().toString();
+   	id = UUID.randomUUID().toString();
       
-      this.prefetchSize = prefetchSize;
-
-      this.username = username;
+   	this.username = username;
       
       this.password = password;
+      
+      this.remotingClientSessionID = remotingClientSessionID;
 
-      this.remotingClientSessionID = remotingSessionID;
-
-      this.jmsClientVMID = clientVMID;
-
       this.clientAddress = clientAddress;
 
-      created = System.currentTimeMillis();
+      this.prefetchSize = prefetchSize;
 
-      cm.registerConnection(jmsClientVMID, remotingClientSessionID, this);
+      this.dispatcher = dispatcher;
+      
+      this.resourceManager = resourceManager;
+      
+      this.persistenceManager = persistenceManager;
+      
+      this.postOffice = postOffice;
+      
+      this.securityStore = securityStore;
+      
+      this.connectionManager = connectionManager;
+      
+      started = false;
+      
+      createdTime = System.currentTimeMillis();
+
+      connectionManager.registerConnection(jmsClientVMID, remotingClientSessionID, this);
    }
 
-   // ConnectionDelegate implementation ------------------------------------------------------------
+   // ServerConnection implementation ------------------------------------------------------------
 
-   public ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                              PacketSender sender)
-      throws Exception
+   public String getID()
+   {
+   	return id;
+   }
+   
+   public ConnectionCreateSessionResponseMessage createSession(final boolean xa, final boolean autoCommitSends,
+   		                                                      final boolean autoCommitAcks,
+                                                               final PacketSender sender) throws Exception
    {           
-      String sessionID = UUID.randomUUID().toString();
- 
-      ServerSessionEndpoint ep =
-         new ServerSessionEndpoint(sessionID, this, autoCommitSends, autoCommitAcks, xa, sender, messagingServer.getResourceManager());            
+      ServerSession session =
+         new ServerSessionEndpoint(autoCommitSends, autoCommitAcks, prefetchSize, xa, this, resourceManager,
+         		sender, dispatcher, persistenceManager, postOffice);
 
       synchronized (sessions)
       {
-         sessions.put(sessionID, ep);
+         sessions.put(session.getID(), session);
       }
 
-      messagingServer.getRemotingService().getDispatcher().register(ep.newHandler());
+      dispatcher.register(new ServerSessionPacketHandler(session, prefetchSize));
       
-      return new ConnectionCreateSessionResponseMessage(sessionID);
+      return new ConnectionCreateSessionResponseMessage(session.getID());
    }
    
    public void start() throws Exception
@@ -170,9 +177,9 @@
 
    public void close() throws Exception
    {
-      Map<String, ServerSessionEndpoint> sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
+      Map<String, ServerSession> sessionsClone = new HashMap<String, ServerSession>(sessions);
       
-      for (ServerSessionEndpoint session: sessionsClone.values())
+      for (ServerSession session: sessionsClone.values())
       {
          session.close();
       }
@@ -197,13 +204,16 @@
 
       temporaryQueues.clear();      
 
-      cm.unregisterConnection(remotingClientSessionID, this);
+      connectionManager.unregisterConnection(remotingClientSessionID, this);
 
-      messagingServer.getRemotingService().getDispatcher().unregister(id);
+      dispatcher.unregister(id);
    }
-
-   // Public ---------------------------------------------------------------------------------------
-
+   
+   public SecurityStore getSecurityStore()
+   {
+      return securityStore;
+   }
+   
    public String getUsername()
    {
       return username;
@@ -213,152 +223,65 @@
    {
       return password;
    }
-
-   public long getCreated()
+   
+   public void removeSession(final String sessionId) throws Exception
    {
-      return created;
+      if (sessions.remove(sessionId) == null)
+      {
+         throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+      }      
    }
 
-   public String getClientAddress()
+   public void addTemporaryQueue(final Queue queue)
    {
-      return clientAddress;
+      temporaryQueues.add(queue);      
    }
-
-   public SecurityStore getSecurityManager()
+   
+   public void removeTemporaryQueue(final Queue queue)
    {
-      return sm;
+      temporaryQueues.remove(queue);      
    }
-
-   public MessagingServer getMessagingServer()
+   
+   public int getPrefetchSize()
    {
-      return messagingServer;
-   }
-
-   public PacketHandler newHandler()
-   {
-      return new ConnectionPacketHandler();
-   }
-
-   public String toString()
-   {
-      return "ConnectionEndpoint[" + id + "]";
-   }
-
-   // Package protected ----------------------------------------------------------------------------
-
-   int getPrefetchSize()
-   {
       return prefetchSize;
    }
-
-   String getConnectionID()
-   {
-      return id;
-   }
-
+   
    public boolean isStarted()
    {
       return started;
    }
-
-   void removeSession(String sessionId) throws Exception
+   
+   public long getCreatedTime()
    {
-      if (sessions.remove(sessionId) == null)
-      {
-         throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
-      }      
+      return createdTime;
    }
 
-   void addTemporaryQueue(Queue queue)
+   public String getClientAddress()
    {
-      temporaryQueues.add(queue);      
+      return clientAddress;
    }
-   
-   void removeTemporaryQueue(Queue queue)
-   {
-      temporaryQueues.remove(queue);      
-   }
 
-   String getRemotingClientSessionID()
+   // Public ---------------------------------------------------------------------------------------
+    
+   public String toString()
    {
-      return remotingClientSessionID;
+      return "ConnectionEndpoint[" + id + "]";
    }
-  
-   // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
    
-   private void setStarted(boolean started) throws Exception
+   private void setStarted(final boolean started) throws Exception
    {
-      Map<String, ServerSessionEndpoint> sessionsClone = null;
+      Map<String, ServerSession> sessionsClone = null;
       
-      sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
+      sessionsClone = new HashMap<String, ServerSession>(sessions);
             
-      for (ServerSessionEndpoint session: sessionsClone.values() )
+      for (ServerSession session: sessionsClone.values() )
       {
          session.setStarted(started);
       }
       
       this.started = started;      
-   }   
-    
-   // Inner classes --------------------------------------------------------------------------------
-
-   private class ConnectionPacketHandler extends ServerPacketHandlerSupport
-   {
-      public ConnectionPacketHandler()
-      {
-      }
-
-      public String getID()
-      {
-         return ServerConnectionEndpoint.this.id;
-      }
-
-      public Packet doHandle(Packet packet, PacketSender sender) throws Exception
-      {
-         Packet response = null;
-
-         PacketType type = packet.getType();
-         
-         if (type == CONN_CREATESESSION)
-         {
-            ConnectionCreateSessionMessage request = (ConnectionCreateSessionMessage) packet;
-            
-            response = createSession(request.isXA(), request.isAutoCommitSends(), request.isAutoCommitAcks(), sender);
-         }
-         else if (type == CONN_START)
-         {
-            start();
-         }
-         else if (type == CONN_STOP)
-         {
-            stop();
-         }
-         else if (type == CLOSE)
-         {
-            close();
-         }                       
-         else
-         {
-            throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
-                                         "Unsupported packet " + type);
-         }
-
-         // reply if necessary
-         if (response == null && packet.isOneWay() == false)
-         {
-            response = new NullPacket();               
-         }
-         
-         return response;
-      }
-
-      @Override
-      public String toString()
-      {
-         return "ConnectionAdvisedPacketHandler[id=" + id + "]";
-      }
-   }
-
+   }         
 }

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,102 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ * 
+ * A ServerConnectionPacketHandler
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
+{
+	private final ServerConnection connection;
+	
+   public ServerConnectionPacketHandler(final ServerConnection connection)
+   {
+   	this.connection = connection;
+   }
+
+   public String getID()
+   {
+      return connection.getID();
+   }
+
+   public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+   {
+      Packet response = null;
+
+      PacketType type = packet.getType();
+      
+      if (type == CONN_CREATESESSION)
+      {
+         ConnectionCreateSessionMessage request = (ConnectionCreateSessionMessage) packet;
+         
+         response = connection.createSession(request.isXA(), request.isAutoCommitSends(), request.isAutoCommitAcks(), sender);
+      }
+      else if (type == CONN_START)
+      {
+         connection.start();
+      }
+      else if (type == CONN_STOP)
+      {
+         connection.stop();
+      }
+      else if (type == CLOSE)
+      {
+         connection.close();
+      }                       
+      else
+      {
+         throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+                                      "Unsupported packet " + type);
+      }
+
+      // reply if necessary
+      if (response == null && packet.isOneWay() == false)
+      {
+         response = new NullPacket();               
+      }
+      
+      return response;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ConnectionAdvisedPacketHandler[id=" + connection.getID() + "]";
+   }
+}

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,42 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.messaging.core.Consumer;
+
+/**
+ * 
+ * A ServerConsumer
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerConsumer extends Consumer
+{
+	String getID();
+	
+	void close() throws Exception;
+	
+	void setStarted(boolean started) throws Exception;
+	
+	void receiveTokens(int tokens) throws Exception;
+}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,27 +21,18 @@
  */
 package org.jboss.jms.server.endpoint;
 
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
-
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.jboss.messaging.core.Consumer;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.HandleStatus;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.MessagingServer;
 import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessagingException;
 
 /**
  * Concrete implementation of a ClientConsumer. 
@@ -56,7 +47,7 @@
  * 
  * @version <tt>$Revision$</tt> $Id$
  */
-public class ServerConsumerEndpoint implements Consumer
+public class ServerConsumerEndpoint implements ServerConsumer
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -66,65 +57,76 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private boolean trace = log.isTraceEnabled();
+   private final boolean trace = log.isTraceEnabled();
 
    private final String id;
 
    private final Queue messageQueue;
-
-   private final ServerSessionEndpoint sessionEndpoint;
-
+   
    private final boolean noLocal;
 
    private final Filter filter;
-
-   private boolean started;
-
-   // This lock protects starting and stopping
-   private final Object startStopLock;
-
-   private final AtomicInteger availableTokens = new AtomicInteger(0);
    
    private final boolean autoDeleteQueue;
    
    private final boolean enableFlowControl;
    
+   private final String connectionID;   
+   
+   private final ServerSession sessionEndpoint;
+
    private final PersistenceManager persistenceManager;
+   
+   private final PostOffice postOffice;
+         
+   private final Object startStopLock = new Object();
 
+   private final AtomicInteger availableTokens = new AtomicInteger(0);
+   
+   private boolean started;
+
    // Constructors ---------------------------------------------------------------------------------
 
-   ServerConsumerEndpoint(MessagingServer sp, String id, Queue messageQueue,                          
-					           ServerSessionEndpoint sessionEndpoint, Filter filter,
-					           boolean noLocal, boolean autoDeleteQueue, boolean enableFlowControl)
+   ServerConsumerEndpoint(final Queue messageQueue, final boolean noLocal, final Filter filter,
+   		                 final boolean autoDeleteQueue, final boolean enableFlowControl,
+   		                 final String connectionID, final ServerSession sessionEndpoint,
+					           final PersistenceManager persistenceManager, final PostOffice postOffice,
+					           final boolean started)
    {
-      this.id = id;
-
+   	id = UUID.randomUUID().toString();
+      
       this.messageQueue = messageQueue;
-
-      this.sessionEndpoint = sessionEndpoint;
-
+      
       this.noLocal = noLocal;
 
-      this.startStopLock = new Object();
-
       this.filter = filter;
-                
-      this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
       
       this.autoDeleteQueue = autoDeleteQueue;
       
       this.enableFlowControl = enableFlowControl;
       
-      this.persistenceManager = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
+      this.connectionID = connectionID;
+
+      this.sessionEndpoint = sessionEndpoint;
+
+      this.persistenceManager = persistenceManager;
       
-      // adding the consumer to the queue
+      this.postOffice = postOffice;
+      
+      this.started = started;
+      
       messageQueue.addConsumer(this);
       
       messageQueue.deliver();
    }
 
-   // Receiver implementation ----------------------------------------------------------------------
+   // ServerConsumer implementation ----------------------------------------------------------------------
 
+   public String getID()
+   {
+   	return id;
+   }
+   
    public HandleStatus handle(MessageReference ref) throws Exception
    {
       if (enableFlowControl && availableTokens.get() == 0)
@@ -161,11 +163,9 @@
          {
             String conId = message.getConnectionID();
 
-            if (sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId))
-            {
-            	PersistenceManager pm = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
-            	            	            	
-            	ref.acknowledge(pm);
+            if (connectionID.equals(conId))
+            {	            	
+            	ref.acknowledge(persistenceManager);
             	
              	return HandleStatus.HANDLED;
             }            
@@ -191,8 +191,6 @@
       }
    }
    
-   // Closeable implementation ---------------------------------------------------------------------
-
    public void close() throws Exception
    {
       if (trace)
@@ -204,56 +202,24 @@
 
       messageQueue.removeConsumer(this);
       
-      sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);     
-      
       if (autoDeleteQueue)
       {
          if (messageQueue.getConsumerCount() == 0)
-         {
-            MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+         {  
+            postOffice.removeBinding(messageQueue.getName());
             
-            server.getPostOffice().removeBinding(messageQueue.getName());
-            
             if (messageQueue.isDurable())
             {
-               server.getPersistenceManager().deleteAllReferences(messageQueue);
+               persistenceManager.deleteAllReferences(messageQueue);
             }
          }
       }
       
       sessionEndpoint.removeConsumer(id);           
    }
-
-   // ConsumerEndpoint implementation --------------------------------------------------------------
-
-   public void receiveTokens(int tokens) throws Exception
-   {
-      availableTokens.addAndGet(tokens);
-
-      promptDelivery();      
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   public String toString()
-   {
-      return "ConsumerEndpoint[" + id + "]";
-   }
-
-   public PacketHandler newHandler()
-   {
-      return new ServerConsumerEndpointPacketHandler();
-   }
-
-   // Package protected ----------------------------------------------------------------------------
    
-   String getID()
+   public void setStarted(boolean started)
    {
-   	return this.id;
-   }
-
-   void setStarted(boolean started)
-   {
       boolean useStarted;
       
       synchronized (startStopLock)
@@ -269,61 +235,25 @@
          promptDelivery();
       }
    }
-    
-   // Protected ------------------------------------------------------------------------------------
+   
+   public void receiveTokens(int tokens) throws Exception
+   {
+      availableTokens.addAndGet(tokens);
 
-   // Private --------------------------------------------------------------------------------------
+      promptDelivery();      
+   }
 
-   private void promptDelivery()
+   // Public -----------------------------------------------------------------------------
+     
+   public String toString()
    {
-      sessionEndpoint.promptDelivery(messageQueue);
+      return "ConsumerEndpoint[" + id + "]";
    }
-
-   // Inner classes --------------------------------------------------------------------------------
    
-   private class ServerConsumerEndpointPacketHandler extends ServerPacketHandlerSupport
-   {
+   // Private --------------------------------------------------------------------------------------
 
-      public String getID()
-      {
-         return ServerConsumerEndpoint.this.id;
-      }
-
-      public Packet doHandle(Packet packet, PacketSender sender) throws Exception
-      {
-         Packet response = null;
-
-         PacketType type = packet.getType();
-         
-         if (type == CONS_FLOWTOKEN)
-         {
-            ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
-            
-            receiveTokens(message.getTokens());
-         }
-         else if (type == CLOSE)
-         {
-            close();
-         }
-         else
-         {
-            throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
-                  "Unsupported packet " + type);
-         }
-
-         // reply if necessary
-         if (response == null && packet.isOneWay() == false)
-         {
-            response = new NullPacket();               
-         }
-         
-         return response;
-      }
-
-      @Override
-      public String toString()
-      {
-         return "ServerConsumerEndpointPacketHandler[id=" + id + "]";
-      }
-   }
+   private void promptDelivery()
+   {
+      sessionEndpoint.promptDelivery(messageQueue);
+   } 
 }

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,92 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ * 
+ * A ServerConsumerPacketHandler
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerConsumerPacketHandler extends ServerPacketHandlerSupport
+{
+	private final ServerConsumer consumer;
+	
+	public ServerConsumerPacketHandler(final ServerConsumer consumer)
+	{
+		this.consumer = consumer;
+	}
+
+   public String getID()
+   {
+      return consumer.getID();
+   }
+
+   public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+   {
+      Packet response = null;
+
+      PacketType type = packet.getType();
+      
+      if (type == CONS_FLOWTOKEN)
+      {
+         ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
+         
+         consumer.receiveTokens(message.getTokens());
+      }
+      else if (type == CLOSE)
+      {
+         consumer.close();
+      }
+      else
+      {
+         throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+               "Unsupported packet " + type);
+      }
+
+      // reply if necessary
+      if (response == null && packet.isOneWay() == false)
+      {
+         response = new NullPacket();               
+      }
+      
+      return response;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ServerConsumerEndpointPacketHandler[id=" + consumer.getID() + "]";
+   }
+}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -1,3 +1,24 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
 package org.jboss.jms.server.endpoint;
 
 import org.jboss.logging.Logger;

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,112 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
+
+/**
+ * 
+ * A ServerSession
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerSession
+{
+	String getID();
+	
+	void removeBrowser(String browserID) throws Exception;
+	
+	void removeConsumer(String consumerID) throws Exception;
+	
+	void close() throws Exception;
+	
+	void setStarted(boolean started) throws Exception;
+	
+	void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+	
+	void promptDelivery(Queue queue);
+	
+	boolean send(String address, Message msg) throws Exception;
+
+   void acknowledge(long deliveryID, boolean allUpTo) throws Exception;
+
+   void rollback() throws Exception;
+
+   void cancel(long deliveryID, boolean expired) throws Exception;
+
+   void commit() throws Exception;
+
+   SessionXAResponseMessage XACommit(boolean onePhase, Xid xid) throws Exception;
+
+   SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception;
+
+   SessionXAResponseMessage XAForget(Xid xid);
+
+   SessionXAResponseMessage XAJoin(Xid xid) throws Exception;
+
+   SessionXAResponseMessage XAPrepare(Xid xid) throws Exception;
+
+   SessionXAResponseMessage XAResume(Xid xid) throws Exception;
+
+   SessionXAResponseMessage XARollback(Xid xid) throws Exception;
+
+   SessionXAResponseMessage XAStart(Xid xid);
+
+   SessionXAResponseMessage XASuspend() throws Exception;
+
+   List<Xid> getInDoubtXids() throws Exception;
+
+   int getXATimeout();
+
+   boolean setXATimeout(int timeoutSeconds);
+
+   void addAddress(String address) throws Exception;
+
+   void removeAddress(String address) throws Exception;
+
+   void createQueue(String address, String queueName, String filterString, boolean durable, boolean temporary) throws Exception;
+
+   void deleteQueue(String queueName) throws Exception;
+
+   SessionCreateConsumerResponseMessage  createConsumer(String queueName, String filterString,
+                     boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+
+   SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
+
+   SessionBindingQueryResponseMessage executeBindingQuery(SessionBindingQueryMessage request) throws Exception;
+
+   SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector) throws Exception;
+}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,31 +21,6 @@
  */
 package org.jboss.jms.server.endpoint;
 
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -68,7 +43,7 @@
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
 import org.jboss.messaging.core.PostOffice;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.ResourceManager;
@@ -76,39 +51,15 @@
 import org.jboss.messaging.core.impl.DeliveryImpl;
 import org.jboss.messaging.core.impl.TransactionImpl;
 import org.jboss.messaging.core.impl.filter.FilterImpl;
-import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAPrepareMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
@@ -126,13 +77,12 @@
  *
  * $Id$
  */
-public class ServerSessionEndpoint
+public class ServerSessionEndpoint implements ServerSession
 {
    // Constants
    // ------------------------------------------------------------------------------------
 
-   private static final Logger log = Logger
-         .getLogger(ServerSessionEndpoint.class);
+   private static final Logger log = Logger.getLogger(ServerSessionEndpoint.class);
 
    // Static
    // ---------------------------------------------------------------------------------------
@@ -145,17 +95,27 @@
    private final boolean trace = log.isTraceEnabled();
 
    private final String id;
+   
+   private final boolean autoCommitSends;
 
-   private final ServerConnectionEndpoint connectionEndpoint;
+   private final boolean autoCommitAcks;
+   
+   private final ServerConnection connection;
+   
+   private final ResourceManager resourceManager;
 
-   private final MessagingServer sp;
+   private final PacketSender sender;
+   
+   private final PacketDispatcher dispatcher;
+   
+   private final PersistenceManager persistenceManager;
+   
+   private final PostOffice postOffice;
+         
+   private final Map<String, ServerConsumer> consumers = new ConcurrentHashMap<String, ServerConsumer>();
 
-   private final Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
-
    private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
 
-   private final PostOffice postOffice;
-
    private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
 
    private long deliveryIDSequence = 0;
@@ -164,80 +124,79 @@
 
    private Transaction tx;
 
-   private final boolean autoCommitSends;
-
-   private final boolean autoCommitAcks;
-
-   private final ResourceManager resourceManager;
-
-   private PacketSender sender;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
-   ServerSessionEndpoint(String sessionID,
-         ServerConnectionEndpoint connectionEndpoint, boolean autoCommitSends,
-         boolean autoCommitAcks, boolean xa, PacketSender sender, ResourceManager resourceManager)
-         throws Exception
+   ServerSessionEndpoint(final boolean autoCommitSends,
+                         final boolean autoCommitAcks, final int prefetchSize,
+                         final boolean xa, final ServerConnection connection,
+                         final ResourceManager resourceManager, final PacketSender sender, 
+                         final PacketDispatcher dispatcher, final PersistenceManager persistenceManager,
+                         final PostOffice postOffice) throws Exception
    {
-      this.id = sessionID;
+   	id = UUID.randomUUID().toString();
+            
+      this.autoCommitSends = autoCommitSends;
 
-      this.connectionEndpoint = connectionEndpoint;
-
-      sp = connectionEndpoint.getMessagingServer();
-
-      postOffice = sp.getPostOffice();
-
+      this.autoCommitAcks = autoCommitAcks;
+      
       if (!xa)
       {
          tx = new TransactionImpl();
       }
 
-      this.autoCommitSends = autoCommitSends;
+      this.connection = connection;
 
-      this.autoCommitAcks = autoCommitAcks;
-      
-      this.sender = sender;
-
       this.resourceManager = resourceManager;
+            
+      this.sender = sender;
+    
+      this.dispatcher = dispatcher;
       
+      this.persistenceManager = persistenceManager;
+      
+      this.postOffice = postOffice;
+            
       if (log.isTraceEnabled())
+      {
          log.trace("created server session endpoint for " + sender.getRemoteAddress());
+      }
    }
 
-   // Public
+   // ServerSession implementation
    // ---------------------------------------------------------------------------------------
-
-   public ServerConnectionEndpoint getConnectionEndpoint()
+   
+   public String getID()
    {
-      return connectionEndpoint;
+   	return id;
    }
-
-   public String toString()
+   
+   public ServerConnection getConnection()
    {
-      return "SessionEndpoint[" + id + "]";
+      return connection;
    }
 
-   // Package protected
-   // ----------------------------------------------------------------------------
-
-   void removeBrowser(String browserId) throws Exception
+   public void removeBrowser(final String browserId) throws Exception
    {
       if (browsers.remove(browserId) == null)
       {
          throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
       }
+      
+      dispatcher.unregister(browserId);           
    }
 
-   void removeConsumer(String consumerId) throws Exception
+   public void removeConsumer(final String consumerId) throws Exception
    {
       if (consumers.remove(consumerId) == null)
       {
          throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
       }
+      
+      dispatcher.unregister(consumerId);           
    }
-
-   synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer) throws Exception
+   
+   public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
       Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
 
@@ -245,34 +204,22 @@
 
       delivery.deliver();
    }
-
-   void setStarted(boolean s) throws Exception
+   
+   public void setStarted(final boolean s) throws Exception
    {
-      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+      Map<String, ServerConsumer> consumersClone = new HashMap<String, ServerConsumer>(consumers);
 
-      for (ServerConsumerEndpoint consumer: consumersClone.values())
+      for (ServerConsumer consumer: consumersClone.values())
       {
          consumer.setStarted(s);
       }
    }
 
-   void promptDelivery(final Queue queue)
-   {
-      // TODO - do we really need to prompt on a different thread?
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            queue.deliver();
-         }
-      });
-   }
-
    public void close() throws Exception
    {
-      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+      Map<String, ServerConsumer> consumersClone = new HashMap<String, ServerConsumer>(consumers);
 
-      for (ServerConsumerEndpoint consumer: consumersClone.values())
+      for (ServerConsumer consumer: consumersClone.values())
       {
          consumer.close();
       }
@@ -296,20 +243,32 @@
 
       deliveries.clear();
 
-      connectionEndpoint.removeSession(id);
+      connection.removeSession(id);
 
-      connectionEndpoint.getMessagingServer().getRemotingService()
-            .getDispatcher().unregister(id);
+      dispatcher.unregister(id);
    }
-
-   private boolean send(String address, Message msg) throws Exception
+   
+   public void promptDelivery(final Queue queue)
    {
+      // TODO - do we really need to prompt on a different thread?
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            queue.deliver();
+         }
+      });
+   }
+   
+   public boolean send(final String address, final Message msg) throws Exception
+   {
       //check the address exists, if it doesnt add if the user has the correct privileges
-      if(!postOffice.containsAllowableAddress(address))
+      if (!postOffice.containsAllowableAddress(address))
       {
          try
          {
-            security.check(address, CheckType.CREATE, getConnectionEndpoint());
+            security.check(address, CheckType.CREATE, connection);
+            
             postOffice.addAllowableAddress(address);
          }
          catch (MessagingException e)
@@ -318,15 +277,15 @@
          }
       }
       //check the user has write access to this address
-      security.check(address, CheckType.WRITE, getConnectionEndpoint());
+      security.check(address, CheckType.WRITE, connection);
       // Assign the message an internal id - this is used to key it in the store
-      msg.setMessageID(sp.getPersistenceManager().generateMessageID());
+      msg.setMessageID(persistenceManager.generateMessageID());
 
       // This allows the no-local consumers to filter out the messages that come
       // from the same
       // connection.
 
-      msg.setConnectionID(connectionEndpoint.getConnectionID());
+      msg.setConnectionID(connection.getID());
 
       postOffice.route(address, msg);
 
@@ -342,7 +301,7 @@
          {
             if (msg.getNumDurableReferences() != 0)
             {
-               sp.getPersistenceManager().addMessage(msg);
+               persistenceManager.addMessage(msg);
             }
 
             msg.send();
@@ -356,8 +315,7 @@
       }
    }
 
-   private synchronized void acknowledge(long deliveryID, boolean allUpTo)
-         throws Exception
+   public synchronized void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
    {
       // Note that we do not consider it an error if the deliveries cannot be
       // found to be acked.
@@ -391,7 +349,7 @@
 
                if (autoCommitAcks)
                {
-                  ref.acknowledge(sp.getPersistenceManager());
+                  ref.acknowledge(persistenceManager);
                }
                else
                {
@@ -429,7 +387,7 @@
 
                if (autoCommitAcks)
                {
-                  ref.acknowledge(sp.getPersistenceManager());
+                  ref.acknowledge(persistenceManager);
                }
                else
                {
@@ -445,7 +403,7 @@
       }
    }
 
-   private void rollback() throws Exception
+   public void rollback() throws Exception
    {
       if (tx == null)
       {
@@ -471,10 +429,10 @@
          deliveryIDSequence -= tx.getAcknowledgementsCount();
       }
 
-      tx.rollback(sp.getPersistenceManager());
+      tx.rollback(persistenceManager);
    }
 
-   private void cancel(long deliveryID, boolean expired) throws Exception
+   public void cancel(final long deliveryID, final boolean expired) throws Exception
    {
       if (deliveryID == -1)
       {
@@ -494,7 +452,7 @@
             deliveries.clear();
          }
 
-         cancelTx.rollback(sp.getPersistenceManager());
+         cancelTx.rollback(persistenceManager);
       }
       else if (expired)
       {
@@ -511,7 +469,7 @@
 
             if (delivery.getDeliveryID() == deliveryID)
             {
-               delivery.getReference().expire(sp.getPersistenceManager());
+               delivery.getReference().expire(persistenceManager);
 
                iter.remove();
 
@@ -525,13 +483,12 @@
       }
    }
 
-   private void commit() throws Exception
+   public void commit() throws Exception
    {
-      tx.commit(true, sp.getPersistenceManager());
+      tx.commit(true, persistenceManager);
    }
 
-   private SessionXAResponseMessage XACommit(boolean onePhase, Xid xid)
-         throws Exception
+   public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -554,7 +511,7 @@
             XAException.XAER_PROTO,
             "Cannot commit transaction, it is suspended " + xid); }
 
-      theTx.commit(onePhase, sp.getPersistenceManager());
+      theTx.commit(onePhase, persistenceManager);
 
       boolean removed = resourceManager.removeTransaction(xid);
 
@@ -568,7 +525,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception
+   public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
    {
       if (tx != null && tx.getXid().equals(xid))
       {
@@ -610,7 +567,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XAForget(Xid xid)
+   public SessionXAResponseMessage XAForget(final Xid xid)
    {
       // Do nothing since we don't support heuristic commits / rollback from the
       // resource manager
@@ -618,7 +575,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XAJoin(Xid xid) throws Exception
+   public SessionXAResponseMessage XAJoin(final Xid xid) throws Exception
    {
       Transaction theTx = resourceManager.getTransaction(xid);
 
@@ -637,7 +594,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XAPrepare(Xid xid) throws Exception
+   public SessionXAResponseMessage XAPrepare(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -677,13 +634,13 @@
       }
       else
       {
-         theTx.prepare(sp.getPersistenceManager());
+         theTx.prepare(persistenceManager);
 
          return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
       }
    }
 
-   private SessionXAResponseMessage XAResume(Xid xid) throws Exception
+   public SessionXAResponseMessage XAResume(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -713,7 +670,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XARollback(Xid xid) throws Exception
+   public SessionXAResponseMessage XARollback(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -736,7 +693,7 @@
             XAException.XAER_PROTO,
             "Cannot rollback transaction, it is suspended " + xid); }
 
-      theTx.rollback(sp.getPersistenceManager());
+      theTx.rollback(persistenceManager);
 
       boolean removed = resourceManager.removeTransaction(xid);
 
@@ -750,7 +707,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XAStart(Xid xid)
+   public SessionXAResponseMessage XAStart(final Xid xid)
    {
       if (tx != null)
       {
@@ -775,7 +732,7 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private SessionXAResponseMessage XASuspend() throws Exception
+   public SessionXAResponseMessage XASuspend() throws Exception
    {
       if (tx == null)
       {
@@ -800,38 +757,32 @@
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
 
-   private List<Xid> getInDoubtXids() throws Exception
+   public List<Xid> getInDoubtXids() throws Exception
    {
       return null;
    }
 
-   private int getXATimeout()
+   public int getXATimeout()
    {
       return resourceManager.getTimeoutSeconds();
    }
 
-   private boolean setXATimeout(int timeoutSeconds)
+   public boolean setXATimeout(int timeoutSeconds)
    {
       return resourceManager.setTimeoutSeconds(timeoutSeconds);
    }
 
-   // Protected
-   // ------------------------------------------------------------------------------------
-
-   // Private
-   // --------------------------------------------------------------------------------------
-
-   private void addAddress(String address) throws Exception
+   public void addAddress(final String address) throws Exception
    {
       if (postOffice.containsAllowableAddress(address))
       {
          throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
       }
-      security.check(address, CheckType.CREATE, getConnectionEndpoint());
+      security.check(address, CheckType.CREATE, connection);
       postOffice.addAllowableAddress(address);
    }
 
-   private void removeAddress(String address) throws Exception
+   public void removeAddress(final String address) throws Exception
    {
       if (!postOffice.removeAllowableAddress(address))
       {
@@ -839,16 +790,15 @@
       }
    }
 
-   private void createQueue(String address, String queueName,
-         String filterString, boolean durable, boolean temporary)
-         throws Exception
+   public void createQueue(final String address, final String queueName,
+         final String filterString, boolean durable, final boolean temporary) throws Exception
    {
       //make sure the user has privileges to create this address
-      if(!postOffice.containsAllowableAddress(address))
+      if (!postOffice.containsAllowableAddress(address))
       {
          try
          {
-            security.check(address, CheckType.CREATE, getConnectionEndpoint());
+            security.check(address, CheckType.CREATE, connection);
             postOffice.addAllowableAddress(address);
          }
          catch (MessagingException e)
@@ -882,11 +832,11 @@
       {
          Queue queue = binding.getQueue();
 
-         connectionEndpoint.addTemporaryQueue(queue);
+         connection.addTemporaryQueue(queue);
       }
    }
 
-   private void deleteQueue(String queueName) throws Exception
+   public void deleteQueue(final String queueName) throws Exception
    {
       Binding binding = postOffice.removeBinding(queueName);
 
@@ -899,17 +849,18 @@
 
       if (queue.isDurable())
       {
-         sp.getPersistenceManager().deleteAllReferences(binding.getQueue());
+      	persistenceManager.deleteAllReferences(binding.getQueue());
       }
 
       if (queue.isTemporary())
       {
-         connectionEndpoint.removeTemporaryQueue(queue);
+         connection.removeTemporaryQueue(queue);
       }
    }
 
-   private SessionCreateConsumerResponseMessage createConsumer(String queueName,  String filterString,
-                                                 boolean noLocal, boolean autoDeleteQueue) throws Exception
+   public SessionCreateConsumerResponseMessage
+      createConsumer(final String queueName, final String filterString,
+                     final boolean noLocal, final boolean autoDeleteQueue, final int prefetchSize) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -917,11 +868,9 @@
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
-      security.check(binding.getAddress(), CheckType.READ, getConnectionEndpoint());
-      int prefetchSize = connectionEndpoint.getPrefetchSize();
-
-      String consumerID = UUID.randomUUID().toString();
-
+      
+      security.check(binding.getAddress(), CheckType.READ, connection);
+      
       Filter filter = null;
 
       if (filterString != null)
@@ -929,26 +878,26 @@
          filter = new FilterImpl(filterString);
       }
 
-      ServerConsumerEndpoint ep = new ServerConsumerEndpoint(sp, consumerID,
-            binding.getQueue(), this, filter, noLocal, autoDeleteQueue, prefetchSize > 0);
+      ServerConsumer consumer =
+      	new ServerConsumerEndpoint(binding.getQueue(), noLocal, filter, autoDeleteQueue, prefetchSize > 0, connection.getID(),
+                                    this, persistenceManager, postOffice, connection.isStarted());
 
-      connectionEndpoint.getMessagingServer().getRemotingService()
-            .getDispatcher().register(ep.newHandler());
+      dispatcher.register(new ServerConsumerPacketHandler(consumer));
 
-      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumerID,
+      SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
             prefetchSize);
 
       synchronized (consumers)
       {
-         consumers.put(consumerID, ep);
+         consumers.put(consumer.getID(), consumer);
       }
 
-      log.trace(this + " created and registered " + ep);
+      log.trace(this + " created and registered " + consumer);
 
       return response;
    }
 
-   public SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception
+   public SessionQueueQueryResponseMessage executeQueueQuery(final SessionQueueQueryMessage request) throws Exception
    {
       if (request.getQueueName() == null)
       {
@@ -979,7 +928,7 @@
       return response;
    }
 
-   public SessionBindingQueryResponseMessage executeBindingQuery(SessionBindingQueryMessage request) throws Exception
+   public SessionBindingQueryResponseMessage executeBindingQuery(final SessionBindingQueryMessage request) throws Exception
    {
       if (request.getAddress() == null)
       {
@@ -1003,14 +952,15 @@
       return new SessionBindingQueryResponseMessage(exists, queueNames);
    }
 
-   private SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector)
+   public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
          throws Exception
    {
       if(!postOffice.containsAllowableAddress(queueName))
       {
          try
          {
-            security.check(queueName, CheckType.CREATE, this.getConnectionEndpoint());
+            security.check(queueName, CheckType.CREATE, connection);
+            
             postOffice.addAllowableAddress(queueName);
          }
          catch (MessagingException e)
@@ -1024,224 +974,30 @@
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
-      security.check(binding.getAddress(), CheckType.READ, this.getConnectionEndpoint());
-      String browserID = UUID.randomUUID().toString();
+      security.check(binding.getAddress(), CheckType.READ, connection);
+      
+      ServerBrowserEndpoint browser = new ServerBrowserEndpoint(this, binding.getQueue(), selector);
 
-      ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID,
-            binding.getQueue(), selector);
-
       // still need to synchronized since close() can come in on a different
       // thread
       synchronized (browsers)
       {
-         browsers.put(browserID, ep);
+         browsers.put(browser.getID(), browser);
       }
 
-      connectionEndpoint.getMessagingServer().getRemotingService()
-            .getDispatcher().register(ep.newHandler());
+      dispatcher.register(browser.newHandler());
 
-      log.trace(this + " created and registered " + ep);
+      log.trace(this + " created and registered " + browser);
 
-      return new SessionCreateBrowserResponseMessage(browserID);
+      return new SessionCreateBrowserResponseMessage(browser.getID());
    }
-
-   public PacketHandler newHandler()
+   
+   // Public ---------------------------------------------------------------------------------------------
+   
+   public String toString()
    {
-      return new ServerSessionEndpointPacketHandler();
+      return "SessionEndpoint[" + id + "]";
    }
+   
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
-
-   private class ServerSessionEndpointPacketHandler extends ServerPacketHandlerSupport
-   {
-      public ServerSessionEndpointPacketHandler()
-      {
-      }
-
-      public String getID()
-      {
-         return ServerSessionEndpoint.this.id;
-      }
-
-      public Packet doHandle(Packet packet, PacketSender sender)
-            throws Exception
-      {
-         Packet response = null;
-
-         PacketType type = packet.getType();
-
-         // TODO use a switch for this
-         if (type == SESS_SEND)
-         {
-            SessionSendMessage message = (SessionSendMessage) packet;
-
-            send(message.getAddress(), message.getMessage());
-         }
-         else if (type == SESS_CREATECONSUMER)
-         {
-            SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-
-            response = createConsumer(request.getQueueName(), request
-                  .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue());
-         }
-         else if (type == SESS_CREATEQUEUE)
-         {
-            SessionCreateQueueMessage request = (SessionCreateQueueMessage) packet;
-
-            createQueue(request.getAddress(), request.getQueueName(), request
-                  .getFilterString(), request.isDurable(), request
-                  .isTemporary());
-         }
-         else if (type == SESS_DELETE_QUEUE)
-         {
-            SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
-
-            deleteQueue(request.getQueueName());
-         }
-         else if (type == SESS_QUEUEQUERY)
-         {
-            SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
-
-            response = executeQueueQuery(request);
-         }
-         else if (type == SESS_BINDINGQUERY)
-         {
-            SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
-
-            response = executeBindingQuery(request);
-         }
-         else if (type == SESS_CREATEBROWSER)
-         {
-            SessionCreateBrowserMessage request = (SessionCreateBrowserMessage) packet;
-
-            response = createBrowser(request.getQueueName(), request
-                  .getFilterString());
-         }
-         else if (type == CLOSE)
-         {
-            close();
-         }
-         else if (type == SESS_ACKNOWLEDGE)
-         {
-            SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
-
-            acknowledge(message.getDeliveryID(), message.isAllUpTo());
-         }
-         else if (type == SESS_COMMIT)
-         {
-            commit();
-         }
-         else if (type == SESS_ROLLBACK)
-         {
-            rollback();
-         }
-         else if (type == SESS_CANCEL)
-         {
-            SessionCancelMessage message = (SessionCancelMessage) packet;
-
-            cancel(message.getDeliveryID(), message.isExpired());
-         }
-         else if (type == SESS_XA_COMMIT)
-         {
-            SessionXACommitMessage message = (SessionXACommitMessage) packet;
-
-            response = XACommit(message.isOnePhase(), message.getXid());
-         }
-         else if (type == SESS_XA_END)
-         {
-            SessionXAEndMessage message = (SessionXAEndMessage) packet;
-
-            response = XAEnd(message.getXid(), message.isFailed());
-         }
-         else if (type == SESS_XA_FORGET)
-         {
-            SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
-
-            response = XAForget(message.getXid());
-         }
-         else if (type == SESS_XA_JOIN)
-         {
-            SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
-
-            response = XAJoin(message.getXid());
-         }
-         else if (type == SESS_XA_RESUME)
-         {
-            SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
-
-            response = XAResume(message.getXid());
-         }
-         else if (type == SESS_XA_ROLLBACK)
-         {
-            SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
-
-            response = XARollback(message.getXid());
-         }
-         else if (type == SESS_XA_START)
-         {
-            SessionXAStartMessage message = (SessionXAStartMessage) packet;
-
-            response = XAStart(message.getXid());
-         }
-         else if (type == SESS_XA_SUSPEND)
-         {
-            response = XASuspend();
-         }
-         else if (type == SESS_XA_PREPARE)
-         {
-            SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
-
-            response = XAPrepare(message.getXid());
-         }
-         else if (type == SESS_XA_INDOUBT_XIDS)
-         {
-            List<Xid> xids = getInDoubtXids();
-
-            response = new SessionXAGetInDoubtXidsResponseMessage(xids);
-         }
-         else if (type == SESS_XA_GET_TIMEOUT)
-         {
-            response = new SessionXAGetTimeoutResponseMessage(getXATimeout());
-         }
-         else if (type == SESS_XA_SET_TIMEOUT)
-         {
-            SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
-
-            response = new SessionXASetTimeoutResponseMessage(setXATimeout(message
-                  .getTimeoutSeconds()));
-         }
-         else if (type == PacketType.SESS_ADD_ADDRESS)
-         {
-            SessionAddAddressMessage message = (SessionAddAddressMessage) packet;
-
-            addAddress(message.getAddress());
-         }
-         else if (type == PacketType.SESS_REMOVE_ADDRESS)
-         {
-            SessionRemoveAddressMessage message = (SessionRemoveAddressMessage) packet;
-
-            removeAddress(message.getAddress());
-         }
-         else
-         {
-            throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
-         }
-
-         // reply if necessary
-         if (response == null && packet.isOneWay() == false)
-         {
-            response = new NullPacket();
-         }
-
-         return response;
-      }
-
-      @Override
-      public String toString()
-      {
-         return "ServerSessionEndpointPacketHandler[id=" + id + "]";
-      }
-   }
-
 }

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,286 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.util.MessagingException;
+
+
+/**
+ * 
+ * A ServerSessionPacketHandler
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerSessionPacketHandler extends ServerPacketHandlerSupport
+{
+	private final ServerSession session;
+	
+	private final int prefetchSize;
+	
+	public ServerSessionPacketHandler(final ServerSession session, final int prefetchSize)
+   {
+		this.session = session;
+		
+		this.prefetchSize = prefetchSize;
+   }
+
+   public String getID()
+   {
+      return session.getID();
+   }
+
+   public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+   {
+      Packet response = null;
+
+      PacketType type = packet.getType();
+
+      // TODO use a switch for this
+      if (type == SESS_SEND)
+      {
+         SessionSendMessage message = (SessionSendMessage) packet;
+
+         session.send(message.getAddress(), message.getMessage());
+      }
+      else if (type == SESS_CREATECONSUMER)
+      {
+         SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
+
+         response = session.createConsumer(request.getQueueName(), request
+               .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), prefetchSize);
+      }
+      else if (type == SESS_CREATEQUEUE)
+      {
+         SessionCreateQueueMessage request = (SessionCreateQueueMessage) packet;
+
+         session.createQueue(request.getAddress(), request.getQueueName(), request
+               .getFilterString(), request.isDurable(), request
+               .isTemporary());
+      }
+      else if (type == SESS_DELETE_QUEUE)
+      {
+         SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
+
+         session.deleteQueue(request.getQueueName());
+      }
+      else if (type == SESS_QUEUEQUERY)
+      {
+         SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
+
+         response = session.executeQueueQuery(request);
+      }
+      else if (type == SESS_BINDINGQUERY)
+      {
+         SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
+
+         response = session.executeBindingQuery(request);
+      }
+      else if (type == SESS_CREATEBROWSER)
+      {
+         SessionCreateBrowserMessage request = (SessionCreateBrowserMessage) packet;
+
+         response = session.createBrowser(request.getQueueName(), request
+               .getFilterString());
+      }
+      else if (type == CLOSE)
+      {
+      	session.close();
+      }
+      else if (type == SESS_ACKNOWLEDGE)
+      {
+         SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
+
+         session.acknowledge(message.getDeliveryID(), message.isAllUpTo());
+      }
+      else if (type == SESS_COMMIT)
+      {
+      	session.commit();
+      }
+      else if (type == SESS_ROLLBACK)
+      {
+      	session.rollback();
+      }
+      else if (type == SESS_CANCEL)
+      {
+         SessionCancelMessage message = (SessionCancelMessage) packet;
+
+         session.cancel(message.getDeliveryID(), message.isExpired());
+      }
+      else if (type == SESS_XA_COMMIT)
+      {
+         SessionXACommitMessage message = (SessionXACommitMessage) packet;
+
+         response = session.XACommit(message.isOnePhase(), message.getXid());
+      }
+      else if (type == SESS_XA_END)
+      {
+         SessionXAEndMessage message = (SessionXAEndMessage) packet;
+
+         response = session.XAEnd(message.getXid(), message.isFailed());
+      }
+      else if (type == SESS_XA_FORGET)
+      {
+         SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
+
+         response = session.XAForget(message.getXid());
+      }
+      else if (type == SESS_XA_JOIN)
+      {
+         SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
+
+         response = session.XAJoin(message.getXid());
+      }
+      else if (type == SESS_XA_RESUME)
+      {
+         SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
+
+         response = session.XAResume(message.getXid());
+      }
+      else if (type == SESS_XA_ROLLBACK)
+      {
+         SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
+
+         response = session.XARollback(message.getXid());
+      }
+      else if (type == SESS_XA_START)
+      {
+         SessionXAStartMessage message = (SessionXAStartMessage) packet;
+
+         response = session.XAStart(message.getXid());
+      }
+      else if (type == SESS_XA_SUSPEND)
+      {
+         response = session.XASuspend();
+      }
+      else if (type == SESS_XA_PREPARE)
+      {
+         SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
+
+         response = session.XAPrepare(message.getXid());
+      }
+      else if (type == SESS_XA_INDOUBT_XIDS)
+      {
+         List<Xid> xids = session.getInDoubtXids();
+
+         response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+      }
+      else if (type == SESS_XA_GET_TIMEOUT)
+      {
+         response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+      }
+      else if (type == SESS_XA_SET_TIMEOUT)
+      {
+         SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
+
+         response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message
+               .getTimeoutSeconds()));
+      }
+      else if (type == PacketType.SESS_ADD_ADDRESS)
+      {
+         SessionAddAddressMessage message = (SessionAddAddressMessage) packet;
+
+         session.addAddress(message.getAddress());
+      }
+      else if (type == PacketType.SESS_REMOVE_ADDRESS)
+      {
+         SessionRemoveAddressMessage message = (SessionRemoveAddressMessage) packet;
+
+         session.removeAddress(message.getAddress());
+      }
+      else
+      {
+         throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
+      }
+
+      // reply if necessary
+      if (response == null && packet.isOneWay() == false)
+      {
+         response = new NullPacket();
+      }
+
+      return response;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ServerSessionPacketHandler[id=" + session.getID() + "]";
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,11 +21,9 @@
  */
 package org.jboss.messaging.core;
 
-
 import java.util.HashSet;
 
 import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.SecurityStore;
 import org.jboss.jms.server.security.Role;
 import org.jboss.messaging.core.remoting.RemotingService;
 import org.jboss.messaging.util.HierarchicalRepository;
@@ -63,20 +61,14 @@
    
    RemotingService getRemotingService();
   
-   SecurityStore getSecurityManager();
-
    ConnectionManager getConnectionManager();
 
-   MemoryManager getMemoryManager();
-
    PersistenceManager getPersistenceManager();
 
    void setPersistenceManager(PersistenceManager persistenceManager);
 
    PostOffice getPostOffice();
    
-   ResourceManager getResourceManager();
-
    HierarchicalRepository<HashSet<Role>> getSecurityRepository();
 
    void setPostOffice(PostOffice postOffice);

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,14 +21,14 @@
   */
 package org.jboss.messaging.core;
 
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
-import org.jboss.jms.client.api.ClientConnectionFactory;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
-
-import java.util.List;
 import java.util.Collection;
+import java.util.List;
 
+import org.jboss.jms.client.api.ClientConnectionFactory;
+import org.jboss.jms.server.endpoint.ServerConnection;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+
 /**
  * This interface describes the management interface exposed by the server
  * 
@@ -37,10 +37,6 @@
  */
 public interface MessagingServerManagement
 {
-//   String getServerVersion();
-//   
-//   Configuration getConfiguration();
-//   
    boolean isStarted();
 
    void createQueue(String address,String name) throws Exception;
@@ -93,7 +89,7 @@
 
    public int getConsumerCountForQueue(String queue) throws Exception;
 
-   List<ServerConnectionEndpoint> getActiveConnections();
+   List<ServerConnection> getActiveConnections();
 
    void moveMessages(String toQueue, String fromQueue, FilterImpl filter) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -30,7 +30,6 @@
 
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.SecurityStore;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
 import org.jboss.jms.server.security.NullAuthenticationManager;
@@ -203,7 +202,13 @@
          remotingService.addFailureListener(connectionManager);
          memoryManager.start();
          postOffice.start();
-         MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
+         
+         MessagingServerPacketHandler serverPacketHandler =
+         	new MessagingServerPacketHandler(remotingService.getDispatcher(), resourceManager,
+         			                           persistenceManager, postOffice, securityStore,
+         			                           connectionManager);
+         
+         
          getRemotingService().getDispatcher().register(serverPacketHandler);
 
          ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -318,20 +323,20 @@
 
    public void createQueue(String address, String name) throws Exception
    {
-      if (getPostOffice().getBinding(name) == null)
+      if (postOffice.getBinding(name) == null)
       {
-         getPostOffice().addBinding(address, name, null, true, false);
+         postOffice.addBinding(address, name, null, true, false);
       }
 
-      if (!getPostOffice().containsAllowableAddress(address))
+      if (!postOffice.containsAllowableAddress(address))
       {
-         getPostOffice().addAllowableAddress(address);
+         postOffice.addAllowableAddress(address);
       }
    }
 
    public boolean destroyQueuesByAddress(String address) throws Exception
    {
-      List<Binding> bindings = getPostOffice().getBindingsForAddress(address);
+      List<Binding> bindings = postOffice.getBindingsForAddress(address);
 
       boolean destroyed = false;
 
@@ -339,23 +344,23 @@
       {
          Queue queue = binding.getQueue();
 
-         getPersistenceManager().deleteAllReferences(queue);
+         persistenceManager.deleteAllReferences(queue);
 
          queue.removeAllReferences();
 
-         getPostOffice().removeBinding(queue.getName());
+         postOffice.removeBinding(queue.getName());
 
          destroyed = true;
       }
 
-      getPostOffice().removeAllowableAddress(address);
+      postOffice.removeAllowableAddress(address);
 
       return destroyed;
    }
 
    public boolean destroyQueue(String name) throws Exception
    {
-      Binding binding = getPostOffice().getBinding(name);
+      Binding binding = postOffice.getBinding(name);
 
       boolean destroyed = false;
 
@@ -363,11 +368,11 @@
       {
          Queue queue = binding.getQueue();
 
-         getPersistenceManager().deleteAllReferences(queue);
+         persistenceManager.deleteAllReferences(queue);
 
          queue.removeAllReferences();
 
-         getPostOffice().removeBinding(queue.getName());
+         postOffice.removeBinding(queue.getName());
 
          destroyed = true;
       }
@@ -377,9 +382,9 @@
 
    public boolean addAddress(String address)
    {
-      if (!getPostOffice().containsAllowableAddress(address))
+      if (!postOffice.containsAllowableAddress(address))
       {
-         getPostOffice().addAllowableAddress(address);
+      	postOffice.addAllowableAddress(address);
          return true;
       }
       return false;
@@ -387,9 +392,9 @@
 
    public boolean removeAddress(String address)
    {
-      if (getPostOffice().containsAllowableAddress(address))
+      if (postOffice.containsAllowableAddress(address))
       {
-         getPostOffice().removeAllowableAddress(address);
+      	postOffice.removeAllowableAddress(address);
          return true;
       }
       return false;
@@ -451,21 +456,11 @@
       }
    }
 
-   public SecurityStore getSecurityManager()
-   {
-      return securityStore;
-   }
-
    public ConnectionManager getConnectionManager()
    {
       return connectionManager;
    }
 
-   public MemoryManager getMemoryManager()
-   {
-      return memoryManager;
-   }
-
    public PersistenceManager getPersistenceManager()
    {
       return persistenceManager;
@@ -486,11 +481,6 @@
       this.postOffice = postOffice;
    }
 
-   public ResourceManager getResourceManager()
-   {
-      return resourceManager;
-   }
-
    public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
    {
       return securityRepository;
@@ -524,7 +514,7 @@
    {
 
 
-      List<Binding> bindings = getPostOffice().getBindingsForAddress(address);
+      List<Binding> bindings = postOffice.getBindingsForAddress(address);
 
       boolean destroyed = false;
 
@@ -532,16 +522,16 @@
       {
          Queue queue = binding.getQueue();
 
-         getPersistenceManager().deleteAllReferences(queue);
+         persistenceManager.deleteAllReferences(queue);
 
          queue.removeAllReferences();
 
-         getPostOffice().removeBinding(queue.getName());
+         postOffice.removeBinding(queue.getName());
 
          destroyed = true;
       }
 
-      getPostOffice().removeAllowableAddress(address);
+      postOffice.removeAllowableAddress(address);
 
       return destroyed;
    }

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,21 +21,30 @@
    */
 package org.jboss.messaging.core.impl.server;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
 
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
-import org.jboss.jms.client.SelectorTranslator;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
-import org.jboss.messaging.core.*;
+import org.jboss.jms.server.endpoint.ServerConnection;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingComponent;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.MessagingServerManagement;
 import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.util.MessagingException;
 
 /**
@@ -325,7 +334,7 @@
       return getQueue(queue).getConsumerCount();
    }
 
-   public  List<ServerConnectionEndpoint> getActiveConnections()
+   public  List<ServerConnection> getActiveConnections()
    {
       return messagingServer.getConnectionManager().getActiveConnections();
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -102,7 +102,8 @@
          callFilters(packet);
          handler.handle(packet, sender);
 
-      } else
+      }
+      else
       {
          log.error("Unhandled packet " + packet);
       }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2008-02-24 12:15:29 UTC (rev 3778)
@@ -651,8 +651,6 @@
 	      
 	      assertRemainingMessages(NUM_MESSAGES);
 	
-			log.info("Sent messages");
-	
 			int count = 0;
 	
 			Message m = null;
@@ -662,8 +660,6 @@
 	         
 				m = consumer.receive(200);
 				
-				log.info("got message " + m);
-	         
 	         assertRemainingMessages(NUM_MESSAGES - (i + 1));
 	         
 				if (m == null) break;
@@ -703,11 +699,9 @@
    {       
       final int BATCH_SIZE = 10;
 
-      log.info("*********** DEPLOYING CF");
       ArrayList<String> bindings = new ArrayList<String>();
       bindings.add("mycf");
       deployConnectionFactory(null,"mycf", bindings, -1, -1, -1, -1, false, false, false, BATCH_SIZE);
-      log.info("************ DONE DEPLOY");
       Connection conn = null;
       
       try




More information about the jboss-cvs-commits mailing list