[jboss-cvs] JBoss Messaging SVN: r3398 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/messaging/core/remoting/codec and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 3 09:33:46 EST 2007


Author: jmesnil
Date: 2007-12-03 09:33:46 -0500 (Mon, 03 Dec 2007)
New Revision: 3398

Added:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java
Removed:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java
Modified:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 Replace client-server transport with NIO based transport
* refactoring

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -26,11 +26,10 @@
 import org.apache.mina.filter.reqres.RequestResponseFilter;
 import org.apache.mina.filter.reqres.RequestTimeoutException;
 import org.apache.mina.filter.reqres.Response;
-import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.internal.ClientHandler;
-import org.jboss.messaging.core.remoting.internal.PacketInspector;
+import org.jboss.messaging.core.remoting.internal.MinaHandler;
+import org.jboss.messaging.core.remoting.internal.MinaInspector;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
@@ -43,7 +42,6 @@
    // Attributes ----------------------------------------------------
 
    private IoSession session;
-   private SslFilter sslFilter;
 
    // By default, a blocking request will timeout after 5 seconds
    private int blockingRequestTimeout = 5;
@@ -86,7 +84,7 @@
 
       connector.getFilterChain().addLast("logger", new LoggingFilter());
 
-      connector.setHandler(new ClientHandler());
+      connector.setHandler(new MinaHandler(PacketDispatcher.client));
       connector.getSessionConfig().setKeepAlive(true);
       connector.getSessionConfig().setReuseAddress(true);
       InetSocketAddress address = new InetSocketAddress(host, port);
@@ -110,7 +108,7 @@
 
       assert connector != null;
       assert blockingScheduler != null;
-      
+
       CloseFuture closeFuture = session.close().awaitUninterruptibly();
       boolean closed = closeFuture.isClosed();
 
@@ -128,7 +126,7 @@
       }
       return Long.toString(session.getId());
    }
-   
+
    public void sendOneWay(AbstractPacket packet)
    {
       assert packet != null;
@@ -183,20 +181,10 @@
    {
       blockingScheduler = Executors.newScheduledThreadPool(1);
       RequestResponseFilter filter = new RequestResponseFilter(
-            new PacketInspector(), blockingScheduler);
+            new MinaInspector(), blockingScheduler);
       chain.addLast("reqres", filter);
    }
 
-   private void addSSLSupport(DefaultIoFilterChainBuilder chain)
-         throws Exception
-   {
-      // TODO support SSL
-      // this.sslFilter = new
-      // SslFilter(BogusSslContextFactory.getInstance(false));
-      // sslFilter.setUseClientMode(true);
-      // chain.addLast("sslFilter", sslFilter);
-   }
-
    private void checkConnected()
    {
       if (session == null)

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -11,6 +11,9 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
  * 
@@ -18,7 +21,10 @@
  */
 public class PacketDispatcher
 {
+   // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PacketDispatcher.class);
+
    // Attributes ----------------------------------------------------
 
    private Map<String, PacketHandler> handlers;
@@ -42,6 +48,11 @@
       assertValidID(handler.getID());
       assert handler != null;
 
+      if (log.isDebugEnabled())
+      {
+         log.debug("register " + handler);
+      }
+      
       handlers.put(handler.getID(), handler);
    }
 
@@ -50,6 +61,11 @@
       assertValidID(handlerID);
 
       handlers.remove(handlerID);
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug("unregister handler for " + handlerID);
+      }
    }
 
    public PacketHandler getHandler(String handlerID)
@@ -58,7 +74,23 @@
 
       return handlers.get(handlerID);
    }
+   
+   public void dispatch(AbstractPacket packet, PacketSender sender)
+   {
+      String targetID = packet.getTargetID();
+      PacketHandler handler = getHandler(targetID);
+      if (handler != null)
+      {
+         if (log.isTraceEnabled())
+            log.trace(handler + " handles " + packet);
 
+         handler.handle(packet, sender);
+      } else
+      {
+         log.warn("Unhandled packet " + packet);
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -30,11 +30,11 @@
 {
    // Constants -----------------------------------------------------
 
-   protected static final int INT_LENGTH = 4;
+   public static final int INT_LENGTH = 4;
 
-   protected static final int FLOAT_LENGTH = 4;
+   public static final int FLOAT_LENGTH = 4;
 
-   protected static final int LONG_LENGTH = 8;
+   public static final int LONG_LENGTH = 8;
 
    // Attributes ----------------------------------------------------
 

Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
-
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
-import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class AddCallbackMessageCodec extends
-      AbstractPacketCodec<UpdateCallbackMessage>
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public AddCallbackMessageCodec()
-   {
-      super(MSG_UPDATECALLBACK);
-   }
-
-   // AbstractPackedCodec overrides----------------------------------
-
-   @Override
-   protected void encodeBody(UpdateCallbackMessage message, RemotingBuffer out) throws Exception
-   {
-      String remotingSessionID = message.getRemotingSessionID();
-      String clientVMID = message.getClientVMID();
-      boolean add = message.isAdd();
-
-      int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
-
-      out.putInt(bodyLength);
-      out.putNullableString(remotingSessionID);
-      out.putNullableString(clientVMID);
-      out.putBoolean(add);
-   }
-
-   @Override
-   protected UpdateCallbackMessage decodeBody(RemotingBuffer in)
-         throws Exception
-   {
-      int bodyLength = in.getInt();
-      if (in.remaining() < bodyLength)
-      {
-         return null;
-      }
-      String remotingSessionID = MinaPacketCodec.getString(in);
-      String clientVMID = MinaPacketCodec.getString(in);
-      boolean add = in.getBoolean();
-
-      return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,7 +6,6 @@
  */
 package org.jboss.messaging.core.remoting.codec;
 
-import org.apache.mina.filter.codec.demux.MessageDecoder;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -103,7 +103,7 @@
 
       addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
 
-      addCodec(UpdateCallbackMessage.class, AddCallbackMessageCodec.class);
+      addCodec(UpdateCallbackMessage.class, UpdateCallbackMessageCodec.class);
 
       addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -7,8 +7,6 @@
 package org.jboss.messaging.core.remoting.codec;
 
 import java.nio.charset.CharacterCodingException;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>

Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java (from rev 3389, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class UpdateCallbackMessageCodec extends
+      AbstractPacketCodec<UpdateCallbackMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public UpdateCallbackMessageCodec()
+   {
+      super(MSG_UPDATECALLBACK);
+   }
+
+   // AbstractPackedCodec overrides----------------------------------
+
+   @Override
+   protected void encodeBody(UpdateCallbackMessage message, RemotingBuffer out) throws Exception
+   {
+      String remotingSessionID = message.getRemotingSessionID();
+      String clientVMID = message.getClientVMID();
+      boolean add = message.isAdd();
+
+      int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
+
+      out.putInt(bodyLength);
+      out.putNullableString(remotingSessionID);
+      out.putNullableString(clientVMID);
+      out.putBoolean(add);
+   }
+
+   @Override
+   protected UpdateCallbackMessage decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null;
+      }
+      String remotingSessionID = MinaPacketCodec.getString(in);
+      String clientVMID = MinaPacketCodec.getString(in);
+      boolean add = in.getBoolean();
+
+      return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.internal;
-
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Response;
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class ClientHandler extends IoHandlerAdapter
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(ClientHandler.class);
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // IoHandlerAdapter overrides ------------------------------------
-
-   @Override
-   public void messageReceived(final IoSession session, Object message)
-         throws Exception
-   {
-      if (message instanceof AbstractPacket)
-      {
-         AbstractPacket packet = (AbstractPacket) message;
- 
-         if (log.isTraceEnabled())
-            log.trace("received packet " + packet);
-         
-         String targetID = packet.getTargetID();
-         PacketHandler handler = PacketDispatcher.client.getHandler(targetID);
-         if (handler != null)
-         {
-            if (log.isTraceEnabled())
-               log.trace(handler + " handles " + packet);
-            
-            handler.handle(packet, new PacketSender() {
-               public void send(AbstractPacket p)
-               {
-                  session.write(p);
-               }
-               
-            });
-         } else
-         {
-            log.error("unhandled packet " + packet);
-         }
-      } else if (message instanceof Response)
-      {
-         // response is handled by the reqres filter.
-         // do nothing
-      } else
-      {
-         log.error("unhandled remote message " + message);
-      }
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Response;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class MinaHandler extends IoHandlerAdapter
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(MinaHandler.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final PacketDispatcher dispatcher;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   
+   public MinaHandler(PacketDispatcher dispatcher)
+   {
+      this.dispatcher = dispatcher;
+   }
+
+   // Public --------------------------------------------------------
+
+   // IoHandlerAdapter overrides ------------------------------------
+
+   @Override
+   public void exceptionCaught(IoSession session, Throwable cause)
+         throws Exception
+   {
+      session.close();
+   }
+   
+   @Override
+   public void messageReceived(final IoSession session, Object message)
+         throws Exception
+   {
+      if (message instanceof Response)
+      {
+         log.trace("received response " + message);
+         // response is handled by the reqres filter.
+         // do nothing
+         return;
+      }
+
+      if (!(message instanceof AbstractPacket))
+      {
+         throw new IllegalArgumentException("Unknown message type: " + message);
+      }
+
+      AbstractPacket packet = (AbstractPacket) message;
+      PacketSender sender = new PacketSender()
+      {
+         public void send(AbstractPacket p)
+         {
+            session.write(p);
+         }
+      };
+
+      if (log.isTraceEnabled())
+         log.trace("received packet " + packet);
+
+      dispatcher.dispatch(packet, sender);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java (from rev 3389, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
+
+import org.apache.mina.filter.reqres.ResponseInspector;
+import org.apache.mina.filter.reqres.ResponseType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaInspector implements ResponseInspector
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // ResponseInspector implementation ------------------------------
+
+   public Object getRequestId(Object message)
+   {
+      if (!(message instanceof AbstractPacket))
+      {
+         return null;
+      }
+      AbstractPacket packet = (AbstractPacket) message;
+      if (packet.isRequest())
+      {
+         return packet.getCorrelationID();
+      } else
+      {
+         return null;
+      }
+   }
+
+   public ResponseType getResponseType(Object message)
+   {
+      if (!(message instanceof AbstractPacket))
+      {
+         return null;
+      }
+
+      return WHOLE;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -59,12 +59,6 @@
 
    private final AbstractPacketCodec<P> codec;
 
-   public MinaPacketCodec(Class<? extends AbstractPacketCodec<P>> codecClass)
-         throws Throwable
-   {
-      this.codec = (AbstractPacketCodec<P>) codecClass.newInstance();
-   }
-
    public MinaPacketCodec(AbstractPacketCodec<P> codec)
    {
       this.codec = codec;

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import java.net.InetSocketAddress;
+import java.util.Formatter;
+
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaService
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(MinaService.class);
+
+   // Attributes ----------------------------------------------------
+
+   private int port;
+
+   private NioSocketAcceptor acceptor;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+
+   public MinaService()
+   {
+   }
+
+   // Public --------------------------------------------------------
+   
+   public void setPort(int port)
+   {
+      this.port = port;  
+   }
+   
+   public int getPort()
+   {
+      return port;
+   }
+   
+   public void start() throws Exception
+   {
+      if (acceptor == null)
+      {
+         info("Starting MINA on port " + port);
+
+         acceptor = new NioSocketAcceptor();
+
+         // Prepare the configuration
+         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+         acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+         acceptor.getFilterChain().addLast("codec",
+               new ProtocolCodecFilter(new PacketCodecFactory()));
+         acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+         // Bind
+         acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
+         acceptor.setReuseAddress(true);
+         acceptor.getSessionConfig().setReuseAddress(true);
+         acceptor.getSessionConfig().setKeepAlive(true);
+         acceptor.setDisconnectOnUnbind(false);
+
+         acceptor.setHandler(new MinaHandler(PacketDispatcher.server));
+         acceptor.bind();
+
+         info("MINA started");
+      } 
+   }
+   
+
+   public void stop()
+   {
+      if (acceptor != null)
+      {
+         acceptor.unbind();
+         acceptor.dispose();
+         acceptor = null;
+         
+         info("Stopped MINA ");
+      }    
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   
+   private void info(String s)
+   {
+      log.info(new Formatter().format("### %-30s ###", s).toString());
+   }
+   
+   // Inner classes -------------------------------------------------
+}

Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,69 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.internal;
-
-import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
-import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
-
-import org.apache.mina.filter.reqres.ResponseInspector;
-import org.apache.mina.filter.reqres.ResponseType;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class PacketInspector implements ResponseInspector
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // ResponseInspector implementation ------------------------------
-
-   public Object getRequestId(Object message)
-   {
-      if (!(message instanceof AbstractPacket))
-      {
-         return null;
-      }
-      long correlationID = ((AbstractPacket) message).getCorrelationID();
-      if (correlationID != NO_CORRELATION_ID)
-      {
-         return correlationID;
-      } else
-      {
-         return null;
-      }
-   }
-
-   public ResponseType getResponseType(Object message)
-   {
-      if (!(message instanceof AbstractPacket))
-      {
-         return null;
-      }
-
-      return WHOLE;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -105,6 +105,14 @@
       setTargetID(other.getCallbackID());
    }
 
+   /**
+    * An AbstractPacket is a request if it has a target ID and a correlation ID
+    */
+   public boolean isRequest()
+   {
+      return targetID != NO_ID_SET && correlationID != NO_CORRELATION_ID;
+   }
+
    @Override
    public String toString()
    {

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -7,7 +7,6 @@
 package org.jboss.test.messaging.core.remoting;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.util.List;
 import java.util.concurrent.TimeoutException;
@@ -25,6 +24,8 @@
  */
 public class RemoteDispatcherTest extends TestSupport
 {
+   private ReversePacketHandler serverPacketHandler;
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -34,7 +35,7 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-
+   
    public void testCanNotSendPacketIfNotConnected() throws Exception
    {
       Client client = new Client();
@@ -51,15 +52,16 @@
 
    public void testSendOneWay() throws Exception
    {
-      serverHandler.expectMessage(1);
+      serverPacketHandler.expectMessage(1);
 
       TextPacket packet = new TextPacket("testSendOneWay");
       packet.setVersion((byte)1);
+      packet.setTargetID(serverPacketHandler.getID());
       client.sendOneWay(packet);
 
-      serverHandler.await();
+      serverPacketHandler.await();
 
-      List<AbstractPacket> messages = serverHandler.getPackets();
+      List<TextPacket> messages = serverPacketHandler.getPackets();
       assertEquals(1, messages.size());
       String response = ((TextPacket) messages.get(0)).getText();
       assertEquals(packet.getText(), response);
@@ -67,19 +69,20 @@
 
    public void testSendManyOneWay() throws Exception
    {
-      serverHandler.expectMessage(MANY_MESSAGES);
+      serverPacketHandler.expectMessage(MANY_MESSAGES);
       
       TextPacket[] packets = new TextPacket[MANY_MESSAGES];
       for (int i = 0; i < MANY_MESSAGES; i++)
       {
          packets[i] = new TextPacket("testSendManyOneWay " + i);
          packets[i].setVersion((byte)1);
+         packets[i].setTargetID(serverPacketHandler.getID());
          client.sendOneWay(packets[i]);
       }
 
-      serverHandler.await();
+      serverPacketHandler.await();
       
-      List<AbstractPacket> receivedPackets = serverHandler.getPackets();
+      List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
       assertEquals(MANY_MESSAGES, receivedPackets.size());
       for (int i = 0; i < MANY_MESSAGES; i++)
       {
@@ -89,10 +92,7 @@
    }
 
    public void testSendOneWayWithCallbackHandler() throws Exception
-   {
-      // add some lag
-      serverHandler.setSleepTime(300, MILLISECONDS);
-      
+   {      
       TestPacketHandler callbackHandler = new TestPacketHandler();
       callbackHandler.expectMessage(1);
 
@@ -100,6 +100,7 @@
       
       TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
       packet.setVersion((byte)1);
+      packet.setTargetID(serverPacketHandler.getID());
       packet.setCallbackID(callbackHandler.getID());
       
       client.sendOneWay(packet);
@@ -115,7 +116,8 @@
    {
       TextPacket request = new TextPacket("testSendBlocking");
       request.setVersion((byte)1);
-
+      request.setTargetID(serverPacketHandler.getID());
+      
       AbstractPacket receivedPacket = client.sendBlocking(request);
 
       assertNotNull(receivedPacket);
@@ -126,8 +128,8 @@
 
    public void testSendBlockingWithTimeout() throws Exception
    {
-      client.setBlockingRequestTimeout(1, SECONDS);
-      serverHandler.setSleepTime(2, SECONDS);
+      client.setBlockingRequestTimeout(500, MILLISECONDS);
+      serverPacketHandler.setSleepTime(1000, MILLISECONDS);
       
       AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
       packet.setVersion((byte)1);
@@ -148,12 +150,17 @@
    {
       startServer(TestSupport.PORT, TRANSPORT);
       startClient(TestSupport.PORT, TRANSPORT);
+      
+      serverPacketHandler = new ReversePacketHandler();
+      PacketDispatcher.server.register(serverPacketHandler);
    }
 
    @Override
    protected void tearDown() throws Exception
    {
+      PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
       client.disconnect();
-      serverAcceptor.unbind();
+      stopServer();
    }
 }

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,7 +6,14 @@
  */
 package org.jboss.test.messaging.core.remoting;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static junit.framework.Assert.fail;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_ID_SET;
 import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
+
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
 
 import org.jboss.messaging.core.remoting.PacketSender;
@@ -15,9 +22,9 @@
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ * 
  * @version <tt>$Revision$</tt>
- *
+ * 
  */
 public class ReversePacketHandler extends TestPacketHandler
 {
@@ -25,26 +32,57 @@
 
    // Attributes ----------------------------------------------------
 
+   private int sleepTime;
+   private TimeUnit timeUnit;
+   private PacketSender lastSender;
+ 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
 
-   // TestPacketHandler overrides ---------------------------------------------------
-
+   public void setSleepTime(int sleepTime, TimeUnit timeUnit)
+   {
+      this.sleepTime = sleepTime;
+      this.timeUnit = timeUnit;
+   }
+   
+   public PacketSender getLastSender()
+   {
+      return lastSender;
+   }
+   
+   // TestPacketHandler overrides -----------------------------------
+   
    @Override
    protected void doHandle(AbstractPacket packet, PacketSender sender)
    {
       Assert.assertTrue(packet instanceof TextPacket);
+
+      lastSender = sender;
+
+      if (sleepTime > 0)
+      {
+         try
+         {
+            Thread.sleep(MILLISECONDS.convert(sleepTime, timeUnit));
+         } catch (InterruptedException e)
+         {
+            fail();
+         }
+      }
       
       TextPacket request = (TextPacket) packet;
-      TextPacket response = new TextPacket(reverse(request.getText()));
-      response.normalize(request);
-      
-      sender.send(response);
+      if (!NO_ID_SET.equals(request.getCallbackID())
+            || request.getCorrelationID() != NO_CORRELATION_ID)
+      {
+         TextPacket response = new TextPacket(reverse(request.getText()));
+         response.normalize(request);
+         sender.send(response);
+      }
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,157 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.core.remoting;
-
-import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
-import static org.jboss.messaging.core.remoting.Constants.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.TransportType.HTTP;
-import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.TextPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class ReverseServerHandler extends IoHandlerAdapter
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private List<AbstractPacket> packets = new ArrayList<AbstractPacket>();
-
-   private long sleepTime = 0;
-
-   private List<IoSession> sessions = new ArrayList<IoSession>();
-
-   private TransportType transport;
-
-   private CountDownLatch latch;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   ReverseServerHandler(TransportType transport)
-   {
-      this.transport = transport;
-   }
-
-   // Public --------------------------------------------------------
-
-   public void setSleepTime(long time, TimeUnit unit)
-   {
-      this.sleepTime = unit.toMillis(time);
-   }
-
-   public List<AbstractPacket> getPackets()
-   {
-      return packets;
-   }
-
-   public List<IoSession> getSessions()
-   {
-      return sessions;
-   }
-
-   // IoHandlerAdapter overrides ------------------------------------
-
-   @Override
-   public void messageReceived(IoSession session, Object msg)
-   {
-      try
-      {
-         Thread.sleep(sleepTime);
-      } catch (InterruptedException e)
-      {
-         e.printStackTrace();
-      }
-
-      packets.add((AbstractPacket) msg);
-
-      // TODO put this logic in the real server handler
-      if (((AbstractPacket) msg).getCorrelationID() == NO_CORRELATION_ID
-            && transport == HTTP)
-      {
-         session.write(new NullPacket());
-      }
-
-      if (msg instanceof TextPacket)
-      {
-         TextPacket incomingPacket = (TextPacket) msg;
-         if (mustReply(incomingPacket))
-         {
-            TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
-            p.setCorrelationID(incomingPacket.getCorrelationID());
-            p.setVersion(incomingPacket.getVersion());
-            if (!NO_ID_SET.equals(incomingPacket.getCallbackID()))
-            {
-               p.setTargetID(incomingPacket.getCallbackID());
-            }
-            session.write(p);
-         }
-      }
-
-      if (latch != null)
-         latch.countDown();
-      
-   }
-
-   @Override
-   public void sessionCreated(IoSession session) throws Exception
-   {
-      sessions.add(session);
-   }
-
-   @Override
-   public void sessionClosed(IoSession session) throws Exception
-   {
-      super.sessionClosed(session);
-      sessions.remove(session);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private boolean mustReply(AbstractPacket incomingPacket)
-   {
-      boolean mustReply = (incomingPacket.getCorrelationID() != NO_CORRELATION_ID)
-            || (!NO_ID_SET.equals(incomingPacket.getCallbackID()));
-      return mustReply;
-   }
-
-   public void await() throws InterruptedException
-   {
-      latch.await();
-   }
-
-   /**
-    * @param manyMessages
-    */
-   public void expectMessage(int count)
-   {
-      latch = new CountDownLatch(count);
-   }
-
-   // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,15 +6,10 @@
  */
 package org.jboss.test.messaging.core.remoting;
 
-
-import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.remoting.AbstractPacketHandler;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 
 /**
@@ -28,6 +23,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private ReversePacketHandler serverPacketHandler;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -36,27 +33,29 @@
 
    public void testClientHandlePacketSentByServer() throws Exception
    {
-      TestPacketHandler clientHandler = new TestPacketHandler();      
+      TestPacketHandler clientHandler = new TestPacketHandler();
       PacketDispatcher.client.register(clientHandler);
 
-      serverHandler.expectMessage(1);
+      serverPacketHandler.expectMessage(1);
       clientHandler.expectMessage(1);
 
       TextPacket packet = new TextPacket(
-      "testClientHandlePacketSentByServer from client");
-      packet.setVersion((byte)1);
-      // send a packet to create the IoSession on the server
+            "testClientHandlePacketSentByServer from client");
+      packet.setVersion((byte) 1);
+      packet.setTargetID(serverPacketHandler.getID());
+      // send a packet to create a sender when the server
+      // handles the packet
       client.sendOneWay(packet);
 
-      serverHandler.await();
-      
-      assertEquals(1, serverHandler.getSessions().size());
-      IoSession serverSession = serverHandler.getSessions().get(0);
+      serverPacketHandler.await();
+
+      assertNotNull(serverPacketHandler.getLastSender());
+      PacketSender sender = serverPacketHandler.getLastSender();
       TextPacket packetFromServer = new TextPacket(
             "testClientHandlePacketSentByServer from server");
-      packetFromServer.setVersion((byte)1);
+      packetFromServer.setVersion((byte) 1);
       packetFromServer.setTargetID(clientHandler.getID());
-      serverSession.write(packetFromServer);
+      sender.send(packetFromServer);
 
       clientHandler.await();
 
@@ -72,12 +71,17 @@
    {
       startServer(PORT, TRANSPORT);
       startClient(PORT, TRANSPORT);
+      
+      serverPacketHandler = new ReversePacketHandler();
+      PacketDispatcher.server.register(serverPacketHandler);
    }
 
    public void tearDown() throws Exception
    {
+      PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
       client.disconnect();
-      serverAcceptor.unbind();
+      stopServer();
    }
 
    // Package protected ---------------------------------------------
@@ -87,24 +91,4 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-
-   private final class ClientTargetHandler extends AbstractPacketHandler
-   {
-      private final List<TextPacket> packets;
-
-      private ClientTargetHandler()
-      {
-         packets = new ArrayList<TextPacket>();
-      }
-
-      public void handle(AbstractPacket packet, PacketSender replier)
-      {
-         packets.add((TextPacket) packet);
-      }
-
-      public List<TextPacket> getPackets()
-      {
-         return packets;
-      }
-   }
 }

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -15,12 +15,12 @@
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 
-class TestPacketHandler extends AbstractPacketHandler
+public class TestPacketHandler extends AbstractPacketHandler
 {
    private final List<TextPacket> packets;
    private CountDownLatch latch;
 
-   TestPacketHandler()
+   public TestPacketHandler()
    {
       packets = new ArrayList<TextPacket>();
    }
@@ -40,10 +40,10 @@
    {
       packets.add((TextPacket) packet);
       
+      doHandle(packet, sender);
+
       if (latch != null)
          latch.countDown();
-      
-      doHandle(packet, sender);
    }
    
    protected void doHandle(AbstractPacket packet, PacketSender sender)

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,23 +6,12 @@
  */
 package org.jboss.test.messaging.core.remoting;
 
-import static org.jboss.messaging.core.remoting.TransportType.HTTP;
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
-
-import java.net.InetSocketAddress;
-
 import junit.framework.TestCase;
 
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.filter.logging.MdcInjectionFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.jboss.messaging.core.remoting.Client;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
+import org.jboss.messaging.core.remoting.internal.MinaService;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -43,14 +32,10 @@
 
    // Attributes ----------------------------------------------------
 
-   NioSocketAcceptor serverAcceptor;
-
-   ReverseServerHandler serverHandler;
-   
-   ReversePacketHandler serverPacketHandler = new ReversePacketHandler();
-
    Client client;
 
+   private MinaService service;
+
    public static final int PORT = 9090;
 
    // Static --------------------------------------------------------
@@ -81,53 +66,19 @@
    void startServer(int port, TransportType transport) throws Exception
    {
       startServer(port, transport, false);
-      PacketDispatcher.server.register(serverPacketHandler);
    }
 
    void startServer(int port, TransportType transport, boolean useSSL)
          throws Exception
    {
-      serverAcceptor = new NioSocketAcceptor();
-      serverHandler = new ReverseServerHandler(transport);
-
-      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
-      serverAcceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
-
-      if (useSSL)
-         addSSLSupport(serverAcceptor.getFilterChain());
-
-      info("using " + transport);
-      if (transport == TCP)
-      {
-         serverAcceptor.getFilterChain().addLast("codec",
-               new ProtocolCodecFilter(new PacketCodecFactory()));
-      } else
-      {
-         assert transport == HTTP;
-
-         // TODO add http support
-         // serverAcceptor.getFilterChain().addLast("http_codec",
-         // new ProtocolCodecFilter(new HttpServerCodecFactory()));
-         // serverAcceptor.getFilterChain().addLast("http_filter",
-         // new HTTPFilter(true));
-      }
-      serverAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
-
-      // Bind
-      serverAcceptor.setLocalAddress(new InetSocketAddress(port));
-      info("listening on " + port);
-      serverAcceptor.setHandler(serverHandler);
-      serverAcceptor.bind();
+      service = new MinaService();
+      service.setPort(port);
+      service.start();
    }
-
-   private void addSSLSupport(DefaultIoFilterChainBuilder chain)
-         throws Exception
+   
+   void stopServer()
    {
-      // TODO add SSL support
-      // SslFilter sslFilter = new
-      // SslFilter(BogusSslContextFactory.getInstance(true));
-      // chain.addLast("sslFilter", sslFilter);
-      // info("SSL enabled");
+      service.stop();
    }
 
    void startClient(int port, TransportType transport) throws Exception

Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,108 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.core.remoting.internal;
-
-import static java.util.UUID.randomUUID;
-import junit.framework.TestCase;
-
-import org.apache.mina.common.IoBuffer;
-import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class MinaRemotingBuffer extends TestCase
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   private RemotingBuffer wrapper;
-   private IoBuffer buffer;
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      buffer = IoBuffer.allocate(256);
-      buffer.setAutoExpand(true);
-      wrapper = new MinaPacketCodec.BufferWrapper(buffer);
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      wrapper = null;
-      buffer = null;
-
-   }
-
-   public void testNullString() throws Exception
-   {
-      assertNull(putAndGetNullableString(null));
-   }
-
-   public void testEmptyString() throws Exception
-   {
-      String result = putAndGetNullableString("");
-
-      assertNotNull(result);
-      assertEquals("", result);
-   }
-
-   public void testNonEmptyString() throws Exception
-   {
-      String junk = randomUUID().toString();
-
-      String result = putAndGetNullableString(junk);
-
-      assertNotNull(result);
-      assertEquals(junk, result);
-   }
-
-   public void testPutTrueBoolean() throws Exception
-   {
-      wrapper.putBoolean(true);
-      
-      buffer.flip();
-      
-      assertTrue(wrapper.getBoolean());
-   }
-
-   public void testPutFalseBoolean() throws Exception
-   {
-      wrapper.putBoolean(false);
-      
-      buffer.flip();
-      
-      assertFalse(wrapper.getBoolean());
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private String putAndGetNullableString(String nullableString) throws Exception
-   {
-      wrapper.putNullableString(nullableString);
-
-      buffer.flip();
-      
-      return wrapper.getNullableString();
-   }
-   // Inner classes -------------------------------------------------
-}

Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java (from rev 3389, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,108 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.internal;
+
+import static java.util.UUID.randomUUID;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.IoBuffer;
+import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaRemotingBufferTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   private RemotingBuffer wrapper;
+   private IoBuffer buffer;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      buffer = IoBuffer.allocate(256);
+      buffer.setAutoExpand(true);
+      wrapper = new MinaPacketCodec.BufferWrapper(buffer);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      wrapper = null;
+      buffer = null;
+
+   }
+
+   public void testNullString() throws Exception
+   {
+      assertNull(putAndGetNullableString(null));
+   }
+
+   public void testEmptyString() throws Exception
+   {
+      String result = putAndGetNullableString("");
+
+      assertNotNull(result);
+      assertEquals("", result);
+   }
+
+   public void testNonEmptyString() throws Exception
+   {
+      String junk = randomUUID().toString();
+
+      String result = putAndGetNullableString(junk);
+
+      assertNotNull(result);
+      assertEquals(junk, result);
+   }
+
+   public void testPutTrueBoolean() throws Exception
+   {
+      wrapper.putBoolean(true);
+      
+      buffer.flip();
+      
+      assertTrue(wrapper.getBoolean());
+   }
+
+   public void testPutFalseBoolean() throws Exception
+   {
+      wrapper.putBoolean(false);
+      
+      buffer.flip();
+      
+      assertFalse(wrapper.getBoolean());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private String putAndGetNullableString(String nullableString) throws Exception
+   {
+      wrapper.putNullableString(nullableString);
+
+      buffer.flip();
+      
+      return wrapper.getNullableString();
+   }
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,6 +6,8 @@
  */
 package org.jboss.test.messaging.core.remoting.wireformat;
 
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.LONG_LENGTH;
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.sizeof;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_BROWSER_RESET;
@@ -93,7 +95,7 @@
 import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveriesRequestCodec;
 import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryRequestCodec;
 import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryResponseCodec;
-import org.jboss.messaging.core.remoting.codec.AddCallbackMessageCodec;
+import org.jboss.messaging.core.remoting.codec.UpdateCallbackMessageCodec;
 import org.jboss.messaging.core.remoting.codec.AddTemporaryDestinationMessageCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserHasNextMessageResponseCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
@@ -225,6 +227,20 @@
       return randomString().getBytes();
    }
 
+   private static void checkHeader(RemotingBuffer buffer, AbstractPacket packet)
+         throws Exception
+   {
+      assertEquals(buffer.get(), packet.getType().byteValue());
+      assertEquals(buffer.get(), packet.getVersion());
+
+      int headerLength = LONG_LENGTH + sizeof(packet.getTargetID())
+            + sizeof(packet.getCallbackID());
+      assertEquals(buffer.getInt(), headerLength);
+      assertEquals(buffer.getLong(), packet.getCorrelationID());
+      assertEquals(buffer.getNullableString(), packet.getTargetID());
+      assertEquals(buffer.getNullableString(), packet.getCallbackID());
+   }
+
    private static void assertEqualsAcks(List<Ack> expected, List<Ack> actual)
    {
       assertEquals(expected.size(), actual.size());
@@ -329,10 +345,17 @@
       packet.setCorrelationID(randomLong());
       packet.setTargetID(randomString());
 
-      AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
-            NULL, NullPacket.class);
-      AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+      AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+            .createCodecForEmptyPacket(NULL, NullPacket.class);
 
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet);
+      // no body
+      assertEquals(0, buffer.getInt());
+      buffer.rewind();
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
       assertTrue(decodedPacket instanceof NullPacket);
       NullPacket p = (NullPacket) decodedPacket;
 
@@ -350,9 +373,13 @@
       JMSExceptionMessage message = new JMSExceptionMessage(e);
       addVersion(message);
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new JMSExceptionMessageCodec());
+      AbstractPacketCodec<JMSExceptionMessage> codec = new JMSExceptionMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
       assertTrue(decodedPacket instanceof JMSExceptionMessage);
       JMSExceptionMessage decodedMessage = (JMSExceptionMessage) decodedPacket;
 
@@ -365,10 +392,16 @@
    {
       TextPacket packet = new TextPacket("testTextPacket");
       addVersion(packet);
+      AbstractPacketCodec<TextPacket> codec = new TextPacketCodec();
 
-      AbstractPacket decodedPacket = encodeAndDecode(packet,
-            new TextPacketCodec());
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet);
+      assertEquals(buffer.getInt(), sizeof(packet.getText()));
+      assertEquals(buffer.getNullableString(), packet.getText());
+      buffer.rewind();
 
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
       assertTrue(decodedPacket instanceof TextPacket);
       TextPacket p = (TextPacket) decodedPacket;
 
@@ -388,9 +421,12 @@
       CreateConnectionRequest request = new CreateConnectionRequest(version,
             remotingSessionID, clientVMID, failedNodeID, username, password);
       addVersion(request);
+      AbstractPacketCodec<CreateConnectionRequest> codec = new ConnectionFactoryCreateConnectionRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new ConnectionFactoryCreateConnectionRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateConnectionRequest);
       CreateConnectionRequest decodedRequest = (CreateConnectionRequest) decodedPacket;
@@ -410,9 +446,12 @@
       CreateConnectionResponse response = new CreateConnectionResponse(
             randomString(), 1234);
       addVersion(response);
+      AbstractPacketCodec<CreateConnectionResponse> codec = new ConnectionFactoryCreateConnectionResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new ConnectionFactoryCreateConnectionResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateConnectionResponse);
 
@@ -430,7 +469,10 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_GETCLIENTAOPSTACK, GetClientAOPStackRequest.class);
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetClientAOPStackRequest);
       assertEquals(REQ_GETCLIENTAOPSTACK, decodedPacket.getType());
@@ -442,9 +484,12 @@
 
       GetClientAOPStackResponse response = new GetClientAOPStackResponse(stack);
       addVersion(response);
+      AbstractPacketCodec codec = new GetClientAOPStackResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new GetClientAOPStackResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetClientAOPStackResponse);
       GetClientAOPStackResponse decodedResponse = (GetClientAOPStackResponse) decodedPacket;
@@ -459,8 +504,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_GETTOPOLOGY, GetTopologyRequest.class);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetTopologyRequest);
       assertEquals(REQ_GETTOPOLOGY, decodedPacket.getType());
@@ -479,9 +527,12 @@
             new HashMap());
       GetTopologyResponse response = new GetTopologyResponse(topology);
       addVersion(response);
+      AbstractPacketCodec codec = new GetTopologyResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new GetTopologyResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetTopologyResponse);
       GetTopologyResponse decodedResponse = (GetTopologyResponse) decodedPacket;
@@ -494,9 +545,12 @@
       UpdateCallbackMessage message = new UpdateCallbackMessage(randomString(),
             randomString(), true);
       addVersion(message);
+      AbstractPacketCodec codec = new UpdateCallbackMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new AddCallbackMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof UpdateCallbackMessage);
 
@@ -512,9 +566,12 @@
    {
       CreateSessionRequest request = new CreateSessionRequest(true, 0, false);
       addVersion(request);
+      AbstractPacketCodec codec = new CreateSessionRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new CreateSessionRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateSessionRequest);
 
@@ -531,9 +588,12 @@
       CreateSessionResponse response = new CreateSessionResponse(
             randomString(), 23, false);
       addVersion(response);
+      AbstractPacketCodec codec = new CreateSessionResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new CreateSessionResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateSessionResponse);
 
@@ -549,9 +609,12 @@
    {
       IDBlockRequest request = new IDBlockRequest(23);
       addVersion(request);
+      AbstractPacketCodec codec = new IDBlockRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new IDBlockRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof IDBlockRequest);
 
@@ -565,9 +628,12 @@
       IDBlockResponse response = new IDBlockResponse(randomLong(),
             randomLong() * 2);
       addVersion(response);
+      AbstractPacketCodec codec = new IDBlockResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new IDBlockResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof IDBlockResponse);
 
@@ -582,8 +648,12 @@
       SendMessage packet = new SendMessage(new JBossMessage(System
             .currentTimeMillis()), true, randomLong());
       addVersion(packet);
+      AbstractPacketCodec codec = new SendMessageCodec();
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet);
+      buffer.rewind();
 
-      AbstractPacket p = encodeAndDecode(packet, new SendMessageCodec());
+      AbstractPacket p = codec.decode(buffer);
 
       assertTrue(p instanceof SendMessage);
 
@@ -603,9 +673,12 @@
       CreateConsumerRequest request = new CreateConsumerRequest(destination,
             "color = 'red'", false, "subscription", false, false);
       addVersion(request);
+      AbstractPacketCodec codec = new CreateConsumerRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new CreateConsumerRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateConsumerRequest);
 
@@ -627,9 +700,12 @@
       CreateDestinationRequest request = new CreateDestinationRequest(
             "testCreateDestinationRequest", false);
       addVersion(request);
+      AbstractPacketCodec codec = new CreateDestinationRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new CreateDestinationRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateDestinationRequest);
 
@@ -646,9 +722,12 @@
       CreateDestinationResponse response = new CreateDestinationResponse(
             destination);
       addVersion(response);
+      AbstractPacketCodec codec = new CreateDestinationResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new CreateDestinationResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateDestinationResponse);
 
@@ -665,9 +744,12 @@
       CreateDestinationResponse response = new CreateDestinationResponse(
             destination);
       addVersion(response);
+      AbstractPacketCodec codec = new CreateDestinationResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new CreateDestinationResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateDestinationResponse);
 
@@ -683,9 +765,12 @@
       CreateConsumerResponse response = new CreateConsumerResponse(
             randomString(), 23, 42, randomLong());
       addVersion(response);
+      AbstractPacketCodec codec = new CreateConsumerResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new CreateConsumerResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateConsumerResponse);
 
@@ -705,8 +790,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_STARTCONNECTION, StartConnectionMessage.class);
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof StartConnectionMessage);
       assertEquals(MSG_STARTCONNECTION, decodedPacket.getType());
@@ -719,8 +807,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_STOPCONNECTION, StopConnectionMessage.class);
+      SimpleRemotingBuffer buffer = encode(packet, codec);
+      checkHeader(buffer, packet);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof StopConnectionMessage);
       assertEquals(MSG_STOPCONNECTION, decodedPacket.getType());
@@ -730,9 +821,12 @@
    {
       ChangeRateMessage message = new ChangeRateMessage(0.63f);
       addVersion(message);
+      AbstractPacketCodec codec = new ChangeRateMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new ChangeRateMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof ChangeRateMessage);
       ChangeRateMessage decodedMessage = (ChangeRateMessage) decodedPacket;
@@ -746,9 +840,12 @@
       DeliverMessage message = new DeliverMessage(msg, randomString(),
             randomLong(), 23);
       addVersion(message);
+      AbstractPacketCodec codec = new DeliverMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new DeliverMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof DeliverMessage);
       DeliverMessage decodedMessage = (DeliverMessage) decodedPacket;
@@ -766,9 +863,12 @@
       AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(
             randomLong());
       addVersion(request);
+      AbstractPacketCodec codec = new AcknowledgeDeliveryRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new AcknowledgeDeliveryRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof AcknowledgeDeliveryRequest);
       AcknowledgeDeliveryRequest decodedRequest = (AcknowledgeDeliveryRequest) decodedPacket;
@@ -786,9 +886,12 @@
       AcknowledgeDeliveriesMessage request = new AcknowledgeDeliveriesMessage(
             acks);
       addVersion(request);
+      AbstractPacketCodec codec = new AcknowledgeDeliveriesRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new AcknowledgeDeliveriesRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof AcknowledgeDeliveriesMessage);
       AcknowledgeDeliveriesMessage decodedRequest = (AcknowledgeDeliveriesMessage) decodedPacket;
@@ -801,9 +904,12 @@
       AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(
             true);
       addVersion(response);
+      AbstractPacketCodec codec = new AcknowledgeDeliveryResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new AcknowledgeDeliveryResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof AcknowledgeDeliveryResponse);
       AcknowledgeDeliveryResponse decodedResponse = (AcknowledgeDeliveryResponse) decodedPacket;
@@ -816,9 +922,12 @@
    {
       ClosingRequest request = new ClosingRequest(randomLong());
       addVersion(request);
+      AbstractPacketCodec codec = new ClosingRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new ClosingRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof ClosingRequest);
       ClosingRequest decodedRequest = (ClosingRequest) decodedPacket;
@@ -830,9 +939,12 @@
    {
       ClosingResponse response = new ClosingResponse(System.currentTimeMillis());
       addVersion(response);
+      AbstractPacketCodec codec = new ClosingResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new ClosingResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof ClosingResponse);
       ClosingResponse decodedRequest = (ClosingResponse) decodedPacket;
@@ -847,8 +959,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_CLOSE, CloseMessage.class);
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CloseMessage);
       CloseMessage decodedMessage = (CloseMessage) decodedPacket;
@@ -864,9 +979,12 @@
 
       SendTransactionMessage message = new SendTransactionMessage(tr, true);
       addVersion(message);
+      AbstractPacketCodec codec = new SendTransactionMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new SendTransactionMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof SendTransactionMessage);
       SendTransactionMessage decodedMessage = (SendTransactionMessage) decodedPacket;
@@ -888,8 +1006,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_GETPREPAREDTRANSACTIONS, GetPreparedTransactionsRequest.class);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetPreparedTransactionsRequest);
       assertEquals(REQ_GETPREPAREDTRANSACTIONS, decodedPacket.getType());
@@ -904,9 +1025,12 @@
       GetPreparedTransactionsResponse response = new GetPreparedTransactionsResponse(
             xids);
       addVersion(response);
+      AbstractPacketCodec codec = new GetPreparedTransactionsResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new GetPreparedTransactionsResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetPreparedTransactionsResponse);
       GetPreparedTransactionsResponse decodedResponse = (GetPreparedTransactionsResponse) decodedPacket;
@@ -921,8 +1045,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_GETCLIENTID, GetClientIDRequest.class);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetClientIDRequest);
       assertEquals(REQ_GETCLIENTID, decodedPacket.getType());
@@ -932,9 +1059,12 @@
    {
       GetClientIDResponse response = new GetClientIDResponse(randomString());
       addVersion(response);
+      AbstractPacketCodec codec = new GetClientIDResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new GetClientIDResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof GetClientIDResponse);
       GetClientIDResponse decodedResponse = (GetClientIDResponse) decodedPacket;
@@ -946,9 +1076,12 @@
    {
       SetClientIDMessage message = new SetClientIDMessage(randomString());
       addVersion(message);
+      AbstractPacketCodec codec = new SetClientIDMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new SetClientIDMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof SetClientIDMessage);
       SetClientIDMessage decodedMessage = (SetClientIDMessage) decodedPacket;
@@ -969,9 +1102,12 @@
       RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(
             deliveries, randomString());
       addVersion(message);
+      AbstractPacketCodec codec = new RecoverDeliveriesMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new RecoverDeliveriesMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof RecoverDeliveriesMessage);
       RecoverDeliveriesMessage decodedMessage = (RecoverDeliveriesMessage) decodedPacket;
@@ -986,9 +1122,12 @@
       Cancel cancel = new DefaultCancel(randomLong(), 23, true, false);
       CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
       addVersion(message);
+      AbstractPacketCodec codec = new CancelDeliveryMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new CancelDeliveryMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CancelDeliveryMessage);
       CancelDeliveryMessage decodedMessage = (CancelDeliveryMessage) decodedPacket;
@@ -1010,9 +1149,12 @@
 
       CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
       addVersion(message);
+      AbstractPacketCodec codec = new CancelDeliveriesMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new CancelDeliveriesMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CancelDeliveriesMessage);
       CancelDeliveriesMessage decodedMessage = (CancelDeliveriesMessage) decodedPacket;
@@ -1027,9 +1169,12 @@
       CreateBrowserRequest request = new CreateBrowserRequest(destination,
             "color = 'red'");
       addVersion(request);
+      AbstractPacketCodec codec = new CreateBrowserRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request,
-            new CreateBrowserRequestCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateBrowserRequest);
 
@@ -1043,9 +1188,12 @@
    {
       CreateBrowserResponse response = new CreateBrowserResponse(randomString());
       addVersion(response);
+      AbstractPacketCodec codec = new CreateBrowserResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new CreateBrowserResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof CreateBrowserResponse);
 
@@ -1061,8 +1209,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             MSG_BROWSER_RESET, BrowserResetMessage.class);
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserResetMessage);
       assertEquals(MSG_BROWSER_RESET, decodedPacket.getType());
@@ -1075,8 +1226,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_BROWSER_HASNEXTMESSAGE, BrowserHasNextMessageRequest.class);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserHasNextMessageRequest);
       assertEquals(REQ_BROWSER_HASNEXTMESSAGE, decodedPacket.getType());
@@ -1087,9 +1241,12 @@
       BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(
             false);
       addVersion(response);
+      AbstractPacketCodec codec = new BrowserHasNextMessageResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new BrowserHasNextMessageResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserHasNextMessageResponse);
 
@@ -1105,8 +1262,11 @@
 
       AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
             REQ_BROWSER_NEXTMESSAGE, BrowserNextMessageRequest.class);
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserNextMessageRequest);
       assertEquals(REQ_BROWSER_NEXTMESSAGE, decodedPacket.getType());
@@ -1117,9 +1277,12 @@
       JBossMessage msg = new JBossMessage(randomLong());
       BrowserNextMessageResponse response = new BrowserNextMessageResponse(msg);
       addVersion(response);
+      AbstractPacketCodec codec = new BrowserNextMessageResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new BrowserNextMessageResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserNextMessageResponse);
       BrowserNextMessageResponse decodedResponse = (BrowserNextMessageResponse) decodedPacket;
@@ -1133,9 +1296,13 @@
       BrowserNextMessageBlockRequest request = new BrowserNextMessageBlockRequest(
             randomLong());
       addVersion(request);
-      
-      AbstractPacket decodedPacket = encodeAndDecode(request, new BrowserNextMessageBlockRequestCodec());
+      AbstractPacketCodec codec = new BrowserNextMessageBlockRequestCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      buffer.rewind();
 
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
       assertTrue(decodedPacket instanceof BrowserNextMessageBlockRequest);
       BrowserNextMessageBlockRequest decodedRequest = (BrowserNextMessageBlockRequest) decodedPacket;
       assertEquals(REQ_BROWSER_NEXTMESSAGEBLOCK, decodedPacket.getType());
@@ -1150,9 +1317,12 @@
       BrowserNextMessageBlockResponse response = new BrowserNextMessageBlockResponse(
             messages);
       addVersion(response);
+      AbstractPacketCodec codec = new BrowserNextMessageBlockResponseCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(response,
-            new BrowserNextMessageBlockResponseCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof BrowserNextMessageBlockResponse);
       BrowserNextMessageBlockResponse decodedResponse = (BrowserNextMessageBlockResponse) decodedPacket;
@@ -1168,9 +1338,12 @@
       UnsubscribeMessage message = new UnsubscribeMessage(
             "testUnsubscribeMessage");
       addVersion(message);
+      AbstractPacketCodec codec = new UnsubscribeMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new UnsubscribeMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof UnsubscribeMessage);
       UnsubscribeMessage decodedMessage = (UnsubscribeMessage) decodedPacket;
@@ -1186,9 +1359,12 @@
       AddTemporaryDestinationMessage message = new AddTemporaryDestinationMessage(
             destination);
       addVersion(message);
+      AbstractPacketCodec codec = new AddTemporaryDestinationMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new AddTemporaryDestinationMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof AddTemporaryDestinationMessage);
 
@@ -1204,9 +1380,12 @@
       DeleteTemporaryDestinationMessage message = new DeleteTemporaryDestinationMessage(
             destination);
       addVersion(message);
+      AbstractPacketCodec codec = new DeleteTemporaryDestinationMessageCodec();
+      SimpleRemotingBuffer buffer = encode(message, codec);
+      checkHeader(buffer, message);
+      buffer.rewind();
 
-      AbstractPacket decodedPacket = encodeAndDecode(message,
-            new DeleteTemporaryDestinationMessageCodec());
+      AbstractPacket decodedPacket = codec.decode(buffer);
 
       assertTrue(decodedPacket instanceof DeleteTemporaryDestinationMessage);
 
@@ -1221,18 +1400,17 @@
 
    // Private -------------------------------------------------------
 
-   private AbstractPacket encodeAndDecode(AbstractPacket packet,
+   private SimpleRemotingBuffer encode(AbstractPacket packet,
          AbstractPacketCodec codec) throws Exception
    {
       SimpleRemotingBuffer buf = new SimpleRemotingBuffer();
-      
-      codec.encode(packet, buf);      
+
+      codec.encode(packet, buf);
       buf.flip();
-      return codec.decode(buf);
+
+      return buf;
    }
 
-   // Inner classes -------------------------------------------------
-
    private final class SimpleRemotingBuffer implements RemotingBuffer
    {
       private static final byte NON_NULL_STRING = (byte) 0;
@@ -1251,8 +1429,14 @@
          dos = null;
          dais = new DataInputStream(
                new ByteArrayInputStream(baos.toByteArray()));
+         dais.mark(1024);
       }
 
+      public void rewind() throws IOException
+      {
+         dais.reset();
+      }
+
       public byte get()
       {
          try

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2007-12-03 14:33:46 UTC (rev 3398)
@@ -24,7 +24,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,7 +32,6 @@
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Formatter;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
@@ -58,10 +56,6 @@
 import javax.transaction.TransactionManager;
 import javax.transaction.UserTransaction;
 
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.filter.logging.MdcInjectionFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.hsqldb.Server;
 import org.hsqldb.persist.HsqlProperties;
 import org.jboss.jms.jndi.JMSProviderAdapter;
@@ -69,8 +63,7 @@
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.server.ServerHandler;
+import org.jboss.messaging.core.remoting.internal.MinaService;
 import org.jboss.messaging.util.JNDIUtil;
 import org.jboss.messaging.util.XMLUtil;
 import org.jboss.remoting.InvokerLocator;
@@ -238,7 +231,7 @@
    // so we don't start up multiple servers with services running on the same port
    private int serverIndex;
 
-   private NioSocketAcceptor acceptor;
+   private MinaService minaService;
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -538,7 +531,8 @@
 
       stopService(REMOTING_OBJECT_NAME);
 
-      stopMINAServer();
+      if (minaService != null)
+         minaService.stop();
 
       if (httpConnectionFactory)
       {
@@ -1415,58 +1409,12 @@
       
       if (!transport.equals("http"))
       {
-         startMINAServer(locator.getPort() + 1000);
+         minaService = new MinaService();
+         minaService.setPort(locator.getPort() + 1000);
+         minaService.start();
       }
    }
 
-
-   private void startMINAServer(int port) throws Exception
-   {
-      if (acceptor == null)
-      {
-         info("Starting MINA on port " + port);
-
-         acceptor = new NioSocketAcceptor();
-
-         // Prepare the configuration
-         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
-         acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
-         acceptor.getFilterChain().addLast("codec",
-               new ProtocolCodecFilter(new PacketCodecFactory()));
-         acceptor.getFilterChain().addLast("logger", new LoggingFilter());
-
-         // Bind
-         acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
-         acceptor.setReuseAddress(true);
-         acceptor.getSessionConfig().setReuseAddress(true);
-         acceptor.getSessionConfig().setKeepAlive(true);
-         acceptor.setDisconnectOnUnbind(false);
-
-         acceptor.setHandler(new ServerHandler());
-         acceptor.bind();
-
-         info("MINA started");
-      }
-   }
-
-   private void stopMINAServer()
-   {
-      if (acceptor != null)
-      {
-         acceptor.unbind();
-         acceptor.dispose();
-         acceptor = null;
-         
-         info("Stopped MINA ");
-      }    
-   }
-   
-   private void info(String s)
-   {
-      log.info(new Formatter().format("### %-30s ###\n", s).toString());
-   }
-
-
    private void startSecurityManager() throws Exception
    {
       MockJBossSecurityManager sm = new MockJBossSecurityManager();




More information about the jboss-cvs-commits mailing list