[jboss-cvs] JBoss Messaging SVN: r3327 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/messaging/core and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 14 05:20:05 EST 2007


Author: jmesnil
Date: 2007-11-14 05:20:05 -0500 (Wed, 14 Nov 2007)
New Revision: 3327

Added:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Modified:
   branches/Branch_JBMESSAGING-544/.classpath
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544:  Replace client-server transport with NIO based transport

* added new remote API & implementation in o.j.m.core.remoting based on MINA
* new API is not used by JBoss Messaging
* for now, depends on an Eclipse project mina-core (from MINA trunk)

Modified: branches/Branch_JBMESSAGING-544/.classpath
===================================================================
--- branches/Branch_JBMESSAGING-544/.classpath	2007-11-13 20:43:24 UTC (rev 3326)
+++ branches/Branch_JBMESSAGING-544/.classpath	2007-11-14 10:20:05 UTC (rev 3327)
@@ -3,14 +3,11 @@
 	<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
 	<classpathentry kind="src" path="docs/examples/bridge/src"/>
 	<classpathentry kind="src" path="docs/examples/stateless-clustered/src"/>
-	<classpathentry kind="src" path="docs/examples/web-service/src-client"/>
-	<classpathentry kind="src" path="docs/examples/web-service/src"/>
 	<classpathentry kind="src" path="docs/examples/mdb-failure/src"/>
 	<classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
 	<classpathentry kind="src" path="output/gen-parsers"/>
 	<classpathentry kind="src" path="docs/examples/common/src"/>
 	<classpathentry kind="src" path="docs/examples/distributed-topic/src"/>
-	<classpathentry kind="src" path="docs/examples/ejb3mdb/src"/>
 	<classpathentry kind="src" path="docs/examples/http/src"/>
 	<classpathentry kind="src" path="docs/examples/mdb/src"/>
 	<classpathentry kind="src" path="docs/examples/queue/src"/>
@@ -45,7 +42,6 @@
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
-	<classpathentry kind="src" path=".apt_generated"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>
@@ -61,5 +57,6 @@
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-local-jdbc.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/mina-core"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import static java.util.UUID.randomUUID;
+
+import java.util.UUID;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * A PacketHandler whose ID is randomly generated using UUID.
+ * 
+ * @see UUID#randomUUID()
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class AbstractPacketHandler implements PacketHandler
+{
+
+   // Constants -----------------------------------------------------
+
+   private final String id;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public AbstractPacketHandler()
+   {
+      this.id = randomUUID().toString();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "AbstractPacketHandler[id=" + id + "]";
+   }
+
+   // PacketHandler implementation ----------------------------------
+
+   public String getID()
+   {
+      return id;
+   }
+
+   public abstract void handle(AbstractPacket packet);
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,31 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class Assert
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public final static void assertValidID(String id)
+   {
+      assert id != null;
+      assert id.length() != 0;
+   }
+   
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class Constants
+{
+   // Constants -----------------------------------------------------
+
+   public static final int CONNECTION_TIMEOUT = 20 * 1000;
+
+   public static final byte NULL_BYTE = (byte) 0;
+
+   public static final CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8")
+         .newEncoder();
+
+   public static final CharsetDecoder UTF_8_DECODER = Charset.forName("UTF-8")
+         .newDecoder();
+
+   public static final long NO_CORRELATION_ID = -1L;
+
+   public static final String NO_TARGET_ID = "NO_TARGET_ID_SET";
+
+   public static final String NO_CALLBACK_ID = "NO_CALLBACK_ID_SET";
+
+   public static final int INT_LENGTH = 4;
+
+   public static final int LONG_LENGTH = 8;
+
+   public static final byte TRUE = (byte) 0;
+
+   public static final byte FALSE = (byte) 1;
+
+   public static final int PORT = 8080;
+
+   public static final String PACKET_TYPE_HEADER = "PACKET_TYPE";
+
+   public static final String TARGET_ID_HEADER = "TARGET_ID";
+
+   public static final String CALLBACK_ID_HEADER = "CALLBACK_ID";
+
+   public static final String CORRELATION_ID_HEADER = "CORRELATION_ID";
+
+   public static final String PACKET_HTTP_PARAMETER = "packet";
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * A PacketHandler handles packets (as defined by {@link AbstractPacket} and its
+ * subclasses).
+ * 
+ * It must have an ID unique among all PacketHandlers (or at least among those
+ * registered into the same RemoteDispatcher).
+ * 
+ * @see RemoteDispatcher#register(PacketHandler)
+ * @see RemoteDispatcher#unregister(String)
+ * @see AbstractPacketHandler
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public interface PacketHandler
+{
+   /*
+    * The advantage of using String as ID is that we can leverage Java 5 UUID to
+    * generate these IDs. However, these IDs are 128 bits long, which increases
+    * the size of a packet (compared to integer or long).
+    * 
+    * By switching to Long, we could reduce the size of the packet and maybe
+    * increase the performance (to check after some performance tests)
+    */
+   String getID();
+
+   void handle(AbstractPacket packet);
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public interface RemoteDispatcher
+{
+   public void sendOneWay(AbstractPacket packet);
+
+   public void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler);
+
+   public AbstractPacket sendBlocking(AbstractPacket packet) throws Exception;
+
+   public void connect(int port, TransportType transport) throws Exception;
+
+   public void connect(int port, TransportType transport, boolean useSSL)
+         throws Exception;
+
+   public boolean disconnect() throws Exception;
+
+   public void register(PacketHandler handler);
+
+   public void unregister(String handlerID);
+
+   public void setBlockingRequestTimeout(int timeout, TimeUnit unit);
+}
\ No newline at end of file

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,41 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.internal.RemoteDispatcherImpl;
+
+
+/**
+ * A factory to create new {@link RemoteDispatcher}.
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public static RemoteDispatcher newDispatcher()
+   {
+      return new RemoteDispatcherImpl();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,17 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * The transport types supported by JBoss Messaging.
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public enum TransportType
+{
+   TCP, HTTP;
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,180 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.INT_LENGTH;
+import static org.jboss.messaging.core.remoting.Constants.LONG_LENGTH;
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoder;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class AbstractPacketCodec<P extends AbstractPacket> implements
+      MessageEncoder<P>, MessageDecoder
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private PacketType type;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   protected AbstractPacketCodec(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   // MessageEncoder implementation ---------------------------------
+
+   /** Writes type, header and body of {@code packet} into one buffer. */
+   public void encode(IoSession session, P packet, ProtocolEncoderOutput out)
+         throws Exception
+   {
+      long correlationID = packet.getCorrelationID();
+      String targetID = packet.getTargetID();
+      String callbackID = packet.getCallbackID();
+      // The IDs are written with UTF_8_ENCODER below, so their length must be
+      // counted in UTF-8 bytes, not in the platform default charset.
+      int headerLength = LONG_LENGTH + targetID.getBytes("UTF-8").length
+            + callbackID.getBytes("UTF-8").length + 2 /* NULL bytes */;
+
+      // Enable auto-expand for easier encoding
+      IoBuffer buf = IoBuffer.allocate(256);
+      buf.setAutoExpand(true);
+
+      // wire order: type, header length, correlation ID, target ID, callback ID
+      buf.put(packet.getType().byteValue());
+      buf.putInt(headerLength);
+      buf.putLong(correlationID);
+      buf.putString(targetID, UTF_8_ENCODER);
+      buf.put(NULL_BYTE);
+      buf.putString(callbackID, UTF_8_ENCODER);
+      buf.put(NULL_BYTE);
+      encodeBody(session, packet, buf);
+      buf.flip();
+      out.write(buf);
+   }
+
+   // MessageDecoder implementation ---------------------------------
+
+   public MessageDecoderResult decodable(IoSession session, IoBuffer in)
+   {
+      byte t = in.get();
+      if (t != type.byteValue())
+      {
+         return NOT_OK;
+      }
+      if (in.remaining() < INT_LENGTH)
+      {
+         System.out.println("need more data to read header length");
+         // can not read next int
+         return NEED_DATA;
+      }
+      int headerLength = in.getInt();
+      if (in.remaining() < headerLength)
+      {
+         System.out.println("need more data to read header");
+         return NEED_DATA;
+      }
+      in.getLong(); // correlation ID
+      try
+      {
+         in.getString(UTF_8_DECODER);
+      } catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
+      try
+      {
+         in.getString(UTF_8_DECODER);
+      } catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
+
+      if (in.remaining() < INT_LENGTH)
+      {
+         System.out.println("need more data to read body length");
+         // can not read next int
+         return NEED_DATA;
+      }
+      int bodyLength = in.getInt();
+      if (bodyLength == 0)
+      {
+         return OK;
+      }
+      if (in.remaining() < bodyLength)
+      {
+         System.out.println("need more data to read body");
+         return NEED_DATA;
+      }
+      return OK;
+   }
+
+   public MessageDecoderResult decode(IoSession session, IoBuffer in,
+         ProtocolDecoderOutput out) throws Exception
+   {
+      in.get(); // skip message type
+      in.getInt(); // skip header length
+      long correlationID = in.getLong();
+      String targetID = in.getString(UTF_8_DECODER);
+      String callbackID = in.getString(UTF_8_DECODER);
+
+      P packet = decodeBody(session, in);
+
+      if (packet == null)
+      {
+         return NEED_DATA;
+      }
+      packet.setTargetID(targetID);
+      packet.setCorrelationID(correlationID);
+      packet.setCallbackID(callbackID);
+      out.write(packet);
+
+      return OK;
+   }
+
+   public void finishDecode(IoSession session, ProtocolDecoderOutput out)
+         throws Exception
+   {
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected abstract void encodeBody(IoSession session, P packet, IoBuffer buf)
+         throws Exception;
+
+   protected abstract P decodeBody(IoSession session, IoBuffer in)
+         throws Exception;
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionRequestCodec extends
+      AbstractPacketCodec<ConnectionFactoryCreateConnectionRequest>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public ConnectionFactoryCreateConnectionRequestCodec()
+   {
+      super(REQ_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+   
+   @Override
+   protected void encodeBody(IoSession session,
+         ConnectionFactoryCreateConnectionRequest request, IoBuffer out)
+         throws Exception
+   {
+      // no body
+      out.putInt(0);
+   }
+
+   @Override
+   protected ConnectionFactoryCreateConnectionRequest decodeBody(
+         IoSession session, IoBuffer in) throws Exception
+   {
+      in.getInt(); // skip body length
+      return new ConnectionFactoryCreateConnectionRequest();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionResponseCodec extends
+      AbstractPacketCodec<ConnectionFactoryCreateConnectionResponse>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public ConnectionFactoryCreateConnectionResponseCodec()
+   {
+      super(RESP_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session,
+         ConnectionFactoryCreateConnectionResponse response, IoBuffer out)
+         throws Exception
+   {
+      String id = response.getID();
+      // The body is the NULL-terminated ID; count its UTF-8 bytes so that the
+      // length prefix matches what UTF_8_ENCODER writes below.
+      int bodyLength = id.getBytes("UTF-8").length + 1 /* NULL byte */;
+      out.putInt(bodyLength);
+      out.putString(id, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+   }
+
+   @Override
+   protected ConnectionFactoryCreateConnectionResponse decodeBody(
+         IoSession session, IoBuffer in) throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      String id = in.getString(UTF_8_DECODER);
+
+      return new ConnectionFactoryCreateConnectionResponse(id);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class NullPacketCodec extends AbstractPacketCodec<NullPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public NullPacketCodec()
+   {
+      super(NULL);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+   
+   @Override
+   protected void encodeBody(IoSession session, NullPacket packet,
+         IoBuffer out) throws Exception
+   {
+      // no body
+      out.putInt(0);
+   }
+
+   @Override
+   protected NullPacket decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      in.getInt(); // skip body length
+      return new NullPacket();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Codec factory which registers, for every packet type supported so far,
+ * the codec class used to decode it and the codec class used to encode it.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketCodecFactory extends DemuxingProtocolCodecFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Registers the decoder and the encoder of each supported packet, in this
+    * order: null packet, text packet, create-connection request and
+    * create-connection response.
+    */
+   // FIXME: split encoder/decoder required only on client and/or server sides
+   public PacketCodecFactory()
+   {
+      Class<NullPacketCodec> nullCodec = NullPacketCodec.class;
+      super.addMessageDecoder(nullCodec);
+      super.addMessageEncoder(NullPacket.class, nullCodec);
+
+      // TextPacket are for testing purpose only!
+      Class<TextPacketCodec> textCodec = TextPacketCodec.class;
+      super.addMessageDecoder(textCodec);
+      super.addMessageEncoder(TextPacket.class, textCodec);
+
+      Class<ConnectionFactoryCreateConnectionRequestCodec> requestCodec =
+            ConnectionFactoryCreateConnectionRequestCodec.class;
+      super.addMessageDecoder(requestCodec);
+      super.addMessageEncoder(ConnectionFactoryCreateConnectionRequest.class,
+            requestCodec);
+
+      Class<ConnectionFactoryCreateConnectionResponseCodec> responseCodec =
+            ConnectionFactoryCreateConnectionResponseCodec.class;
+      super.addMessageDecoder(responseCodec);
+      super.addMessageEncoder(ConnectionFactoryCreateConnectionResponse.class,
+            responseCodec);
+
+      // Codecs which are not registered yet:
+      //
+      // super.addMessageDecoder(ClientDeliveryCodec.class);
+      // super.addMessageEncoder(ClientDelivery.class,
+      // ClientDeliveryCodec.class);
+      //
+      // super.addMessageDecoder(ConnectionCreateSessionRequestCodec.class);
+      // super.addMessageEncoder(ConnectionCreateSessionRequest.class,
+      // ConnectionCreateSessionRequestCodec.class);
+      //
+      // super.addMessageDecoder(ConnectionCreateSessionResponseCodec.class);
+      // super.addMessageEncoder(ConnectionCreateSessionResponse.class,
+      // ConnectionCreateSessionResponseCodec.class);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Codec for {@link TextPacket}. The body is made of an int holding the body
+ * length, followed by the encoded text and a terminating NULL byte.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TextPacketCodec extends AbstractPacketCodec<TextPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a codec bound to the {@code TEXT} packet type.
+    */
+   public TextPacketCodec()
+   {
+      super(TEXT);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the body length, the text of the packet and a NULL byte.
+    * <p>
+    * The body length is computed with the charset of the encoder which
+    * writes the text, so that the announced length matches the number of
+    * bytes actually written whatever the platform default charset is.
+    *
+    * @param session the MINA session (unused)
+    * @param packet the packet whose text is written
+    * @param out the buffer the body is written to
+    * @throws Exception if the text can not be encoded
+    */
+   @Override
+   protected void encodeBody(IoSession session, TextPacket packet, IoBuffer out)
+         throws Exception
+   {
+      String text = packet.getText();
+      String charsetName = UTF_8_ENCODER.charset().name();
+      int bodyLength = text.getBytes(charsetName).length + 1; // for NULL byte
+
+      out.putInt(bodyLength);
+      out.putString(text, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads the body length and then the text of the packet.
+    *
+    * @param session the MINA session (unused)
+    * @param in the buffer the body is read from
+    * @return the decoded packet, or <code>null</code> if the buffer holds
+    *         fewer bytes than the announced body length
+    * @throws Exception if the text can not be decoded
+    */
+   @Override
+   protected TextPacket decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         // the whole body has not been received
+         return null;
+      }
+      String text = in.getString(UTF_8_DECODER);
+
+      return new TextPacket(text);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Response;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * MINA handler which passes each received {@link AbstractPacket} to the
+ * {@link PacketHandler} stored in the session under the target ID of the
+ * packet.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ClientHandler extends IoHandlerAdapter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // IoHandlerAdapter overrides ------------------------------------
+
+   /**
+    * Handles a received message:
+    * <ul>
+    * <li>a packet is given to the handler found in the session attributes
+    * under the packet target ID, or reported on <code>System.err</code> if
+    * there is no such handler;</li>
+    * <li>a {@link Response} is ignored;</li>
+    * <li>anything else is reported on <code>System.err</code>.</li>
+    * </ul>
+    */
+   @Override
+   public void messageReceived(IoSession session, Object message)
+         throws Exception
+   {
+      if (!(message instanceof AbstractPacket))
+      {
+         // response is handled by the reqres filter: nothing to do for it
+         if (!(message instanceof Response))
+         {
+            System.err.println("unhandled message: " + message);
+         }
+         return;
+      }
+
+      AbstractPacket received = (AbstractPacket) message;
+      PacketHandler registered = (PacketHandler) session.getAttribute(received
+            .getTargetID());
+      if (registered == null)
+      {
+         System.err
+               .println("ClientHandler.messageReceived() unhandled packet: "
+                     + received);
+         return;
+      }
+      registered.handle(received);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+
+import org.apache.mina.filter.reqres.ResponseInspector;
+import org.apache.mina.filter.reqres.ResponseType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * {@link ResponseInspector} which uses the correlation ID of an
+ * {@link AbstractPacket} as the request ID.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketInspector implements ResponseInspector
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // ResponseInspector implementation ------------------------------
+
+   /**
+    * @return the correlation ID of the message, or <code>null</code> if the
+    *         message is not an {@link AbstractPacket} or if its correlation
+    *         ID is <code>NO_CORRELATION_ID</code>
+    */
+   public Object getRequestId(Object message)
+   {
+      if (message instanceof AbstractPacket)
+      {
+         long id = ((AbstractPacket) message).getCorrelationID();
+         if (id != NO_CORRELATION_ID)
+         {
+            return id;
+         }
+      }
+      return null;
+   }
+
+   /**
+    * @return <code>WHOLE</code> for any {@link AbstractPacket},
+    *         <code>null</code> for any other message
+    */
+   public ResponseType getResponseType(Object message)
+   {
+      return (message instanceof AbstractPacket) ? WHOLE : null;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,243 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.Constants.CONNECTION_TIMEOUT;
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.filter.reqres.Request;
+import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.apache.mina.filter.reqres.RequestTimeoutException;
+import org.apache.mina.filter.reqres.Response;
+import org.apache.mina.filter.ssl.SslFilter;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherImpl implements RemoteDispatcher
+{
+
+   // Attributes ----------------------------------------------------
+
+   private IoSession session;
+   private SslFilter sslFilter;
+
+   // By default, a blocking request will timeout after 5 seconds
+   private int blockingRequestTimeout = 5;
+   private TimeUnit blockingRequestTimeUnit = SECONDS;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public RemoteDispatcherImpl()
+   {
+   }
+
+   // RemoteDispatcher implementation -------------------------------
+
+   public void connect(int port, TransportType transport) throws Exception
+   {
+      connect(port, transport, false);
+   }
+
+   public void connect(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      assert port > 0;
+      assert transport != null;
+
+      NioSocketConnector connector = new NioSocketConnector();
+
+      connector.setConnectTimeout(CONNECTION_TIMEOUT);
+
+      if (useSSL)
+         addSSLSupport(connector.getFilterChain());
+
+      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+      connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+      if (transport == TCP)
+      {
+         connector.getFilterChain().addLast("codec",
+               new ProtocolCodecFilter(new PacketCodecFactory()));
+      } else
+      {
+         assert transport == HTTP;
+
+         // TODO support HTTP
+         // URL url = new URL("http://localhost:" + port + "/");
+         // connector.getFilterChain().addLast("http_codec",
+         // new ProtocolCodecFilter(new HttpProtocolCodecFactory(url)));
+         // connector.getFilterChain().addLast("http_logger", new
+         // LoggingFilter());
+         // connector.getFilterChain().addLast("http_filter",
+         // new HTTPFilter(false));
+      }
+
+      addBlockingRequestResponseFilter(connector.getFilterChain());
+
+      connector.getFilterChain().addLast("logger", new LoggingFilter());
+
+      connector.setHandler(new ClientHandler());
+      InetSocketAddress address = new InetSocketAddress(port);
+      ConnectFuture future = connector.connect(address);
+      future.awaitUninterruptibly();
+      if (!future.isConnected())
+      {
+         throw new IOException("Cannot connect to " + address.toString());
+      }
+      this.session = future.getSession();
+   }
+
+   public boolean disconnect() throws Exception
+   {
+      assert session != null;
+
+      if (sslFilter != null)
+      {
+         sslFilter.stopSsl(session).awaitUninterruptibly();
+         // FIXME: w/o the delay, an exception is thrown:
+         // "Inbound closed before receiving peer's close_notify"
+         Thread.sleep(500);
+      }
+      CloseFuture closeFuture = session.close().awaitUninterruptibly();
+      return closeFuture.isClosed();
+   }
+
+   public void register(PacketHandler handler)
+   {
+      assert session != null;
+      assertValidID(handler.getID());
+      assert handler != null;
+
+      session.setAttribute(handler.getID(), handler);
+   }
+
+   public void unregister(String handlerID)
+   {
+      assert session != null;
+      assertValidID(handlerID);
+
+      session.removeAttribute(handlerID);
+   }
+
+   public void sendOneWay(AbstractPacket packet)
+   {
+      assert packet != null;
+      checkConnected();
+
+      session.write(packet);
+   }
+
+   public void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler)
+   {
+      assert packet != null;
+      assert callbackHandler != null;
+      checkConnected();
+
+      register(callbackHandler);
+      packet.setCallbackID(callbackHandler.getID());
+      session.write(packet);
+   }
+
+   public AbstractPacket sendBlocking(AbstractPacket packet)
+         throws TimeoutException
+   {
+      assert packet != null;
+      checkConnected();
+
+      packet.setCorrelationID(System.nanoTime());
+
+      // TODO request timeout must be configurable
+      Request req = new Request(packet.getCorrelationID(), packet,
+            blockingRequestTimeout, blockingRequestTimeUnit);
+      session.write(req);
+      Response response;
+      try
+      {
+         response = req.awaitResponse();
+         return (AbstractPacket) response.getMessage();
+      } catch (RequestTimeoutException e)
+      {
+         TimeoutException toe = new TimeoutException();
+         toe.initCause(e);
+         throw toe;
+      } catch (InterruptedException e)
+      {
+         TimeoutException toe = new TimeoutException();
+         toe.initCause(e);
+         throw toe;
+      }
+   }
+
+   public void setBlockingRequestTimeout(int timeout, TimeUnit unit)
+   {
+      this.blockingRequestTimeout = timeout;
+      this.blockingRequestTimeUnit = unit;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private void addBlockingRequestResponseFilter(
+         DefaultIoFilterChainBuilder chain)
+   {
+      ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+      RequestResponseFilter filter = new RequestResponseFilter(
+            new PacketInspector(), scheduler);
+      chain.addLast("reqres", filter);
+   }
+
+   private void addSSLSupport(DefaultIoFilterChainBuilder chain)
+         throws Exception
+   {
+      // TODO support SSL
+      // this.sslFilter = new
+      // SslFilter(BogusSslContextFactory.getInstance(false));
+      // sslFilter.setUseClientMode(true);
+      // chain.addLast("sslFilter", sslFilter);
+   }
+
+   private void checkConnected()
+   {
+      if (session == null)
+      {
+         throw new IllegalStateException("RemoteDispatcher is not connected.");
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,94 @@
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_TARGET_ID;
+
+/**
+ * Base class of the packets. A packet has a type, fixed at construction,
+ * and three optional IDs: a correlation ID, a target ID and a callback ID
+ * which are initialized with the <code>NO_CORRELATION_ID</code>,
+ * <code>NO_TARGET_ID</code> and <code>NO_CALLBACK_ID</code> constants.
+ */
+public class AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long correlationID = NO_CORRELATION_ID;
+
+   private String targetID = NO_TARGET_ID;
+
+   private String callbackID = NO_CALLBACK_ID;
+
+   private final PacketType type;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param type the type of the packet (must not be <code>null</code>)
+    */
+   public AbstractPacket(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   public PacketType getType()
+   {
+      return type;
+   }
+
+   public void setCorrelationID(long correlationID)
+   {
+      this.correlationID = correlationID;
+   }
+
+   public long getCorrelationID()
+   {
+      return correlationID;
+   }
+
+   public String getTargetID()
+   {
+      return targetID;
+   }
+
+   /**
+    * @param targetID the target ID, checked with <code>assertValidID</code>
+    */
+   public void setTargetID(String targetID)
+   {
+      assertValidID(targetID);
+
+      this.targetID = targetID;
+   }
+
+   /**
+    * @param callbackID the callback ID, checked with
+    *        <code>assertValidID</code>
+    */
+   public void setCallbackID(String callbackID)
+   {
+      assertValidID(callbackID);
+
+      this.callbackID = callbackID;
+   }
+
+   public String getCallbackID()
+   {
+      return callbackID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + "]";
+   }
+
+   /**
+    * Returns the beginning of the string representation of the packet
+    * (type and IDs) <em>without</em> the closing bracket, so that the
+    * subclasses can append their own fields before closing it.
+    */
+   protected String getParentString()
+   {
+      return "PACKET[type=" + type + ", correlationID=" + correlationID
+            + ", targetID=" + targetID + ", callbackID=" + callbackID;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Packet of type <code>REQ_CONNECTIONFACTORY_CREATECONNECTION</code>. It
+ * does not carry any field of its own for the moment (see the TODO below).
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionRequest extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // TODO: add auth credentials
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a request whose type is
+    * <code>REQ_CONNECTIONFACTORY_CREATECONNECTION</code>.
+    */
+   public ConnectionFactoryCreateConnectionRequest()
+   {
+      super(REQ_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Packet of type <code>RESP_CONNECTIONFACTORY_CREATECONNECTION</code>
+ * carrying a single ID.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionResponse extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String id;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param id the ID carried by the response, checked with
+    *        <code>assertValidID</code>
+    */
+   public ConnectionFactoryCreateConnectionResponse(String id)
+   {
+      super(RESP_CONNECTIONFACTORY_CREATECONNECTION);
+
+      assertValidID(id);
+      this.id = id;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getID()
+   {
+      return this.id;
+   }
+
+   /**
+    * @return the fields of the parent followed by the ID of this response
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", id=").append(id);
+      description.append("]");
+      return description.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+/**
+ * Packet of type <code>NULL</code> which has no field of its own.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class NullPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a packet whose type is <code>NULL</code>.
+    */
+   public NullPacket()
+   {
+      super(NULL);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+/**
+ * Types of the packets, each one associated to a byte value.
+ * <p>
+ * <strong>Warning:</strong> several literals below do not fit in a byte
+ * (whose range is -128..127) and are silently narrowed by the cast. The
+ * value really returned by {@link #byteValue()} is given next to each of
+ * them. The values are left as they are to keep the bytes produced today,
+ * and a check at class loading ensures that two types can never end up
+ * with the same byte value.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public enum PacketType
+{
+   NULL((byte)1),
+   CLIENT_DELIVERY((byte) 2),
+   TEXT((byte) 3),
+   REQ_CONNECTIONFACTORY_CREATECONNECTION((byte) 100),
+   // 100100 is narrowed to 4
+   RESP_CONNECTIONFACTORY_CREATECONNECTION((byte) 100100),
+   // 201 is narrowed to -55
+   REQ_CONNECTION_CREATESESSION((byte) 201),
+   // 100200 is narrowed to 104
+   RESP_CONNECTION_CREATESESSION((byte) 100200);
+
+   static
+   {
+      // fail fast if two constants share the same byte value after narrowing
+      boolean[] taken = new boolean[256];
+      for (PacketType packetType : values())
+      {
+         int index = packetType.type & 0xFF;
+         if (taken[index])
+         {
+            throw new IllegalStateException("duplicate packet type value "
+                  + packetType.type + " for " + packetType);
+         }
+         taken[index] = true;
+      }
+   }
+
+   private final byte type;
+
+   PacketType(byte type)
+   {
+      this.type = type;
+   }
+
+   /**
+    * @return the byte value associated to this type
+    */
+   public byte byteValue()
+   {
+      return type;
+   }
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+
+
+
+/**
+ * Packet of type <code>TEXT</code> carrying a text.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TextPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String text;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param text the text of the packet (must not be <code>null</code>)
+    */
+   public TextPacket(String text)
+   {
+      super(TEXT);
+
+      assert text != null;
+      this.text = text;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getText()
+   {
+      return this.text;
+   }
+
+   /**
+    * @return the fields of the parent followed by the text of this packet
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", text=").append(text);
+      description.append("]");
+      return description.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.Constants.PORT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.messaging.core.remoting.AbstractPacketHandler;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.RemoteDispatcherFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Tests the one-way, callback and blocking send operations of a
+ * {@link RemoteDispatcher} connected to the test server started by
+ * {@link TestSupport}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherTest extends TestSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * A dispatcher which has not been connected must refuse to send packets.
+    */
+   public void testCanNotSendPacketIfNotConnected() throws Exception
+   {
+      RemoteDispatcher dispatcher = RemoteDispatcherFactory.newDispatcher();
+
+      try
+      {
+         dispatcher.sendOneWay(new NullPacket());
+         fail("can not send a packet if the dispatcher is not connected");
+      } catch (IllegalStateException e)
+      {
+         // expected: the dispatcher is not connected
+      }
+   }
+
+   /**
+    * A single one-way packet must reach the server handler unchanged.
+    */
+   public void testSendOneWay() throws Exception
+   {
+      TextPacket packet = new TextPacket("testSendOneWay");
+      clientDispatcher.sendOneWay(packet);
+
+      // give the server some time to receive the packet
+      Thread.sleep(300);
+
+      List<AbstractPacket> messages = serverHandler.getPackets();
+      assertEquals(1, messages.size());
+      String response = ((TextPacket) messages.get(0)).getText();
+      assertEquals(packet.getText(), response);
+   }
+
+   /**
+    * Many one-way packets must all reach the server handler, in the order
+    * they were sent.
+    */
+   public void testSendManyOneWay() throws Exception
+   {
+      TextPacket[] packets = new TextPacket[MANY_MESSAGES];
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         packets[i] = new TextPacket("testSendManyOneWay " + i);
+         clientDispatcher.sendOneWay(packets[i]);
+      }
+
+      List<AbstractPacket> receivedPackets = serverHandler.getPackets();
+      // It is unlikely that all messages have been consumed at this point,
+      // but that cannot be asserted without making the test timing-dependent.
+      // The previous assertNotSame() compared two autoboxed Integer
+      // references and therefore could never fail; check the only invariant
+      // that holds for certain instead.
+      assertTrue(receivedPackets.size() <= MANY_MESSAGES);
+
+      Thread.sleep(2 * 1000);
+
+      assertEquals(MANY_MESSAGES, receivedPackets.size());
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
+         assertEquals(packets[i].getText(), receivedPacket.getText());
+      }
+   }
+
+   /**
+    * A one-way packet sent with a callback handler must have its (reversed)
+    * response delivered to that handler once the server has replied.
+    */
+   public void testSendOneWayWithCallbackHandler() throws Exception
+   {
+      // add some lag
+      serverHandler.setSleepTime(300, MILLISECONDS);
+
+      TestClientHandler callbackHandler = new TestClientHandler();
+
+      TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+
+      clientDispatcher.sendOneWay(packet, callbackHandler);
+      // the server is still sleeping, nothing can have been received yet
+      assertEquals(0, callbackHandler.getPackets().size());
+
+      Thread.sleep(600);
+
+      assertEquals(1, callbackHandler.getPackets().size());
+      String response = callbackHandler.getPackets().get(0).getText();
+      assertEquals(reverse(packet.getText()), response);
+   }
+
+   /**
+    * A blocking send must return the server response, i.e. the reversed
+    * request text.
+    */
+   public void testSendBlocking() throws Exception
+   {
+      TextPacket request = new TextPacket("testSendBlocking");
+
+      AbstractPacket receivedPacket = clientDispatcher.sendBlocking(request);
+
+      assertNotNull(receivedPacket);
+      assertTrue(receivedPacket instanceof TextPacket);
+      TextPacket response = (TextPacket) receivedPacket;
+      assertEquals(reverse(request.getText()), response.getText());
+   }
+
+   /**
+    * A blocking send must fail with a {@link TimeoutException} when the
+    * server takes longer to reply than the blocking request timeout.
+    */
+   public void testSendBlockingWithTimeout() throws Exception
+   {
+      clientDispatcher.setBlockingRequestTimeout(5, SECONDS);
+      serverHandler.setSleepTime(7, SECONDS);
+
+      AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+
+      try
+      {
+         clientDispatcher.sendBlocking(packet);
+         fail("a TimeoutException should be thrown");
+      } catch (TimeoutException e)
+      {
+         // expected: the server replies after the timeout
+      }
+   }
+
+   // TestCase implementation ---------------------------------------
+
+   /**
+    * Starts the test server, then connects the client dispatcher to it.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      startServer(PORT, TRANSPORT);
+      startClient(PORT, TRANSPORT);
+   }
+
+   /**
+    * Disconnects the client and unbinds the server.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         clientDispatcher.disconnect();
+      } finally
+      {
+         // always release the port, even if the disconnection failed,
+         // so that the following tests can bind to it
+         serverAcceptor.unbind();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Packet handler which stores every packet it handles; all packets are
+    * expected to be {@link TextPacket}s.
+    */
+   private static final class TestClientHandler extends AbstractPacketHandler
+   {
+      private final List<TextPacket> packets;
+
+      private TestClientHandler()
+      {
+         packets = new ArrayList<TextPacket>();
+      }
+
+      public void handle(AbstractPacket packet)
+      {
+         packets.add((TextPacket) packet);
+      }
+
+      public List<TextPacket> getPackets()
+      {
+         return packets;
+      }
+   }
+}

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ReverseServerHandler extends IoHandlerAdapter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private List<AbstractPacket> packets = new ArrayList<AbstractPacket>();
+
+   private long sleepTime = 0;
+
+   private List<IoSession> sessions = new ArrayList<IoSession>();
+
+   private TransportType transport;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   ReverseServerHandler(TransportType transport)
+   {
+      this.transport = transport;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setSleepTime(long time, TimeUnit unit)
+   {
+      this.sleepTime = unit.toMillis(time);
+   }
+
+   public List<AbstractPacket> getPackets()
+   {
+      return packets;
+   }
+
+   public List<IoSession> getSessions()
+   {
+      return sessions;
+   }
+
+   // IoHandlerAdapter overrides ------------------------------------
+
+   @Override
+   public void messageReceived(IoSession session, Object msg)
+   {
+      packets.add((AbstractPacket) msg);
+
+      // TODO put this logic in the real server handler
+      if (((AbstractPacket) msg).getCorrelationID() == NO_CORRELATION_ID
+            && transport == HTTP)
+      {
+         session.write(new NullPacket());
+      }
+
+      if (msg instanceof TextPacket)
+      {
+         TextPacket incomingPacket = (TextPacket) msg;
+         if (mustReply(incomingPacket))
+         {
+            try
+            {
+               Thread.sleep(sleepTime);
+               TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
+               p.setCorrelationID(incomingPacket.getCorrelationID());
+               if (!NO_CALLBACK_ID.equals(incomingPacket.getCallbackID()))
+               {
+                  p.setTargetID(incomingPacket.getCallbackID());
+               }
+               session.write(p);
+            } catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   @Override
+   public void sessionCreated(IoSession session) throws Exception
+   {
+      sessions.add(session);
+   }
+
+   @Override
+   public void sessionClosed(IoSession session) throws Exception
+   {
+      super.sessionClosed(session);
+      sessions.remove(session);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private boolean mustReply(AbstractPacket incomingPacket)
+   {
+      boolean mustReply = (incomingPacket.getCorrelationID() != NO_CORRELATION_ID)
+            || (!NO_CALLBACK_ID.equals(incomingPacket.getCallbackID()));
+      return mustReply;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.Constants.PORT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.AbstractPacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TargetHandlerTest extends TestSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testClientHandlePacketSentByServer() throws Exception
+   {
+      ClientTargetHandler targetHandler = new ClientTargetHandler();
+
+      clientDispatcher.register(targetHandler);
+
+      // send a packet to create the IoSession on the server
+      clientDispatcher.sendOneWay(new TextPacket(
+            "testClientHandlePacketSentByServer from client"));
+
+      Thread.sleep(300);
+
+      assertEquals(1, serverHandler.getSessions().size());
+      IoSession serverSession = serverHandler.getSessions().get(0);
+      TextPacket packetFromServer = new TextPacket(
+            "testClientHandlePacketSentByServer from server");
+      packetFromServer.setTargetID(targetHandler.getID());
+      serverSession.write(packetFromServer);
+
+      Thread.sleep(300);
+
+      List<TextPacket> packets = targetHandler.getPackets();
+      assertEquals(1, packets.size());
+      TextPacket packetReceivedByClient = (TextPacket) packets.get(0);
+      assertEquals(packetFromServer.getText(), packetReceivedByClient.getText());
+   }
+
+   // TestCase overrides --------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      startServer(PORT, TRANSPORT);
+      startClient(PORT, TRANSPORT);
+   }
+
+   public void tearDown() throws Exception
+   {
+      clientDispatcher.disconnect();
+      serverAcceptor.unbind();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private final class ClientTargetHandler extends AbstractPacketHandler
+   {
+      private final List<TextPacket> packets;
+
+      private ClientTargetHandler()
+      {
+         packets = new ArrayList<TextPacket>();
+      }
+
+      public void handle(AbstractPacket packet)
+      {
+         packets.add((TextPacket) packet);
+      }
+
+      public List<TextPacket> getPackets()
+      {
+         return packets;
+      }
+   }
+}

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.RemoteDispatcherFactory;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+
+/**
+ * Base class of the remoting tests: provides helpers to start a MINA server
+ * backed by a {@link ReverseServerHandler} and to connect a
+ * {@link RemoteDispatcher} client to it.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class TestSupport extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   /** Number of packets sent by the tests which send "many" packets. */
+   public static final int MANY_MESSAGES = 500;
+
+   /**
+    * Configurable by system property <code>transport.type</code>, default is
+    * TCP
+    */
+   public final static TransportType TRANSPORT;
+
+   // Attributes ----------------------------------------------------
+
+   /** Server acceptor, created and bound by <code>startServer()</code>. */
+   NioSocketAcceptor serverAcceptor;
+
+   /** Handler of the server, created by <code>startServer()</code>. */
+   ReverseServerHandler serverHandler;
+
+   /** Client dispatcher, created and connected by <code>startClient()</code>. */
+   RemoteDispatcher clientDispatcher;
+
+   // Static --------------------------------------------------------
+
+   static
+   {
+      // valueOf() matches on the name of the enum constant, so the default
+      // value is taken from name() rather than from toString()
+      String transportType = System.getProperty("transport.type", TCP.name());
+      TRANSPORT = TransportType.valueOf(transportType);
+      info("Default transport is " + TRANSPORT);
+   }
+
+   /**
+    * Returns the given text with its characters in reverse order. Surrogate
+    * pairs are kept intact (see {@link StringBuilder#reverse()}).
+    *
+    * @param text the text to reverse, must not be <code>null</code>
+    * @return the reversed text
+    */
+   static String reverse(String text)
+   {
+      return new StringBuilder(text).reverse().toString();
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Starts the server on the given port without SSL.
+    */
+   void startServer(int port, TransportType transport) throws Exception
+   {
+      startServer(port, transport, false);
+   }
+
+   /**
+    * Creates the server acceptor and its handler, sets up the filter chain
+    * for the given transport and binds the acceptor to the given port.
+    *
+    * @param port the port to listen on
+    * @param transport the transport to use; only TCP installs a codec for
+    *           now
+    * @param useSSL whether SSL support is requested (not implemented yet)
+    */
+   void startServer(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      serverAcceptor = new NioSocketAcceptor();
+      serverHandler = new ReverseServerHandler(transport);
+
+      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+      serverAcceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+      if (useSSL)
+      {
+         addSSLSupport(serverAcceptor.getFilterChain());
+      }
+
+      info("using " + transport);
+      if (transport == TCP)
+      {
+         serverAcceptor.getFilterChain().addLast("codec",
+               new ProtocolCodecFilter(new PacketCodecFactory()));
+      } else
+      {
+         assert transport == HTTP;
+
+         // TODO add http support
+         // serverAcceptor.getFilterChain().addLast("http_codec",
+         // new ProtocolCodecFilter(new HttpServerCodecFactory()));
+         // serverAcceptor.getFilterChain().addLast("http_filter",
+         // new HTTPFilter(true));
+      }
+      serverAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+      // Bind
+      serverAcceptor.setLocalAddress(new InetSocketAddress(port));
+      info("listening on " + port);
+      serverAcceptor.setHandler(serverHandler);
+      serverAcceptor.bind();
+   }
+
+   /**
+    * Placeholder: SSL support is not implemented yet, the chain is left
+    * untouched.
+    */
+   private void addSSLSupport(DefaultIoFilterChainBuilder chain)
+         throws Exception
+   {
+      // TODO add SSL support
+      // SslFilter sslFilter = new
+      // SslFilter(BogusSslContextFactory.getInstance(true));
+      // chain.addLast("sslFilter", sslFilter);
+      // info("SSL enabled");
+   }
+
+   /**
+    * Creates the client dispatcher and connects it to the given port without
+    * SSL.
+    */
+   void startClient(int port, TransportType transport) throws Exception
+   {
+      startClient(port, transport, false);
+   }
+
+   /**
+    * Creates the client dispatcher and connects it to the given port.
+    *
+    * @param port the port to connect to
+    * @param transport the transport to use
+    * @param useSSL passed as is to the dispatcher
+    */
+   void startClient(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      clientDispatcher = RemoteDispatcherFactory.newDispatcher();
+      clientDispatcher.connect(port, transport, useSSL);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Prints the given information on the standard output, framed by
+    * <code>###</code>.
+    */
+   private static void info(String info)
+   {
+      System.out.format("### %-50s ###\n", info);
+   }
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.wireformat;
+
+import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.filter.codec.ProtocolCodecSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Checks that each type of packet is unchanged by a round trip through the
+ * encoder and the decoder provided by {@link PacketCodecFactory}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketTypeTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * The type, callback ID, correlation ID and target ID of a null packet
+    * must be preserved.
+    */
+   public void testNullPacket() throws Exception
+   {
+      NullPacket original = new NullPacket();
+      original.setCallbackID(randomUUID().toString());
+      original.setCorrelationID(System.currentTimeMillis());
+      original.setTargetID(randomUUID().toString());
+
+      AbstractPacket roundTripped = encodeAndDecode(original);
+      assertTrue(roundTripped instanceof NullPacket);
+
+      NullPacket decoded = (NullPacket) roundTripped;
+      assertEquals(NULL, decoded.getType());
+      assertEquals(original.getCallbackID(), decoded.getCallbackID());
+      assertEquals(original.getCorrelationID(), decoded.getCorrelationID());
+      assertEquals(original.getTargetID(), decoded.getTargetID());
+   }
+
+   /**
+    * The type and the text of a text packet must be preserved.
+    */
+   public void testTextPacket() throws Exception
+   {
+      TextPacket original = new TextPacket("testTextPacket");
+
+      AbstractPacket roundTripped = encodeAndDecode(original);
+      assertTrue(roundTripped instanceof TextPacket);
+
+      TextPacket decoded = (TextPacket) roundTripped;
+      assertEquals(TEXT, decoded.getType());
+      assertEquals(original.getText(), decoded.getText());
+   }
+
+   /**
+    * The class and the type of a create connection request must be preserved.
+    */
+   public void testConnectionFactoryCreateConnectionRequest() throws Exception
+   {
+      AbstractPacket roundTripped = encodeAndDecode(
+            new ConnectionFactoryCreateConnectionRequest());
+
+      assertTrue(roundTripped instanceof ConnectionFactoryCreateConnectionRequest);
+      assertEquals(REQ_CONNECTIONFACTORY_CREATECONNECTION,
+            roundTripped.getType());
+   }
+
+   /**
+    * The class and the type of a create connection response must be
+    * preserved.
+    */
+   public void testConnectionFactoryCreateConnectionResponse() throws Exception
+   {
+      String id = randomUUID().toString();
+      AbstractPacket roundTripped = encodeAndDecode(
+            new ConnectionFactoryCreateConnectionResponse(id));
+
+      assertTrue(roundTripped instanceof ConnectionFactoryCreateConnectionResponse);
+      assertEquals(RESP_CONNECTIONFACTORY_CREATECONNECTION,
+            roundTripped.getType());
+   }
+
+   // TestCase overrides ---------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Encodes the packet to a buffer, then decodes that buffer.
+    *
+    * @param packet the packet to encode
+    * @return the packet decoded from the encoded form of <code>packet</code>
+    */
+   private AbstractPacket encodeAndDecode(AbstractPacket packet)
+         throws Exception
+   {
+      return decode(encode(packet));
+   }
+
+   /**
+    * Encodes the packet with the encoder of a new {@link PacketCodecFactory}
+    * and returns the first buffer of the encoder output queue.
+    */
+   private IoBuffer encode(AbstractPacket packet) throws Exception
+   {
+      ProtocolCodecSession codecSession = new ProtocolCodecSession();
+      ProtocolEncoder packetEncoder = new PacketCodecFactory().getEncoder();
+
+      packetEncoder.encode(codecSession, packet, codecSession.getEncoderOutput());
+
+      return codecSession.getEncoderOutputQueue().poll();
+   }
+
+   /**
+    * Decodes the buffer with the decoder of a new {@link PacketCodecFactory}
+    * and returns the first object of the decoder output queue, which must be
+    * an {@link AbstractPacket}.
+    */
+   private AbstractPacket decode(IoBuffer buffer) throws Exception
+   {
+      ProtocolCodecSession codecSession = new ProtocolCodecSession();
+      ProtocolDecoder packetDecoder = new PacketCodecFactory().getDecoder();
+
+      packetDecoder.decode(codecSession, buffer, codecSession.getDecoderOutput());
+
+      Object decoded = codecSession.getDecoderOutputQueue().poll();
+      assertTrue(decoded instanceof AbstractPacket);
+
+      return (AbstractPacket) decoded;
+   }
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list