[hornetq-commits] JBoss hornetq SVN: r8827 - in branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server: impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 09:46:21 EST 2010


Author: jmesnil
Date: 2010-01-21 09:46:21 -0500 (Thu, 21 Jan 2010)
New Revision: 8827

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* sync with the trunk: svn merge -r 8806:8821 https://svn.jboss.org/repos/hornetq/trunk

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java (from rev 8819, trunk/src/main/org/hornetq/core/server/BindingQueryResult.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Value holder for the outcome of a binding query (the return type of
+ * ServerSession.executeBindingQuery): an "exists" flag plus a list of queue names.
+ * NOTE(review): "exists" presumably refers to the queried address - confirm with callers.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class BindingQueryResult
+{
+   private boolean exists;
+
+   private List<SimpleString> queueNames;
+
+   public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames)
+   {
+      this.exists = exists;
+
+      this.queueNames = queueNames; // stored by reference, no defensive copy
+   }
+
+   public boolean isExists()
+   {
+      return exists;
+   }
+
+   public List<SimpleString> getQueueNames()
+   {
+      return queueNames; // the same list instance that was passed to the constructor
+   }
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -23,10 +23,8 @@
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.security.Role;
@@ -70,24 +68,19 @@
 
    void unregisterActivateCallback(ActivateCallback callback);
 
-   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
-
    /** The journal at the backup server has to be equivalent as the journal used on the live node. 
     *  Or else the backup node is out of sync. */
    ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
 
-   CreateSessionResponseMessage createSession(String name,
-                                              long channelID,
-                                              String username,
-                                              String password,
-                                              int minLargeMessageSize,
-                                              int incrementingVersion,
-                                              RemotingConnection remotingConnection,
-                                              boolean autoCommitSends,
-                                              boolean autoCommitAcks,
-                                              boolean preAcknowledge,
-                                              boolean xa,
-                                              int confirmationWindowSize) throws Exception;
+   ServerSession createSession(String name,
+                               String username,
+                               String password,
+                               int minLargeMessageSize,
+                               CoreRemotingConnection remotingConnection,
+                               boolean autoCommitSends,
+                               boolean autoCommitAcks,
+                               boolean preAcknowledge,
+                               boolean xa) throws Exception;
 
    void removeSession(String name) throws Exception;
 
@@ -136,4 +129,6 @@
    void setGroupingHandler(GroupingHandler groupingHandler);
 
    GroupingHandler getGroupingHandler();
+
+   boolean checkActivate() throws Exception;
 }

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java (from rev 8819, trunk/src/main/org/hornetq/core/server/QueueQueryResult.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Value holder for the outcome of a queue query (the return type of
+ * ServerSession.executeQueueQuery). The no-arg constructor yields a result with
+ * exists == false; the public seven-arg constructor yields one with exists == true.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class QueueQueryResult
+{
+   private SimpleString name;
+   
+   private boolean exists;
+
+   private boolean durable;
+
+   private int consumerCount;
+
+   private int messageCount;
+
+   private SimpleString filterString;
+
+   private SimpleString address;
+   
+   private boolean temporary;
+
+   public QueueQueryResult(final SimpleString name,
+                                           final SimpleString address,                                           
+                                           final boolean durable,
+                                           final boolean temporary,
+                                           final SimpleString filterString,
+                                           final int consumerCount,
+                                           final int messageCount)
+   {
+      this(name, address, durable, temporary, filterString, consumerCount, messageCount, true); // exists = true
+   }
+
+   public QueueQueryResult()
+   {
+      this(null, null, false, false, null, 0, 0, false); // every field defaulted, exists = false
+   }
+
+   private QueueQueryResult(final SimpleString name,
+                                            final SimpleString address,
+                                            final boolean durable,
+                                            final boolean temporary,
+                                            final SimpleString filterString,
+                                            final int consumerCount,
+                                            final int messageCount,
+                                            final boolean exists)
+   {
+      this.durable = durable;
+      
+      this.temporary = temporary;
+
+      this.consumerCount = consumerCount;
+
+      this.messageCount = messageCount;
+
+      this.filterString = filterString;
+
+      this.address = address;
+      
+      this.name = name;
+
+      this.exists = exists;
+   }
+
+   public boolean isExists()
+   {
+      return exists;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public int getConsumerCount()
+   {
+      return consumerCount;
+   }
+
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+   
+   public SimpleString getName()
+   {
+      return name;
+   }
+   
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -13,35 +13,12 @@
 
 package org.hornetq.core.server;
 
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.hornetq.core.server.impl.ServerSessionPacketHandler;
+import java.util.List;
 
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.SimpleString;
+
 /**
  *
  * A ServerSession
@@ -54,8 +31,6 @@
 {
    String getName();
 
-   long getID();
-
    String getUsername();
 
    String getPassword();
@@ -66,75 +41,75 @@
 
    void removeConsumer(ServerConsumer consumer) throws Exception;
 
-   void close() throws Exception;
+   void acknowledge(long consumerID, long messageID) throws Exception;
 
-   void handleAcknowledge(final SessionAcknowledgeMessage packet);
+   void expire(long consumerID, long messageID) throws Exception;
 
-   void handleExpired(final SessionExpiredMessage packet);
+   void rollback(boolean considerLastMessageAsDelivered) throws Exception;
 
-   void handleRollback(RollbackMessage packet);
+   void commit() throws Exception;
 
-   void handleCommit(Packet packet);
+   void xaCommit(Xid xid, boolean onePhase) throws Exception;
 
-   void handleXACommit(SessionXACommitMessage packet);
+   void xaEnd(Xid xid) throws Exception;
 
-   void handleXAEnd(SessionXAEndMessage packet);
+   void xaForget(Xid xid) throws Exception;
 
-   void handleXAForget(SessionXAForgetMessage packet);
+   void xaJoin(Xid xid) throws Exception;
 
-   void handleXAJoin(SessionXAJoinMessage packet);
+   void xaPrepare(Xid xid) throws Exception;
 
-   void handleXAPrepare(SessionXAPrepareMessage packet);
+   void xaResume(Xid xid) throws Exception;
 
-   void handleXAResume(SessionXAResumeMessage packet);
+   void xaRollback(Xid xid) throws Exception;
 
-   void handleXARollback(SessionXARollbackMessage packet);
+   void xaStart(Xid xid) throws Exception;
 
-   void handleXAStart(SessionXAStartMessage packet);
+   void xaSuspend() throws Exception;
 
-   void handleXASuspend(Packet packet);
+   List<Xid> xaGetInDoubtXids();
 
-   void handleGetInDoubtXids(Packet packet);
+   int xaGetTimeout();
 
-   void handleGetXATimeout(Packet packet);
+   void xaSetTimeout(int timeout);
 
-   void handleSetXATimeout(SessionXASetTimeoutMessage packet);
+   void start();
 
-   void handleStart(Packet packet);
+   void stop();
 
-   void handleStop(Packet packet);
+   void createQueue(SimpleString address,
+                          SimpleString name,
+                          SimpleString filterString,
+                          boolean temporary,
+                          boolean durable) throws Exception;
 
-   void handleCreateQueue(CreateQueueMessage packet);
+   void deleteQueue(SimpleString name) throws Exception;
 
-   void handleDeleteQueue(SessionDeleteQueueMessage packet);
+   void createConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
 
-   void handleCreateConsumer(SessionCreateConsumerMessage packet);
+   QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
-   void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
+   BindingQueryResult executeBindingQuery(SimpleString address);
 
-   void handleExecuteBindingQuery(SessionBindingQueryMessage packet);
+   void closeConsumer(long consumerID) throws Exception;
 
-   void handleCloseConsumer(SessionConsumerCloseMessage packet);
+   void receiveConsumerCredits(long consumerID, int credits) throws Exception;
 
-   void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
+   void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
 
-   void handleSendContinuations(SessionSendContinuationMessage packet);
+   void send(ServerMessage message) throws Exception;
 
-   void handleSend(SessionSendMessage packet);
+   void sendLarge(byte[] largeMessageHeader) throws Exception;
 
-   void handleSendLargeMessage(SessionSendLargeMessage packet);
+   void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
 
-   void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+   void requestProducerCredits(SimpleString address, int credits) throws Exception;
 
-   void handleRequestProducerCredits(SessionRequestProducerCreditsMessage message) throws Exception;
+   void close() throws Exception;
 
-   void handleClose(Packet packet);
-
-   int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
-
-   Channel getChannel();
-
-   ServerSessionPacketHandler getHandler();
-
-   void setHandler(ServerSessionPacketHandler handler);
+   void setTransferring(boolean transferring);
+   
+   void runConnectionFailureRunners();
+   
+   void setCallback(SessionCallback callback);
 }

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java (from rev 8819, trunk/src/main/org/hornetq/core/server/SessionCallback.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Callback that is installed on a ServerSession through ServerSession.setCallback.
+ * NOTE(review): presumably lets the session hand messages and credits to the protocol
+ * layer; the meaning of the int return values is not shown here - TODO confirm.
+ * @author Tim Fox
+ *
+ */
+public interface SessionCallback
+{
+   void sendProducerCreditsMessage(int credits, SimpleString address, int offset);
+
+   int sendMessage(ServerMessage message, long consumerID, int deliveryCount);
+
+   int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount);
+
+   int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse);
+   
+   void closed();
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -1,234 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
-
-/**
- * A packet handler for all packets that need to be handled at the server level
- * 
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class HornetQPacketHandler implements ChannelHandler
-{
-   private static final Logger log = Logger.getLogger(HornetQPacketHandler.class);
-
-   private final HornetQServer server;
-
-   private final Channel channel1;
-
-   private final RemotingConnection connection;
-
-   public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final RemotingConnection connection)
-   {
-      this.server = server;
-
-      this.channel1 = channel1;
-
-      this.connection = connection;
-   }
-
-   public void handlePacket(final Packet packet)
-   {
-      byte type = packet.getType();
-
-      // All these operations need to be idempotent since they are outside of the session
-      // reliability replay functionality
-      switch (type)
-      {
-         case CREATESESSION:
-         {
-            CreateSessionMessage request = (CreateSessionMessage)packet;
-
-            handleCreateSession(request);
-
-            break;
-         }
-         case REATTACH_SESSION:
-         {
-            ReattachSessionMessage request = (ReattachSessionMessage)packet;
-
-            handleReattachSession(request);
-
-            break;
-         }
-         case CREATE_QUEUE:
-         {
-            // Create queue can also be fielded here in the case of a replicated store and forward queue creation
-
-            CreateQueueMessage request = (CreateQueueMessage)packet;
-
-            handleCreateQueue(request);
-
-            break;
-         }
-         case CREATE_REPLICATION:
-         {
-            // Create queue can also be fielded here in the case of a replicated store and forward queue creation
-
-            CreateReplicationSessionMessage request = (CreateReplicationSessionMessage)packet;
-
-            handleCreateReplication(request);
-
-            break;
-         }
-         default:
-         {
-            HornetQPacketHandler.log.error("Invalid packet " + packet);
-         }
-      }
-   }
-
-   private void handleCreateSession(final CreateSessionMessage request)
-   {
-      boolean incompatibleVersion = false;
-      Packet response;
-      try
-      {
-         response = server.createSession(request.getName(),
-                                         request.getSessionChannelID(),
-                                         request.getUsername(),
-                                         request.getPassword(),
-                                         request.getMinLargeMessageSize(),
-                                         request.getVersion(),
-                                         connection,
-                                         request.isAutoCommitSends(),
-                                         request.isAutoCommitAcks(),
-                                         request.isPreAcknowledge(),
-                                         request.isXA(),
-                                         request.getWindowSize());
-      }
-      catch (Exception e)
-      {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-            
-            if (((HornetQException)e).getCode() == HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
-            {
-               incompatibleVersion = true;
-            }
-         }
-         else
-         {
-            HornetQPacketHandler.log.error("Failed to create session", e);
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-      
-      // send the exception to the client and destroy
-      // the connection if the client and server versions
-      // are not compatible
-      if (incompatibleVersion)
-      {
-         channel1.sendAndFlush(response);
-      }
-      else
-      {
-         channel1.send(response);
-      }
-   }
-
-   private void handleReattachSession(final ReattachSessionMessage request)
-   {
-      Packet response;
-
-      try
-      {
-         response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
-      }
-      catch (Exception e)
-      {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            HornetQPacketHandler.log.error("Failed to reattach session", e);
-
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      channel1.send(response);
-   }
-
-   private void handleCreateQueue(final CreateQueueMessage request)
-   {
-      try
-      {
-         server.createQueue(request.getAddress(),
-                            request.getQueueName(),
-                            request.getFilterString(),
-                            request.isDurable(),
-                            request.isTemporary());
-      }
-      catch (Exception e)
-      {
-         HornetQPacketHandler.log.error("Failed to handle create queue", e);
-      }
-   }
-
-   private void handleCreateReplication(final CreateReplicationSessionMessage request)
-   {
-      Packet response;
-
-      try
-      {
-         Channel channel = connection.getChannel(request.getSessionChannelID(), -1);
-
-         ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
-
-         channel.setHandler(endpoint);
-
-         response = new NullResponseMessage();
-      }
-      catch (Exception e)
-      {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            HornetQPacketHandler.log.warn(e.getMessage(), e);
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      channel1.send(response);
-   }
-
-}
\ No newline at end of file

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -22,7 +22,6 @@
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -69,10 +68,8 @@
 import org.hornetq.core.postoffice.impl.DivertBinding;
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -342,12 +339,12 @@
             managementService.removeNotificationListener(groupingHandler);
             groupingHandler = null;
          }
-         // Need to flush all sessions to make sure all confirmations get sent back to client
-
-         for (ServerSession session : sessions.values())
-         {
-            session.getChannel().flushConfirmations();
-         }
+         // // Need to flush all sessions to make sure all confirmations get sent back to client
+         //
+         // for (ServerSession session : sessions.values())
+         // {
+         // session.getChannel().flushConfirmations();
+         // }
       }
 
       // we stop the remoting service outside a lock
@@ -537,102 +534,21 @@
       return clusterManager;
    }
 
-   public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
-                                                         final String name,
-                                                         final int lastConfirmedCommandID) throws Exception
+   public ServerSession createSession(final String name,
+                                      final String username,
+                                      final String password,
+                                      final int minLargeMessageSize,
+                                      final CoreRemotingConnection connection,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge,
+                                      final boolean xa) throws Exception
    {
-      if (!started)
-      {
-         return new ReattachSessionResponseMessage(-1, false);
-      }
-
-      ServerSession session = sessions.get(name);
-
-      if (!checkActivate())
-      {
-         return new ReattachSessionResponseMessage(-1, false);
-      }
-
-      if (session == null)
-      {
-         return new ReattachSessionResponseMessage(-1, false);
-      }
-      else
-      {
-         if (session.getChannel().getConfirmationWindowSize() == -1)
-         {
-            // Even though session exists, we can't reattach since confi window size == -1,
-            // i.e. we don't have a resend cache for commands, so we just close the old session
-            // and let the client recreate
-
-            try
-            {
-               session.close();
-            }
-            catch (Exception e)
-            {
-               HornetQServerImpl.log.error("Failed to close session", e);
-            }
-
-            sessions.remove(name);
-
-            return new ReattachSessionResponseMessage(-1, false);
-         }
-         else
-         {
-            // Reconnect the channel to the new connection
-            int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
-
-            return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
-         }
-      }
-   }
-
-   public CreateSessionResponseMessage createSession(final String name,
-                                                     final long channelID,
-                                                     final String username,
-                                                     final String password,
-                                                     final int minLargeMessageSize,
-                                                     final int incrementingVersion,
-                                                     final RemotingConnection connection,
-                                                     final boolean autoCommitSends,
-                                                     final boolean autoCommitAcks,
-                                                     final boolean preAcknowledge,
-                                                     final boolean xa,
-                                                     final int sendWindowSize) throws Exception
-   {
-      if (!started)
-      {
-         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED, "Server not started");
-      }
-
-      if (version.getIncrementingVersion() != incrementingVersion)
-      {
-         HornetQServerImpl.log.warn("Client with version " + incrementingVersion +
-                                    " and address " +
-                                    connection.getRemoteAddress() +
-                                    " is not compatible with server version " +
-                                    version.getFullVersion() +
-                                    ". " +
-                                    "Please ensure all clients and servers are upgraded to the same version for them to " +
-                                    "interoperate properly");
-         throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
-                                    "Server and client versions incompatible");
-      }
-
-      if (!checkActivate())
-      {
-         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
-                                    "Server will not accept create session requests");
-      }
-
       if (securityStore != null)
       {
          securityStore.authenticate(username, password);
       }
 
-      Channel channel = connection.getChannel(channelID, sendWindowSize);
-
       final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               username,
                                                               password,
@@ -647,22 +563,13 @@
                                                               postOffice,
                                                               resourceManager,
                                                               securityStore,
-                                                              channel,
                                                               managementService,
                                                               this,
                                                               configuration.getManagementAddress());
 
       sessions.put(name, session);
 
-      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
-                                                                          storageManager.newContext(executorFactory.getExecutor()),
-                                                                          storageManager);
-
-      session.setHandler(handler);
-
-      channel.setHandler(handler);
-
-      return new CreateSessionResponseMessage(version.getIncrementingVersion());
+      return session;
    }
 
    public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
@@ -937,7 +844,7 @@
       }
    }
 
-   private synchronized boolean checkActivate() throws Exception
+   public synchronized boolean checkActivate() throws Exception
    {
       if (configuration.isBackup())
       {
@@ -1025,8 +932,7 @@
 
       if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser()) && ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
       {
-         log.warn("Security risk! It has been detected that the cluster admin user and password "
-                  + "have not been changed from the installation default. "
+         log.warn("Security risk! It has been detected that the cluster admin user and password " + "have not been changed from the installation default. "
                   + "Please see the HornetQ user guide, cluster chapter, for instructions on how to do this.");
       }
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -245,12 +245,12 @@
          executor.execute(deliverRunner);
       }
    }
-   
+
    public Executor getExecutor()
    {
       return executor;
    }
-   
+
    public synchronized void deliverNow()
    {
       deliverRunner.run();
@@ -743,7 +743,7 @@
       }
       return false;
    }
-   
+
    public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
    {
       int count = 0;
@@ -757,7 +757,7 @@
             deliveringCount.incrementAndGet();
             sendToDeadLetterAddress(ref);
             iter.remove();
-            count ++;
+            count++;
          }
       }
       return count;
@@ -832,7 +832,7 @@
 
       return false;
    }
-   
+
    public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
@@ -843,7 +843,7 @@
          MessageReference ref = iter.next();
          if (filter == null || filter.match(ref.getMessage()))
          {
-            count ++;
+            count++;
             iter.remove();
             ref.getMessage().setPriority(newPriority);
             addLast(ref);
@@ -1404,7 +1404,8 @@
             }
             catch (Exception e)
             {
-               QueueImpl.log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually");
+               QueueImpl.log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually",
+                                  e);
             }
          }
       }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -23,7 +23,6 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -33,10 +32,6 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
@@ -44,6 +39,7 @@
 import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.transaction.Transaction;
@@ -112,7 +108,7 @@
 
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
 
-   private final Channel channel;
+   private final SessionCallback callback;
 
    private volatile boolean closed;
 
@@ -133,12 +129,11 @@
                              final boolean started,
                              final boolean browseOnly,
                              final StorageManager storageManager,
-                             final Channel channel,
+                             final SessionCallback callback,
                              final boolean preAcknowledge,
                              final boolean strictUpdateDeliveryCount,
                              final ManagementService managementService) throws Exception
    {
-
       this.id = id;
 
       this.filter = filter;
@@ -157,7 +152,7 @@
 
       this.storageManager = storageManager;
 
-      this.channel = channel;
+      this.callback = callback;
 
       this.preAcknowledge = preAcknowledge;
 
@@ -361,9 +356,7 @@
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
          forcedDeliveryMessage.setAddress(messageQueue.getName());
 
-         final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
-
-         channel.send(packet);
+         callback.sendMessage(forcedDeliveryMessage, id, 0);
       }
       catch (Exception e)
       {
@@ -634,15 +627,12 @@
     */
    private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
    {
-      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
+      int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
 
-      channel.send(packet);
-
       if (availableCredits != null)
       {
-         availableCredits.addAndGet(-packet.getPacketSize());
+         availableCredits.addAndGet(-packetSize);
       }
-
    }
 
    // Inner classes
@@ -732,21 +722,18 @@
 
                sizePendingLargeMessage = context.getLargeBodySize();
 
-               SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
-                                                                                         headerBuffer.toByteBuffer()
-                                                                                                     .array(),
-                                                                                         context.getLargeBodySize(),
-                                                                                         ref.getDeliveryCount());
-
                context.open();
 
                sentInitialPacket = true;
 
-               channel.send(initialPacket);
+               int packetSize = callback.sendLargeMessage(id,
+                                                          headerBuffer.toByteBuffer().array(),
+                                                          context.getLargeBodySize(),
+                                                          ref.getDeliveryCount());
 
                if (availableCredits != null)
                {
-                  availableCredits.addAndGet(-initialPacket.getPacketSize());
+                  availableCredits.addAndGet(-packetSize);
                }
 
                // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -768,22 +755,33 @@
                   return false;
                }
 
-               SessionReceiveContinuationMessage chunk = createChunkSend(context);
+               int localChunkLen = 0;
 
-               int chunkLen = chunk.getBody().length;
+               localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-               channel.send(chunk);
+               HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
 
+               context.encode(bodyBuffer, localChunkLen);
+
+               byte[] body = bodyBuffer.toByteBuffer().array();
+
+               int packetSize = callback.sendLargeMessageContinuation(id,
+                                                                      body,
+                                                                      positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
+                                                                      false);
+
+               int chunkLen = body.length;
+
                if (ServerConsumerImpl.trace)
                {
-                  ServerConsumerImpl.trace("deliverLargeMessage: Sending " + chunk.getPacketSize() +
+                  ServerConsumerImpl.trace("deliverLargeMessage: Sending " + packetSize +
                                            " availableCredits now is " +
                                            availableCredits);
                }
 
                if (availableCredits != null)
                {
-                  availableCredits.addAndGet(-chunk.getPacketSize());
+                  availableCredits.addAndGet(-packetSize);
                }
 
                positionPendingLargeMessage += chunkLen;
@@ -846,26 +844,6 @@
             lock.unlock();
          }
       }
-
-      private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder context) throws HornetQException
-      {
-         SessionReceiveContinuationMessage chunk;
-
-         int localChunkLen = 0;
-
-         localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
-         HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
-
-         context.encode(bodyBuffer, localChunkLen);
-
-         chunk = new SessionReceiveContinuationMessage(id,
-                                                       bodyBuffer.toByteBuffer().array(),
-                                                       positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
-                                                       false);
-
-         return chunk;
-      }
    }
 
    private class BrowserDeliverer implements Runnable

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -21,7 +21,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.protocol.core.PacketImpl;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -24,13 +24,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.exception.HornetQXAException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.journal.IOAsyncTask;
@@ -42,55 +42,22 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.BindingQueryResult;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueQueryResult;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.transaction.ResourceManager;
@@ -113,10 +80,10 @@
    private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
 
    // Static -------------------------------------------------------------------------------
-  
+
    // Attributes ----------------------------------------------------------------------------
 
-   private final long id;
+   // private final long id;
 
    private final String username;
 
@@ -132,7 +99,7 @@
 
    private final boolean strictUpdateDeliveryCount;
 
-   private RemotingConnection remotingConnection;
+   private CoreRemotingConnection remotingConnection;
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
@@ -146,8 +113,6 @@
 
    private final SecurityStore securityStore;
 
-   private final Channel channel;
-
    private final ManagementService managementService;
 
    private volatile boolean started = false;
@@ -163,14 +128,14 @@
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
 
-   private ServerSessionPacketHandler handler;
-
    private boolean closed;
 
    private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
 
    private final RoutingContext routingContext = new RoutingContextImpl(null);
 
+   private SessionCallback callback;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -182,18 +147,15 @@
                             final boolean preAcknowledge,
                             final boolean strictUpdateDeliveryCount,
                             final boolean xa,
-                            final RemotingConnection remotingConnection,
+                            final CoreRemotingConnection remotingConnection,                     
                             final StorageManager storageManager,
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
                             final SecurityStore securityStore,
-                            final Channel channel,
                             final ManagementService managementService,
                             final HornetQServer server,
                             final SimpleString managementAddress) throws Exception
    {
-      id = channel.getID();
-
       this.username = username;
 
       this.password = password;
@@ -223,8 +185,6 @@
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
 
-      this.channel = channel;
-
       this.managementService = managementService;
 
       this.name = name;
@@ -240,16 +200,11 @@
 
    // ServerSession implementation ----------------------------------------------------------------------------
 
-   public ServerSessionPacketHandler getHandler()
+   public void setCallback(final SessionCallback callback)
    {
-      return handler;
+      this.callback = callback;
    }
 
-   public void setHandler(final ServerSessionPacketHandler handler)
-   {
-      this.handler = handler;
-   }
-
    public String getUsername()
    {
       return username;
@@ -265,11 +220,6 @@
       return minLargeMessageSize;
    }
 
-   public long getID()
-   {
-      return id;
-   }
-
    public String getName()
    {
       return name;
@@ -288,7 +238,7 @@
       }
    }
 
-   public synchronized void close() throws Exception
+   private synchronized void doClose() throws Exception
    {
       if (tx != null && tx.getXid() == null)
       {
@@ -330,579 +280,346 @@
       {
          holder.store.returnProducerCredits(holder.outstandingCredits);
       }
+      
+      callback.closed();
    }
 
-   public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
+   public void createConsumer(final long consumerID,
+                                    final SimpleString name,
+                                    final SimpleString filterString,
+                                    final boolean browseOnly) throws Exception
    {
-      SimpleString name = packet.getQueueName();
-      
-      SimpleString filterString = packet.getFilterString();
+      Binding binding = postOffice.getBinding(name);
 
-      boolean browseOnly = packet.isBrowseOnly();
-
-      Packet response = null;
-
-      try
+      if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
       {
-         Binding binding = postOffice.getBinding(name);
+         throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
+      }
 
-         if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
-         {
-            throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
-         }
+      securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
 
-         securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
+      Filter filter = FilterImpl.createFilter(filterString);
 
-         Filter filter = FilterImpl.createFilter(filterString);;
+      ServerConsumer consumer = new ServerConsumerImpl(consumerID,
+                                                       this,
+                                                       (QueueBinding)binding,
+                                                       filter,
+                                                       started,
+                                                       browseOnly,
+                                                       storageManager,
+                                                       callback,
+                                                       preAcknowledge,
+                                                       strictUpdateDeliveryCount,
+                                                       managementService);
 
-         ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
-                                                          this,
-                                                          (QueueBinding)binding,
-                                                          filter,
-                                                          started,
-                                                          browseOnly,
-                                                          storageManager,
-                                                          channel,
-                                                          preAcknowledge,
-                                                          strictUpdateDeliveryCount,
-                                                          managementService);
+      consumers.put(consumer.getID(), consumer);
 
-         consumers.put(consumer.getID(), consumer);
+      if (!browseOnly)
+      {
+         TypedProperties props = new TypedProperties();
 
-         if (!browseOnly)
-         {
-            TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
 
-            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+         props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
 
-            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
+         props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
-            props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
+         Queue theQueue = (Queue)binding.getBindable();
 
-            Queue theQueue = (Queue)binding.getBindable();
+         props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
 
-            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+         if (filterString != null)
+         {
+            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+         }
 
-            if (filterString != null)
-            {
-               props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
-            }
+         Notification notification = new Notification(null, CONSUMER_CREATED, props);
 
-            Notification notification = new Notification(null, CONSUMER_CREATED, props);
+         managementService.sendNotification(notification);
+      }
+   }
 
-            managementService.sendNotification(notification);
-         }
-
-         //We send back queue information on the queue as a response-  this allows the queue to
-         //be automaticall recreated on failover
-         
-         if (packet.isRequiresResponse())
-         {
-            response = doExecuteQueueQuery(name);
-         }
-         else
-         {
-            response = null;
-         }
+   public void createQueue(final SimpleString address,
+                                 final SimpleString name,
+                                 final SimpleString filterString,
+                                 final boolean temporary,
+                                 final boolean durable) throws Exception
+   {
+      if (durable)
+      {
+         // make sure the user has privileges to create this queue
+         securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
       }
-      catch (Exception e)
+      else
       {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            ServerSessionImpl.log.error("Failed to create consumer", e);
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
+         securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
       }
-      
-      sendResponse(packet, response, false, false);
-   }
 
-   public void handleCreateQueue(final CreateQueueMessage packet)
-   {
-      SimpleString address = packet.getAddress();
+      server.createQueue(address, name, filterString, durable, temporary);
 
-      final SimpleString name = packet.getQueueName();
-      
-      SimpleString filterString = packet.getFilterString();
-
-      boolean temporary = packet.isTemporary();
-
-      boolean durable = packet.isDurable();
-
-      Packet response = null;
-
-      try
+      if (temporary)
       {
-         if (durable)
-         {
-            // make sure the user has privileges to create this queue
-            securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
-         }
-         else
-         {
-            securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
-         }
+         // Temporary queue in core simply means the queue will be deleted if
+         // the remoting connection
+         // dies. It does not mean it will get deleted automatically when the
+         // session is closed.
+         // It is up to the user to delete the queue when finished with it
 
-         server.createQueue(address, name, filterString, durable, temporary);
-
-         if (temporary)
+         failureRunners.put(name, new Runnable()
          {
-            // Temporary queue in core simply means the queue will be deleted if
-            // the remoting connection
-            // dies. It does not mean it will get deleted automatically when the
-            // session is closed.
-            // It is up to the user to delete the queue when finished with it
-
-            failureRunners.put(name, new Runnable()
+            public void run()
             {
-               public void run()
+               try
                {
-                  try
+                  if (postOffice.getBinding(name) != null)
                   {
-                     if (postOffice.getBinding(name) != null)
+                     postOffice.removeBinding(name);
+
+                     if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
                      {
-                        postOffice.removeBinding(name);
-                        
-                        if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
-                        {
-                           creditManagerHolders.remove(name);
-                        }
+                        creditManagerHolders.remove(name);
                      }
                   }
-                  catch (Exception e)
-                  {
-                     ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
-                  }
                }
-            });
-         }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
-         else
-         {
-            response = null;
-         }
+               catch (Exception e)
+               {
+                  ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
+               }
+            }
+         });
       }
-      catch (Exception e)
-      {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            ServerSessionImpl.log.error("Failed to create queue", e);
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
+   public void deleteQueue(final SimpleString name) throws Exception
    {
-      SimpleString name = packet.getQueueName();
+      Binding binding = postOffice.getBinding(name);
 
-      Packet response = null;
-
-      try
+      if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
       {
-         Binding binding = postOffice.getBinding(name);
+         throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
+      }
 
-         if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
-         {
-            throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
-         }
+      server.destroyQueue(name, this);
 
-         server.destroyQueue(name, this);
+      failureRunners.remove(name);
 
-         failureRunners.remove(name);
-         
-         if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
-         {
-            creditManagerHolders.remove(name);
-         }
-
-         response = new NullResponseMessage();
-      }
-      catch (Exception e)
+      if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
       {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            ServerSessionImpl.log.error("Failed to delete queue", e);
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
+         creditManagerHolders.remove(name);
       }
-
-      sendResponse(packet, response, false, false);
    }
-   
-   public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
-   {
-      SimpleString name = packet.getQueueName();
 
-      Packet response = null;
-
-      try
+   public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
+   {
+      if (name == null)
       {
-         response = doExecuteQueueQuery(name);
+         throw new IllegalArgumentException("Queue name is null");
       }
-      catch (Exception e)
-      {
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            ServerSessionImpl.log.error("Failed to execute queue query", e);
 
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
+      QueueQueryResult response;
 
-      sendResponse(packet, response, false, false);
-   }
+      Binding binding = postOffice.getBinding(name);
 
-   public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
-   {
-      SimpleString address = packet.getAddress();
-
-      Packet response = null;
-
-      try
+      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
       {
-         if (address == null)
-         {
-            throw new IllegalArgumentException("Address is null");
-         }
+         Queue queue = (Queue)binding.getBindable();
 
-         List<SimpleString> names = new ArrayList<SimpleString>();
+         Filter filter = queue.getFilter();
 
-         Bindings bindings = postOffice.getMatchingBindings(address);
+         SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         for (Binding binding : bindings.getBindings())
-         {
-            if (binding.getType() == BindingType.LOCAL_QUEUE)
-            {
-               names.add(binding.getUniqueName());
-            }
-         }
-
-         response = new SessionBindingQueryResponseMessage(!names.isEmpty(), names);
+         response = new QueueQueryResult(name,
+                                         binding.getAddress(),
+                                         queue.isDurable(),
+                                         queue.isTemporary(),
+                                         filterString,
+                                         queue.getConsumerCount(),
+                                         queue.getMessageCount());
       }
-      catch (Exception e)
+      // make an exception for the management address (see HORNETQ-29)
+      else if (name.equals(managementAddress))
       {
-         ServerSessionImpl.log.error("Failed to execute binding query", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
+         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1);
       }
+      else
+      {
+         response = new QueueQueryResult();
+      }
 
-      sendResponse(packet, response, false, false);
+      return response;
    }
 
-   public void handleForceConsumerDelivery(final SessionForceConsumerDelivery message)
+   public BindingQueryResult executeBindingQuery(final SimpleString address)
    {
-      try
+      if (address == null)
       {
-         ServerConsumer consumer = consumers.get(message.getConsumerID());
-
-         consumer.forceDelivery(message.getSequence());
+         throw new IllegalArgumentException("Address is null");
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to query consumer deliveries", e);
-      }
 
-      sendResponse(message, null, false, false);
-   }
+      List<SimpleString> names = new ArrayList<SimpleString>();
 
-   public void handleAcknowledge(final SessionAcknowledgeMessage packet)
-   {
-      Packet response = null;
+      Bindings bindings = postOffice.getMatchingBindings(address);
 
-      try
+      for (Binding binding : bindings.getBindings())
       {
-         ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
-         consumer.acknowledge(autoCommitAcks, tx, packet.getMessageID());
-
-         if (packet.isRequiresResponse())
+         if (binding.getType() == BindingType.LOCAL_QUEUE)
          {
-            response = new NullResponseMessage();
+            names.add(binding.getUniqueName());
          }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to acknowledge", e);
 
-         if (packet.isRequiresResponse())
-         {
-            if (e instanceof HornetQException)
-            {
-               response = new HornetQExceptionMessage((HornetQException)e);
-            }
-            else
-            {
-               response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-            }
-         }
-      }
+      return new BindingQueryResult(!names.isEmpty(), names);
+   }
 
-      sendResponse(packet, response, false, false);
+   public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception
+   {
+      ServerConsumer consumer = consumers.get(consumerID);
+
+      consumer.forceDelivery(sequence);
    }
 
-   public void handleExpired(final SessionExpiredMessage packet)
+   public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
-      try
-      {
-         MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
+      ServerConsumer consumer = consumers.get(consumerID);
 
-         if (ref != null)
-         {
-            ref.getQueue().expire(ref);
-         }
-      }
-      catch (Exception e)
+      consumer.acknowledge(autoCommitAcks, tx, messageID);
+   }
+
+   public void expire(final long consumerID, final long messageID) throws Exception
+   {
+      MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+
+      if (ref != null)
       {
-         ServerSessionImpl.log.error("Failed to acknowledge", e);
+         ref.getQueue().expire(ref);
       }
-
-      sendResponse(packet, null, false, false);
    }
 
-   public void handleCommit(final Packet packet)
+   public void commit() throws Exception
    {
-      Packet response = null;
-
       try
       {
          tx.commit();
-
-         response = new NullResponseMessage();
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to commit", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
       finally
       {
          tx = new TransactionImpl(storageManager);
       }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleRollback(final RollbackMessage packet)
+   public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
    {
-      Packet response = null;
-
-      try
+      if (tx == null)
       {
-         rollback(packet.isConsiderLastMessageAsDelivered());
+         // Might be null if XA
 
-         response = new NullResponseMessage();
+         tx = new TransactionImpl(storageManager);
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to rollback", e);
 
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
+      doRollback(considerLastMessageAsDelivered, tx);
 
-      sendResponse(packet, response, false, false);
+      tx = new TransactionImpl(storageManager);
    }
 
-   public void handleXACommit(final SessionXACommitMessage packet)
+   public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
    {
-      Packet response = null;
+      if (tx != null)
+      {
+         final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
 
-      Xid xid = packet.getXid();
-
-      try
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
       {
-         if (tx != null)
-         {
-            final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
+         Transaction theTx = resourceManager.removeTransaction(xid);
 
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+         if (theTx == null)
+         {
+            // checked heuristic committed transactions
+            if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+            {
+               throw new HornetQXAException(XAException.XA_HEURCOM,
+                                            "transaction has been heuristically committed: " + xid);
+            }
+            // checked heuristic rolled back transactions
+            else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+            {
+               throw new HornetQXAException(XAException.XA_HEURRB,
+                                            "transaction has been heuristically rolled back: " + xid);
+            }
+            else
+            {
+               throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+            }
          }
          else
          {
-            Transaction theTx = resourceManager.removeTransaction(xid);
-
-            if (theTx == null)
+            if (theTx.getState() == Transaction.State.SUSPENDED)
             {
-               // checked heuristic committed transactions
-               if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XA_HEURCOM,
-                                                          "transaction has been heuristically committed: " + xid);
-               }
-               // checked heuristic rolled back transactions
-               else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XA_HEURRB,
-                                                          "transaction has been heuristically rolled back: " + xid);
-               }
-               else
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_NOTA,
-                                                          "Cannot find xid in resource manager: " + xid);
-               }
+               // Put it back
+               resourceManager.putTransaction(xid, tx);
+
+               throw new HornetQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
             }
             else
             {
-               if (theTx.getState() == Transaction.State.SUSPENDED)
-               {
-                  // Put it back
-                  resourceManager.putTransaction(xid, tx);
-
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_PROTO,
-                                                          "Cannot commit transaction, it is suspended " + xid);
-               }
-               else
-               {
-                  theTx.commit(packet.isOnePhase());
-
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-               }
+               theTx.commit(onePhase);
             }
          }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa commit", e);
+   }
 
-         if (e instanceof HornetQException)
+   public void xaEnd(final Xid xid) throws Exception
+   {
+      if (tx != null && tx.getXid().equals(xid))
+      {
+         if (tx.getState() == Transaction.State.SUSPENDED)
          {
-            response = new HornetQExceptionMessage((HornetQException)e);
+            final String msg = "Cannot end, transaction is suspended";
+
+            throw new HornetQXAException(XAException.XAER_PROTO, msg);
          }
          else
          {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
+            tx = null;
          }
       }
-
-      sendResponse(packet, response, false, false);
-   }
-
-   public void handleXAEnd(final SessionXAEndMessage packet)
-   {
-      Packet response = null;
-
-      Xid xid = packet.getXid();
-
-      try
+      else
       {
-         if (tx != null && tx.getXid().equals(xid))
+         // It's also legal for the TM to call end for a Xid in the suspended
+         // state
+         // See JTA 1.1 spec 3.4.4 - state diagram
+         // Although in practice TMs rarely do this.
+         Transaction theTx = resourceManager.getTransaction(xid);
+
+         if (theTx == null)
          {
-            if (tx.getState() == Transaction.State.SUSPENDED)
-            {
-               final String msg = "Cannot end, transaction is suspended";
+            final String msg = "Cannot find suspended transaction to end " + xid;
 
-               response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-            }
-            else
-            {
-               tx = null;
-            }
+            throw new HornetQXAException(XAException.XAER_NOTA, msg);
          }
          else
          {
-            // It's also legal for the TM to call end for a Xid in the suspended
-            // state
-            // See JTA 1.1 spec 3.4.4 - state diagram
-            // Although in practice TMs rarely do this.
-            Transaction theTx = resourceManager.getTransaction(xid);
-
-            if (theTx == null)
+            if (theTx.getState() != Transaction.State.SUSPENDED)
             {
-               final String msg = "Cannot find suspended transaction to end " + xid;
+               final String msg = "Transaction is not suspended " + xid;
 
-               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+               throw new HornetQXAException(XAException.XAER_PROTO, msg);
             }
             else
             {
-               if (theTx.getState() != Transaction.State.SUSPENDED)
-               {
-                  final String msg = "Transaction is not suspended " + xid;
-
-                  response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-               }
-               else
-               {
-                  theTx.resume();
-               }
+               theTx.resume();
             }
          }
-
-         if (response == null)
-         {
-            response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-         }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa end", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXAForget(final SessionXAForgetMessage packet)
+   public void xaForget(final Xid xid) throws Exception
    {
-      long id = resourceManager.removeHeuristicCompletion(packet.getXid());
-      int code = XAResource.XA_OK;
+      long id = resourceManager.removeHeuristicCompletion(xid);
+
       if (id != -1)
       {
          try
@@ -912,402 +629,236 @@
          catch (Exception e)
          {
             e.printStackTrace();
-            code = XAException.XAER_RMERR;
+
+            throw new HornetQXAException(XAException.XAER_RMERR);
          }
       }
       else
       {
-         code = XAException.XAER_NOTA;
+         throw new HornetQXAException(XAException.XAER_NOTA);
       }
-
-      Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code, null);
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXAJoin(final SessionXAJoinMessage packet)
+   public void xaJoin(final Xid xid) throws Exception
    {
-      Packet response = null;
+      Transaction theTx = resourceManager.getTransaction(xid);
 
-      Xid xid = packet.getXid();
-
-      try
+      if (theTx == null)
       {
-         Transaction theTx = resourceManager.getTransaction(xid);
+         final String msg = "Cannot find xid in resource manager: " + xid;
 
-         if (theTx == null)
-         {
-            final String msg = "Cannot find xid in resource manager: " + xid;
-
-            response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
-         }
-         else
-         {
-            if (theTx.getState() == Transaction.State.SUSPENDED)
-            {
-               response = new SessionXAResponseMessage(true,
-                                                       XAException.XAER_PROTO,
-                                                       "Cannot join tx, it is suspended " + xid);
-            }
-            else
-            {
-               tx = theTx;
-
-               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-            }
-         }
+         throw new HornetQXAException(XAException.XAER_NOTA, msg);
       }
-      catch (Exception e)
+      else
       {
-         ServerSessionImpl.log.error("Failed to xa join", e);
-
-         if (e instanceof HornetQException)
+         if (theTx.getState() == Transaction.State.SUSPENDED)
          {
-            response = new HornetQExceptionMessage((HornetQException)e);
+            throw new HornetQXAException(XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
          }
          else
          {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
+            tx = theTx;
          }
       }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXAResume(final SessionXAResumeMessage packet)
+   public void xaResume(final Xid xid) throws Exception
    {
-      Packet response = null;
+      if (tx != null)
+      {
+         final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
 
-      Xid xid = packet.getXid();
-
-      try
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
       {
-         if (tx != null)
+         Transaction theTx = resourceManager.getTransaction(xid);
+
+         if (theTx == null)
          {
-            final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
+            final String msg = "Cannot find xid in resource manager: " + xid;
 
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            throw new HornetQXAException(XAException.XAER_NOTA, msg);
          }
          else
          {
-            Transaction theTx = resourceManager.getTransaction(xid);
-
-            if (theTx == null)
+            if (theTx.getState() != Transaction.State.SUSPENDED)
             {
-               final String msg = "Cannot find xid in resource manager: " + xid;
-
-               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+               throw new HornetQXAException(XAException.XAER_PROTO,
+                                            "Cannot resume transaction, it is not suspended " + xid);
             }
             else
             {
-               if (theTx.getState() != Transaction.State.SUSPENDED)
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_PROTO,
-                                                          "Cannot resume transaction, it is not suspended " + xid);
-               }
-               else
-               {
-                  tx = theTx;
+               tx = theTx;
 
-                  tx.resume();
-
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-               }
+               tx.resume();
             }
          }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa resume", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXARollback(final SessionXARollbackMessage packet)
+   public void xaRollback(final Xid xid) throws Exception
    {
-      Packet response = null;
+      if (tx != null)
+      {
+         final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
 
-      Xid xid = packet.getXid();
-
-      try
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
       {
-         if (tx != null)
-         {
-            final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
+         Transaction theTx = resourceManager.removeTransaction(xid);
 
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-         }
-         else
+         if (theTx == null)
          {
-            Transaction theTx = resourceManager.removeTransaction(xid);
-
-            if (theTx == null)
+            // checked heuristic committed transactions
+            if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
             {
-               // checked heuristic committed transactions
-               if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XA_HEURCOM,
-                                                          "transaction has ben heuristically committed: " + xid);
-               }
-               // checked heuristic rolled back transactions
-               else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XA_HEURRB,
-                                                          "transaction has ben heuristically rolled back: " + xid);
-               }
-               else
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_NOTA,
-                                                          "Cannot find xid in resource manager: " + xid);
-               }
+               throw new HornetQXAException(XAException.XA_HEURCOM,
+                                            "transaction has ben heuristically committed: " + xid);
             }
+            // checked heuristic rolled back transactions
+            else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+            {
+               throw new HornetQXAException(XAException.XA_HEURRB,
+                                            "transaction has ben heuristically rolled back: " + xid);
+            }
             else
             {
-               if (theTx.getState() == Transaction.State.SUSPENDED)
-               {
-                  // Put it back
-                  resourceManager.putTransaction(xid, tx);
-
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_PROTO,
-                                                          "Cannot rollback transaction, it is suspended " + xid);
-               }
-               else
-               {
-                  doRollback(false, theTx);
-
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-               }
+               throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
             }
          }
-      }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa rollback", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
          else
          {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
-   }
-
-   public void handleXAStart(final SessionXAStartMessage packet)
-   {
-      Packet response = null;
-
-      Xid xid = packet.getXid();
-
-      try
-      {
-         if (tx != null)
-         {
-            final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
-
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-         }
-         else
-         {
-            tx = new TransactionImpl(xid, storageManager, postOffice);
-
-            boolean added = resourceManager.putTransaction(xid, tx);
-
-            if (!added)
+            if (theTx.getState() == Transaction.State.SUSPENDED)
             {
-               final String msg = "Cannot start, there is already a xid " + tx.getXid();
+               // Put it back
+               resourceManager.putTransaction(xid, tx);
 
-               response = new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+               throw new HornetQXAException(XAException.XAER_PROTO,
+                                            "Cannot rollback transaction, it is suspended " + xid);
             }
             else
             {
-               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+               doRollback(false, theTx);
             }
          }
       }
-      catch (Exception e)
+   }
+
+   public void xaStart(final Xid xid) throws Exception
+   {
+      if (tx != null)
       {
-         ServerSessionImpl.log.error("Failed to xa start", e);
+         final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
 
-         if (e instanceof HornetQException)
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
+      {
+         tx = new TransactionImpl(xid, storageManager, postOffice);
+
+         boolean added = resourceManager.putTransaction(xid, tx);
+
+         if (!added)
          {
-            response = new HornetQExceptionMessage((HornetQException)e);
+            final String msg = "Cannot start, there is already a xid " + tx.getXid();
+
+            throw new HornetQXAException(XAException.XAER_DUPID, msg);
          }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
       }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXASuspend(final Packet packet)
+   public void xaSuspend() throws Exception
    {
-      Packet response = null;
+      if (tx == null)
+      {
+         final String msg = "Cannot suspend, session is not doing work in a transaction ";
 
-      try
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
       {
-         if (tx == null)
+         if (tx.getState() == Transaction.State.SUSPENDED)
          {
-            final String msg = "Cannot suspend, session is not doing work in a transaction ";
+            final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
 
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            throw new HornetQXAException(XAException.XAER_PROTO, msg);
          }
          else
          {
-            if (tx.getState() == Transaction.State.SUSPENDED)
-            {
-               final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
+            tx.suspend();
 
-               response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
-            }
-            else
-            {
-               tx.suspend();
-
-               tx = null;
-
-               response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-            }
+            tx = null;
          }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa suspend", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleXAPrepare(final SessionXAPrepareMessage packet)
+   public void xaPrepare(final Xid xid) throws Exception
    {
-      Packet response = null;
+      if (tx != null)
+      {
+         final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
 
-      Xid xid = packet.getXid();
-
-      try
+         throw new HornetQXAException(XAException.XAER_PROTO, msg);
+      }
+      else
       {
-         if (tx != null)
+         Transaction theTx = resourceManager.getTransaction(xid);
+
+         if (theTx == null)
          {
-            final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
+            final String msg = "Cannot find xid in resource manager: " + xid;
 
-            response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+            throw new HornetQXAException(XAException.XAER_NOTA, msg);
          }
          else
          {
-            Transaction theTx = resourceManager.getTransaction(xid);
-
-            if (theTx == null)
+            if (theTx.getState() == Transaction.State.SUSPENDED)
             {
-               final String msg = "Cannot find xid in resource manager: " + xid;
-
-               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+               throw new HornetQXAException(XAException.XAER_PROTO,
+                                            "Cannot prepare transaction, it is suspended " + xid);
             }
             else
             {
-               if (theTx.getState() == Transaction.State.SUSPENDED)
-               {
-                  response = new SessionXAResponseMessage(true,
-                                                          XAException.XAER_PROTO,
-                                                          "Cannot prepare transaction, it is suspended " + xid);
-               }
-               else
-               {
-                  theTx.prepare();
-
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-               }
+               theTx.prepare();
             }
          }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to xa prepare", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleGetInDoubtXids(final Packet packet)
+   public List<Xid> xaGetInDoubtXids()
    {
-      List<Xid> indoubtsXids = new ArrayList<Xid>();
-      indoubtsXids.addAll(resourceManager.getPreparedTransactions());
-      indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
-      indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
-      Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+      List<Xid> xids = new ArrayList<Xid>();
 
-      sendResponse(packet, response, false, false);
+      xids.addAll(resourceManager.getPreparedTransactions());
+      xids.addAll(resourceManager.getHeuristicCommittedTransactions());
+      xids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+
+      return xids;
    }
 
-   public void handleGetXATimeout(final Packet packet)
+   public int xaGetTimeout()
    {
-      Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
-
-      sendResponse(packet, response, false, false);
+      return resourceManager.getTimeoutSeconds();
    }
 
-   public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+   public void xaSetTimeout(final int timeout)
    {
-      Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
-
-      sendResponse(packet, response, false, false);
+      resourceManager.setTimeoutSeconds(timeout);
    }
 
-   public void handleStart(final Packet packet)
+   public void start()
    {
       setStarted(true);
-
-      sendResponse(packet, null, false, false);
    }
 
-   public void handleStop(final Packet packet)
+   public void stop()
    {
-      final Packet response = new NullResponseMessage();
-
       setStarted(false);
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleClose(final Packet packet)
+   public void close()
    {
       storageManager.afterCompleteOperations(new IOAsyncTask()
       {
@@ -1317,97 +868,63 @@
 
          public void done()
          {
-            doClose(packet);
+            try
+            {
+               doClose();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to close session", e);
+            }
          }
       });
    }
 
-   public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+   public void closeConsumer(final long consumerID) throws Exception
    {
-      final ServerConsumer consumer = consumers.get(packet.getConsumerID());
+      final ServerConsumer consumer = consumers.get(consumerID);
 
-      Packet response;
-
-      try
+      if (consumer != null)
       {
-         if (consumer != null)
-         {
-            consumer.close();
-         }
-         else
-         {
-            ServerSessionImpl.log.error("Cannot find consumer with id " + packet.getConsumerID());
-         }
-
-         response = new NullResponseMessage();
+         consumer.close();
       }
-      catch (Exception e)
+      else
       {
-         ServerSessionImpl.log.error("Failed to close consumer", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
+         ServerSessionImpl.log.error("Cannot find consumer with id " + consumerID);
       }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+   public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception
    {
-      ServerConsumer consumer = consumers.get(packet.getConsumerID());
+      ServerConsumer consumer = consumers.get(consumerID);
 
       if (consumer == null)
       {
-         ServerSessionImpl.log.error("There is no consumer with id " + packet.getConsumerID());
-         
+         ServerSessionImpl.log.error("There is no consumer with id " + consumerID);
+
          return;
       }
 
-      try
-      {
-         consumer.receiveCredits(packet.getCredits());
-      }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to receive credits " + server.getConfiguration().isBackup(), e);
-      }
-
-      sendResponse(packet, null, false, false);
+      consumer.receiveCredits(credits);
    }
 
-   public void handleSendLargeMessage(final SessionSendLargeMessage packet)
+   public void sendLarge(final byte[] largeMessageHeader) throws Exception
    {
-
       // need to create the LargeMessage before continue
       long id = storageManager.generateUniqueID();
 
-      LargeServerMessage msg = doCreateLargeMessage(id, packet);
+      LargeServerMessage msg = storageManager.createLargeMessage(id, largeMessageHeader);
 
-      if (msg != null)
+      if (currentLargeMessage != null)
       {
-         if (currentLargeMessage != null)
-         {
-            ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
-         }
-
-         currentLargeMessage = msg;
-
-         sendResponse(packet, null, false, false);
+         ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
       }
+
+      currentLargeMessage = msg;
    }
 
-   public void handleSend(final SessionSendMessage packet)
+   public void send(final ServerMessage message) throws Exception
    {
-      Packet response = null;
-
-      ServerMessage message = (ServerMessage)packet.getMessage();
-
       try
       {
          long id = storageManager.generateUniqueID();
@@ -1423,30 +940,9 @@
          }
          else
          {
-            send(message);
+            doSend(message);
          }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
       }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to send message", e);
-
-         if (packet.isRequiresResponse())
-         {
-            if (e instanceof HornetQException)
-            {
-               response = new HornetQExceptionMessage((HornetQException)e);
-            }
-            else
-            {
-               response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-            }
-         }
-      }
       finally
       {
          try
@@ -1458,74 +954,40 @@
             ServerSessionImpl.log.error("Failed to release outstanding credits", e);
          }
       }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleSendContinuations(final SessionSendContinuationMessage packet)
+   public void sendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
    {
-      Packet response = null;
-
-      try
+      if (currentLargeMessage == null)
       {
-         if (currentLargeMessage == null)
-         {
-            throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
-         }
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
+      }
 
-         // Immediately release the credits for the continuations- these don't contribute to the in-memory size
-         // of the message
+      // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+      // of the message
 
-         releaseOutStanding(currentLargeMessage, packet.getPacketSize());
+      releaseOutStanding(currentLargeMessage, packetSize);
 
-         currentLargeMessage.addBytes(packet.getBody());
+      currentLargeMessage.addBytes(body);
 
-         if (!packet.isContinues())
-         {
-            currentLargeMessage.releaseResources();
+      if (!continues)
+      {
+         currentLargeMessage.releaseResources();
 
-            send(currentLargeMessage);
+         doSend(currentLargeMessage);
 
-            releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
+         releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
 
-            currentLargeMessage = null;
-         }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
+         currentLargeMessage = null;
       }
-      catch (Exception e)
-      {
-         if (packet.isRequiresResponse())
-         {
-            if (e instanceof HornetQException)
-            {
-               response = new HornetQExceptionMessage((HornetQException)e);
-            }
-            else
-            {
-               ServerSessionImpl.log.error("Failed to send message", e);
-
-               response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-            }
-         }
-      }
-
-      sendResponse(packet, response, false, false);
    }
 
-   public void handleRequestProducerCredits(final SessionRequestProducerCreditsMessage packet) throws Exception
+   public void requestProducerCredits(final SimpleString address, final int credits) throws Exception
    {
-      final SimpleString address = packet.getAddress();
-
       final CreditManagerHolder holder = getCreditManagerHolder(address);
 
-      int credits = packet.getCredits();
-      
-      //Requesting -ve credits means returning them
-      
+      // Requesting -ve credits means returning them
+
       if (credits < 0)
       {
          releaseOutStanding(address, -credits);
@@ -1541,7 +1003,7 @@
                   if (!closed)
                   {
                      sendProducerCredits(holder, credits, address);
-   
+
                      return true;
                   }
                   else
@@ -1551,58 +1013,37 @@
                }
             }
          });
-   
+
          if (gotCredits > 0)
          {
             sendProducerCredits(holder, gotCredits, address);
          }
       }
-
-      sendResponse(packet, null, false, false);
    }
 
-   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
-   { 
-      //We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get delivered
-      //after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
-      //sequence of packets.
-      //It is not sufficient to just stop the session, since right after stopping the session, another session start might be executed
-      //before we have transferred the connection, leaving it in a started state      
-      setTransferring(true);
+   public void setTransferring(final boolean transferring)
+   {
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
-      remotingConnection.removeFailureListener(this);
-      remotingConnection.removeCloseListener(this);
-
-      // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
-      // then the connection will get cleaned up anyway when the server ping timeout kicks in.
-      // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
-      // the replicating connection will cause the outstanding responses to be be replayed on the live server,
-      // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
-      // received responses that the backup did not know about.
-            
-      channel.transferConnection(newConnection);
-      
-      newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
-      remotingConnection = newConnection;
-
-      remotingConnection.addFailureListener(this);
-      remotingConnection.addCloseListener(this);
-
-      int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
-
-      channel.replayCommands(lastReceivedCommandID, id);
-       
-      channel.setTransferring(false);
-      
-      setTransferring(false);
-
-      return serverLastReceivedCommandID;
+      for (ServerConsumer consumer : consumersClone)
+      {
+         consumer.setTransferring(transferring);
+      }
    }
 
-   public Channel getChannel()
+   public void runConnectionFailureRunners()
    {
-      return channel;
+      for (Runnable runner : failureRunners.values())
+      {
+         try
+         {
+            runner.run();
+         }
+         catch (Throwable t)
+         {
+            ServerSessionImpl.log.error("Failed to execute failure runner", t);
+         }
+      }
    }
 
    // FailureListener implementation
@@ -1626,7 +1067,7 @@
             }
          }
 
-         handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+         close();
 
          ServerSessionImpl.log.warn("Cleared up resources for session " + name);
       }
@@ -1654,7 +1095,7 @@
       }
       catch (Throwable t)
       {
-         ServerSessionImpl.log.error("Failed fire listeners " + this);
+         ServerSessionImpl.log.error("Failed to fire listeners " + this);
       }
 
    }
@@ -1665,129 +1106,6 @@
    // Private
    // ----------------------------------------------------------------------------
 
-   private SessionQueueQueryResponseMessage doExecuteQueueQuery(final SimpleString name) throws Exception
-   {
-      if (name == null)
-      {
-         throw new IllegalArgumentException("Queue name is null");
-      }
-      
-      SessionQueueQueryResponseMessage response;
-
-      Binding binding = postOffice.getBinding(name);
-
-      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
-      {
-         Queue queue = (Queue)binding.getBindable();
-
-         Filter filter = queue.getFilter();
-
-         SimpleString filterString = filter == null ? null : filter.getFilterString();
-         
-         response = new SessionQueueQueryResponseMessage(name,
-                                                         binding.getAddress(),
-                                                         queue.isDurable(),
-                                                         queue.isTemporary(),
-                                                         filterString,                                                         
-                                                         queue.getConsumerCount(),
-                                                         queue.getMessageCount());
-      }
-      // make an exception for the management address (see HORNETQ-29)
-      else if (name.equals(managementAddress))
-      {
-         response = new SessionQueueQueryResponseMessage(name, managementAddress, true, false, null, -1, -1);
-      }
-      else
-      {
-         response = new SessionQueueQueryResponseMessage();
-      }
-      
-      return response;
-   }
-   
-   private void sendResponse(final Packet confirmPacket,
-                             final Packet response,
-                             final boolean flush,
-                             final boolean closeChannel)
-   {
-      storageManager.afterCompleteOperations(new IOAsyncTask()
-      {
-         public void onError(final int errorCode, final String errorMessage)
-         {
-            ServerSessionImpl.log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
-
-            HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode,
-                                                                                                        errorMessage));
-
-            doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
-         }
-
-         public void done()
-         {
-            doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
-         }
-      });
-   }
-
-   /**
-    * @param confirmPacket
-    * @param response
-    * @param flush
-    * @param closeChannel
-    */
-   private void doConfirmAndResponse(final Packet confirmPacket,
-                                     final Packet response,
-                                     final boolean flush,
-                                     final boolean closeChannel)
-   {
-      if (confirmPacket != null)
-      {
-         channel.confirm(confirmPacket);
-
-         if (flush)
-         {
-            channel.flushConfirmations();
-         }
-      }
-
-      if (response != null)
-      {
-         channel.send(response);
-      }
-
-      if (closeChannel)
-      {
-         channel.close();
-      }
-   }
-
-   private void doClose(final Packet packet)
-   {
-      Packet response = null;
-
-      try
-      {
-         close();
-
-         response = new NullResponseMessage();
-      }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to close", e);
-
-         if (e instanceof HornetQException)
-         {
-            response = new HornetQExceptionMessage((HornetQException)e);
-         }
-         else
-         {
-            response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
-         }
-      }
-
-      sendResponse(packet, response, true, true);
-   }
-   
    private void setStarted(final boolean s)
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -1799,45 +1117,7 @@
 
       started = s;
    }
-   
-   private void setTransferring(final boolean transferring)
-   {
-      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
-      for (ServerConsumer consumer : consumersClone)
-      {
-         consumer.setTransferring(transferring);
-      }
-   }   
-
-   /**
-    * We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
-    * which is stored on the header
-    * @param packet
-    * @throws Exception
-    */
-   private LargeServerMessage doCreateLargeMessage(final long id, final SessionSendLargeMessage packet)
-   {
-      try
-      {
-         LargeServerMessage msg = createLargeMessageStorage(id, packet.getLargeMessageHeader());
-
-         return msg;
-      }
-      catch (Exception e)
-      {
-         ServerSessionImpl.log.error("Failed to create large message", e);
-         Packet response = null;
-
-         channel.confirm(packet);
-         if (response != null)
-         {
-            channel.send(response);
-         }
-         return null;
-      }
-   }
-
    private void handleManagementMessage(final ServerMessage message) throws Exception
    {
       try
@@ -1861,15 +1141,10 @@
       {
          reply.setAddress(replyTo);
 
-         send(reply);
+         doSend(reply);
       }
    }
 
-   private LargeServerMessage createLargeMessageStorage(final long id, final byte[] header) throws Exception
-   {
-      return storageManager.createLargeMessage(id, header);
-   }
-
    private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
    {
       boolean wasStarted = started;
@@ -1902,20 +1177,6 @@
       }
    }
 
-   private void rollback(final boolean lastMessageAsDelived) throws Exception
-   {
-      if (tx == null)
-      {
-         // Might be null if XA
-
-         tx = new TransactionImpl(storageManager);
-      }
-
-      doRollback(lastMessageAsDelived, tx);
-
-      tx = new TransactionImpl(storageManager);
-   }
-
    /*
     * The way flow producer flow control works is as follows:
     * The client can only send messages as long as it has credits. It requests credits from the server
@@ -1931,7 +1192,7 @@
    {
       releaseOutStanding(message.getAddress(), credits);
    }
-   
+
    private void releaseOutStanding(final SimpleString address, final int credits) throws Exception
    {
       CreditManagerHolder holder = getCreditManagerHolder(address);
@@ -1961,12 +1222,10 @@
    {
       holder.outstandingCredits += credits;
 
-      Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
-
-      channel.send(packet);
+      callback.sendProducerCreditsMessage(credits, address, -1);
    }
 
-   private void send(final ServerMessage msg) throws Exception
+   private void doSend(final ServerMessage msg) throws Exception
    {
       // Look up the paging store
 

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-21 14:46:21 UTC (rev 8827)
@@ -1,314 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.hornetq.core.server.ServerSession;
-
-/**
- * A ServerSessionPacketHandler
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic at jboss.org>Clebert Suconic</a>
- */
-public class ServerSessionPacketHandler implements ChannelHandler
-{
-   private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
-
-   private final ServerSession session;
-
-   private final OperationContext sessionContext;
-
-   // Storagemanager here is used to set the Context
-   private final StorageManager storageManager;
-
-   public ServerSessionPacketHandler(final ServerSession session,
-                                     final OperationContext sessionContext,
-                                     final StorageManager storageManager)
-   {
-      this.session = session;
-
-      this.storageManager = storageManager;
-
-      this.sessionContext = sessionContext;
-   }
-
-   public long getID()
-   {
-      return session.getID();
-   }
-
-   public void handlePacket(final Packet packet)
-   {
-      byte type = packet.getType();
-
-      storageManager.setContext(sessionContext);
-
-      try
-      {
-         switch (type)
-         {
-            case SESS_CREATECONSUMER:
-            {
-               SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
-               session.handleCreateConsumer(request);
-               break;
-            }
-            case CREATE_QUEUE:
-            {
-               CreateQueueMessage request = (CreateQueueMessage)packet;
-               session.handleCreateQueue(request);
-               break;
-            }
-            case DELETE_QUEUE:
-            {
-               SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
-               session.handleDeleteQueue(request);
-               break;
-            }
-            case SESS_QUEUEQUERY:
-            {
-               SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
-               session.handleExecuteQueueQuery(request);
-               break;
-            }
-            case SESS_BINDINGQUERY:
-            {
-               SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
-               session.handleExecuteBindingQuery(request);
-               break;
-            }
-            case SESS_ACKNOWLEDGE:
-            {
-               SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
-               session.handleAcknowledge(message);
-               break;
-            }
-            case SESS_EXPIRED:
-            {
-               SessionExpiredMessage message = (SessionExpiredMessage)packet;
-               session.handleExpired(message);
-               break;
-            }
-            case SESS_COMMIT:
-            {
-               session.handleCommit(packet);
-               break;
-            }
-            case SESS_ROLLBACK:
-            {
-               session.handleRollback((RollbackMessage)packet);
-               break;
-            }
-            case SESS_XA_COMMIT:
-            {
-               SessionXACommitMessage message = (SessionXACommitMessage)packet;
-               session.handleXACommit(message);
-               break;
-            }
-            case SESS_XA_END:
-            {
-               SessionXAEndMessage message = (SessionXAEndMessage)packet;
-               session.handleXAEnd(message);
-               break;
-            }
-            case SESS_XA_FORGET:
-            {
-               SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
-               session.handleXAForget(message);
-               break;
-            }
-            case SESS_XA_JOIN:
-            {
-               SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
-               session.handleXAJoin(message);
-               break;
-            }
-            case SESS_XA_RESUME:
-            {
-               SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
-               session.handleXAResume(message);
-               break;
-            }
-            case SESS_XA_ROLLBACK:
-            {
-               SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
-               session.handleXARollback(message);
-               break;
-            }
-            case SESS_XA_START:
-            {
-               SessionXAStartMessage message = (SessionXAStartMessage)packet;
-               session.handleXAStart(message);
-               break;
-            }
-            case SESS_XA_SUSPEND:
-            {
-               session.handleXASuspend(packet);
-               break;
-            }
-            case SESS_XA_PREPARE:
-            {
-               SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
-               session.handleXAPrepare(message);
-               break;
-            }
-            case SESS_XA_INDOUBT_XIDS:
-            {
-               session.handleGetInDoubtXids(packet);
-               break;
-            }
-            case SESS_XA_GET_TIMEOUT:
-            {
-               session.handleGetXATimeout(packet);
-               break;
-            }
-            case SESS_XA_SET_TIMEOUT:
-            {
-               SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
-               session.handleSetXATimeout(message);
-               break;
-            }
-            case SESS_START:
-            {
-               session.handleStart(packet);
-               break;
-            }
-            case SESS_STOP:
-            {
-               session.handleStop(packet);
-               break;
-            }
-            case SESS_CLOSE:
-            {
-               session.handleClose(packet);
-               break;
-            }
-            case SESS_CONSUMER_CLOSE:
-            {
-               SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
-               session.handleCloseConsumer(message);
-               break;
-            }
-            case SESS_FLOWTOKEN:
-            {
-               SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
-               session.handleReceiveConsumerCredits(message);
-               break;
-            }
-            case SESS_SEND:
-            {
-               SessionSendMessage message = (SessionSendMessage)packet;
-               session.handleSend(message);
-               break;
-            }
-            case SESS_SEND_LARGE:
-            {
-               SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
-               session.handleSendLargeMessage(message);
-               break;
-            }
-            case SESS_SEND_CONTINUATION:
-            {
-               SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
-               session.handleSendContinuations(message);
-               break;
-            }
-            case SESS_FORCE_CONSUMER_DELIVERY:
-            {
-               SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
-               session.handleForceConsumerDelivery(message);
-               break;
-            }
-            case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
-            {
-               SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
-               session.handleRequestProducerCredits(message);
-               break;
-            }
-         }
-      }
-      catch (Throwable t)
-      {
-         ServerSessionPacketHandler.log.error("Caught unexpected exception", t);
-      }
-      finally
-      {
-         storageManager.completeOperations();
-         storageManager.clearContext();
-      }
-   }
-}



More information about the hornetq-commits mailing list