[jboss-cvs] JBoss Messaging SVN: r3327 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/messaging/core and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 14 05:20:05 EST 2007
Author: jmesnil
Date: 2007-11-14 05:20:05 -0500 (Wed, 14 Nov 2007)
New Revision: 3327
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Modified:
branches/Branch_JBMESSAGING-544/.classpath
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
* added new remote API & implementation in o.j.m.core.remoting based on MINA
* new API is not used by JBoss Messaging
* for now, depends on an Eclipse project mina-core (from MINA trunk)
Modified: branches/Branch_JBMESSAGING-544/.classpath
===================================================================
--- branches/Branch_JBMESSAGING-544/.classpath 2007-11-13 20:43:24 UTC (rev 3326)
+++ branches/Branch_JBMESSAGING-544/.classpath 2007-11-14 10:20:05 UTC (rev 3327)
@@ -3,14 +3,11 @@
<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
<classpathentry kind="src" path="docs/examples/bridge/src"/>
<classpathentry kind="src" path="docs/examples/stateless-clustered/src"/>
- <classpathentry kind="src" path="docs/examples/web-service/src-client"/>
- <classpathentry kind="src" path="docs/examples/web-service/src"/>
<classpathentry kind="src" path="docs/examples/mdb-failure/src"/>
<classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
<classpathentry kind="src" path="output/gen-parsers"/>
<classpathentry kind="src" path="docs/examples/common/src"/>
<classpathentry kind="src" path="docs/examples/distributed-topic/src"/>
- <classpathentry kind="src" path="docs/examples/ejb3mdb/src"/>
<classpathentry kind="src" path="docs/examples/http/src"/>
<classpathentry kind="src" path="docs/examples/mdb/src"/>
<classpathentry kind="src" path="docs/examples/queue/src"/>
@@ -45,7 +42,6 @@
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
- <classpathentry kind="src" path=".apt_generated"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>
@@ -61,5 +57,6 @@
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-local-jdbc.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/mina-core"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import static java.util.UUID.randomUUID;
+
+import java.util.UUID;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * Base class for {@link PacketHandler} implementations whose ID is a random
+ * UUID string generated when the instance is created.
+ *
+ * @see UUID#randomUUID()
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class AbstractPacketHandler implements PacketHandler
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Identifier of this handler; assigned once at creation and never changed. */
+   private final String id = randomUUID().toString();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public AbstractPacketHandler()
+   {
+      // nothing to do: the ID is produced by the field initializer
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return a description of the form {@code AbstractPacketHandler[id=<id>]}
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder("AbstractPacketHandler[id=");
+      description.append(this.id).append("]");
+      return description.toString();
+   }
+
+   // PacketHandler implementation ----------------------------------
+
+   /**
+    * @return the UUID string generated for this handler
+    */
+   public String getID()
+   {
+      return this.id;
+   }
+
+   /**
+    * Handles the given packet; the behavior is defined by each subclass.
+    *
+    * @param packet the packet to handle
+    */
+   public abstract void handle(AbstractPacket packet);
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Assert.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,31 @@
+package org.jboss.messaging.core.remoting;
+
+/**
+ * Static assertion helpers for the remoting classes.
+ *
+ * The checks are written with the Java {@code assert} statement, so they are
+ * only evaluated when assertions are enabled (for example with the
+ * {@code -ea} JVM option).
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class Assert
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Asserts that an ID is neither {@code null} nor empty.
+    *
+    * @param id the ID to check
+    * @throws AssertionError if assertions are enabled and the ID is
+    *         {@code null} or empty
+    */
+   public static void assertValidID(String id)
+   {
+      assert id != null : "ID must not be null";
+      assert id.length() != 0 : "ID must not be empty";
+   }
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Not instantiable: this class only exposes static helpers.
+    */
+   private Assert()
+   {
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Constants.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class Constants
+{
+ // Constants -----------------------------------------------------
+
+ public static final int CONNECTION_TIMEOUT = 20 * 1000;
+
+ public static final byte NULL_BYTE = (byte) 0;
+
+ public static final CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8")
+ .newEncoder();
+
+ public static final CharsetDecoder UTF_8_DECODER = Charset.forName("UTF-8")
+ .newDecoder();
+
+ public static final long NO_CORRELATION_ID = -1L;
+
+ public static final String NO_TARGET_ID = "NO_TARGET_ID_SET";
+
+ public static final String NO_CALLBACK_ID = "NO_CALLBACK_ID_SET";
+
+ public static final int INT_LENGTH = 4;
+
+ public static final int LONG_LENGTH = 8;
+
+ public static final byte TRUE = (byte) 0;
+
+ public static final byte FALSE = (byte) 1;
+
+ public static final int PORT = 8080;
+
+ public static final String PACKET_TYPE_HEADER = "PACKET_TYPE";
+
+ public static final String TARGET_ID_HEADER = "TARGET_ID";
+
+ public static final String CALLBACK_ID_HEADER = "CALLBACK_ID";
+
+ public static final String CORRELATION_ID_HEADER = "CORRELATION_ID";
+
+ public static final String PACKET_HTTP_PARAMETER = "packet";
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * A PacketHandler handles packets (as defined by {@link AbstractPacket} and its
+ * subclasses).
+ *
+ * It must have an ID unique among all PacketHandlers (or at least among those
+ * registered into the same RemoteDispatcher).
+ *
+ * @see RemoteDispatcher#register(PacketHandler)
+ * @see RemoteDispatcher#unregister(String)
+ * @see AbstractPacketHandler
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public interface PacketHandler
+{
+ /*
+ * Design note: the advantage of using a String as ID is that we can leverage
+ * the Java 5 UUID class to generate these IDs. However, these IDs are 128
+ * bits long, which increases the size of a packet (compared to an integer
+ * or a long).
+ *
+ * By switching to Long, we could reduce the size of the packet and maybe
+ * increase the performance (to be checked with performance tests).
+ */
+
+ /**
+ * Returns the ID of this handler, which must be unique among the handlers
+ * registered into the same RemoteDispatcher.
+ *
+ * @return the ID of this handler
+ */
+ String getID();
+
+ /**
+ * Handles the given packet.
+ *
+ * @param packet the packet to handle
+ */
+ void handle(AbstractPacket packet);
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcher.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * Entry point of the remoting API: a dispatcher can be connected and
+ * disconnected, sends packets (one-way or blocking) and keeps a registry of
+ * {@link PacketHandler}s.
+ *
+ * NOTE(review): the exact semantics of each operation are defined by the
+ * implementation, which is not visible from this interface.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public interface RemoteDispatcher
+{
+   // Connection ----------------------------------------------------
+
+   /**
+    * Connects this dispatcher using the given port and transport.
+    *
+    * @param port the port to connect to
+    * @param transport the transport to use
+    * @throws Exception if the connection can not be established
+    */
+   void connect(int port, TransportType transport) throws Exception;
+
+   /**
+    * Connects this dispatcher using the given port and transport, optionally
+    * over SSL.
+    *
+    * @param port the port to connect to
+    * @param transport the transport to use
+    * @param useSSL whether SSL must be used
+    * @throws Exception if the connection can not be established
+    */
+   void connect(int port, TransportType transport, boolean useSSL)
+         throws Exception;
+
+   /**
+    * Disconnects this dispatcher.
+    *
+    * @return NOTE(review): presumably whether the dispatcher was actually
+    *         disconnected -- confirm against the implementation
+    * @throws Exception if the disconnection fails
+    */
+   boolean disconnect() throws Exception;
+
+   // Handlers registration -----------------------------------------
+
+   /**
+    * Registers a handler.
+    *
+    * @param handler the handler to register
+    */
+   void register(PacketHandler handler);
+
+   /**
+    * Unregisters the handler with the given ID.
+    *
+    * @param handlerID the ID of the handler to unregister
+    */
+   void unregister(String handlerID);
+
+   // Sending -------------------------------------------------------
+
+   /**
+    * Sends a packet without returning anything to the caller.
+    *
+    * @param packet the packet to send
+    */
+   void sendOneWay(AbstractPacket packet);
+
+   /**
+    * Sends a packet without returning anything to the caller, passing along
+    * a callback handler.
+    *
+    * @param packet the packet to send
+    * @param callbackHandler NOTE(review): presumably the handler that will
+    *        receive the packets sent back for this request -- confirm
+    */
+   void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler);
+
+   /**
+    * Sends a packet and returns a packet to the caller.
+    *
+    * @param packet the packet to send
+    * @return NOTE(review): presumably the response to the packet -- confirm
+    * @throws Exception if the exchange fails
+    */
+   AbstractPacket sendBlocking(AbstractPacket packet) throws Exception;
+
+   /**
+    * Sets the timeout applied to blocking requests.
+    *
+    * @param timeout the timeout value
+    * @param unit the unit of the timeout value
+    */
+   void setBlockingRequestTimeout(int timeout, TimeUnit unit);
+}
\ No newline at end of file
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/RemoteDispatcherFactory.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.internal.RemoteDispatcherImpl;
+
+
+/**
+ * A factory to create new {@link RemoteDispatcher}s.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Not instantiable: dispatchers are obtained from the static
+    * {@link #newDispatcher()} method.
+    */
+   private RemoteDispatcherFactory()
+   {
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a new dispatcher; each call returns a new instance.
+    *
+    * @return a new {@link RemoteDispatcher}
+    */
+   public static RemoteDispatcher newDispatcher()
+   {
+      return new RemoteDispatcherImpl();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/TransportType.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,17 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * The transport types supported by JBoss Messaging.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public enum TransportType
+{
+   /** TCP transport. */
+   TCP,
+
+   /** HTTP transport. */
+   HTTP
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,180 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.INT_LENGTH;
+import static org.jboss.messaging.core.remoting.Constants.LONG_LENGTH;
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoder;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+
+/**
+ * Base class of the packet codecs: it encodes and decodes the part common to
+ * all packets and delegates the body to its subclasses.
+ *
+ * Layout written by {@link #encode}:
+ * <ul>
+ * <li>1 byte: packet type</li>
+ * <li>4 bytes: header length (number of bytes of the three next fields)</li>
+ * <li>8 bytes: correlation ID</li>
+ * <li>target ID: UTF-8 bytes followed by a NULL byte</li>
+ * <li>callback ID: UTF-8 bytes followed by a NULL byte</li>
+ * <li>body, written by {@link #encodeBody}; {@link #decodable} expects it to
+ * start with a 4-byte body length</li>
+ * </ul>
+ *
+ * The shared encoder and decoder constants are only used to obtain the
+ * charset: a CharsetEncoder or CharsetDecoder is stateful and not safe for
+ * concurrent use, so a fresh instance is created for each string.
+ *
+ * @param <P> the type of packet handled by the codec
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class AbstractPacketCodec<P extends AbstractPacket> implements
+      MessageEncoder<P>, MessageDecoder
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Type of the packets handled by this codec. */
+   private final PacketType type;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param type the type of the packets handled by this codec (must not be
+    *        {@code null})
+    */
+   protected AbstractPacketCodec(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   // MessageEncoder implementation ---------------------------------
+
+   /**
+    * Writes the packet type, the header and the body of the packet into a new
+    * buffer and hands the buffer to {@code out}.
+    *
+    * @param session the session the packet is written to
+    * @param packet the packet to encode
+    * @param out the output receiving the encoded buffer
+    * @throws Exception if the packet can not be encoded
+    */
+   public void encode(IoSession session, P packet, ProtocolEncoderOutput out)
+         throws Exception
+   {
+      long correlationID = packet.getCorrelationID();
+      String targetID = packet.getTargetID();
+      String callbackID = packet.getCallbackID();
+
+      // the length must be computed with the charset used to write the IDs
+      // (and not with the platform default charset)
+      String charsetName = UTF_8_ENCODER.charset().name();
+      int headerLength = LONG_LENGTH + targetID.getBytes(charsetName).length
+            + callbackID.getBytes(charsetName).length + 2 /* NULL bytes */;
+
+      IoBuffer buf = IoBuffer.allocate(256);
+
+      // Enable auto-expand for easier encoding
+      buf.setAutoExpand(true);
+
+      buf.put(packet.getType().byteValue());
+      buf.putInt(headerLength);
+      buf.putLong(correlationID);
+      buf.putString(targetID, UTF_8_ENCODER.charset().newEncoder());
+      buf.put(NULL_BYTE);
+      buf.putString(callbackID, UTF_8_ENCODER.charset().newEncoder());
+      buf.put(NULL_BYTE);
+
+      encodeBody(session, packet, buf);
+
+      buf.flip();
+      out.write(buf);
+   }
+
+   // MessageDecoder implementation ---------------------------------
+
+   /**
+    * Checks whether the buffer starts with a complete packet of the type
+    * handled by this codec.
+    *
+    * @param session the session the data was read from
+    * @param in the buffer to inspect
+    * @return {@code OK} if a complete packet of this type is available,
+    *         {@code NEED_DATA} if more data is required to decide,
+    *         {@code NOT_OK} if the data is not a valid packet of this type
+    */
+   public MessageDecoderResult decodable(IoSession session, IoBuffer in)
+   {
+      if (!in.hasRemaining())
+      {
+         // can not read the packet type
+         return NEED_DATA;
+      }
+      if (in.get() != type.byteValue())
+      {
+         return NOT_OK;
+      }
+      if (in.remaining() < INT_LENGTH)
+      {
+         // can not read the header length
+         return NEED_DATA;
+      }
+      int headerLength = in.getInt();
+      if (headerLength < LONG_LENGTH)
+      {
+         // negative or too short to contain the correlation ID
+         return NOT_OK;
+      }
+      if (in.remaining() < headerLength)
+      {
+         return NEED_DATA;
+      }
+      in.getLong(); // correlation ID
+      try
+      {
+         in.getString(UTF_8_DECODER.charset().newDecoder()); // target ID
+         in.getString(UTF_8_DECODER.charset().newDecoder()); // callback ID
+      }
+      catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
+
+      if (in.remaining() < INT_LENGTH)
+      {
+         // can not read the body length
+         return NEED_DATA;
+      }
+      int bodyLength = in.getInt();
+      if (bodyLength < 0)
+      {
+         return NOT_OK;
+      }
+      if (in.remaining() < bodyLength)
+      {
+         return NEED_DATA;
+      }
+      return OK;
+   }
+
+   /**
+    * Reads the header, delegates the decoding of the body to
+    * {@link #decodeBody} and writes the resulting packet to {@code out}.
+    *
+    * @param session the session the data was read from
+    * @param in the buffer to decode
+    * @param out the output receiving the decoded packet
+    * @return {@code NEED_DATA} if {@link #decodeBody} returned {@code null},
+    *         {@code OK} otherwise
+    * @throws Exception if the packet can not be decoded
+    */
+   public MessageDecoderResult decode(IoSession session, IoBuffer in,
+         ProtocolDecoderOutput out) throws Exception
+   {
+      in.get(); // skip message type
+      in.getInt(); // skip header length
+      long correlationID = in.getLong();
+      String targetID = in.getString(UTF_8_DECODER.charset().newDecoder());
+      String callbackID = in.getString(UTF_8_DECODER.charset().newDecoder());
+
+      P packet = decodeBody(session, in);
+
+      if (packet == null)
+      {
+         return NEED_DATA;
+      }
+      packet.setTargetID(targetID);
+      packet.setCorrelationID(correlationID);
+      packet.setCallbackID(callbackID);
+      out.write(packet);
+
+      return OK;
+   }
+
+   /**
+    * Does nothing: this codec keeps no state between packets.
+    */
+   public void finishDecode(IoSession session, ProtocolDecoderOutput out)
+         throws Exception
+   {
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Writes the body of the packet after the header already present in the
+    * buffer.
+    *
+    * @param session the session the packet is written to
+    * @param packet the packet to encode
+    * @param buf the buffer to write the body into
+    * @throws Exception if the body can not be encoded
+    */
+   protected abstract void encodeBody(IoSession session, P packet, IoBuffer buf)
+         throws Exception;
+
+   /**
+    * Reads the body of a packet; the buffer is positioned just after the
+    * header.
+    *
+    * @param session the session the data was read from
+    * @param in the buffer to read the body from
+    * @return the decoded packet, or {@code null} if more data is needed
+    * @throws Exception if the body can not be decoded
+    */
+   protected abstract P decodeBody(IoSession session, IoBuffer in)
+         throws Exception;
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionRequestCodec extends
+ AbstractPacketCodec<ConnectionFactoryCreateConnectionRequest>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public ConnectionFactoryCreateConnectionRequestCodec()
+ {
+ super(REQ_CONNECTIONFACTORY_CREATECONNECTION);
+ }
+
+ // AbstractPackedCodec overrides----------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session,
+ ConnectionFactoryCreateConnectionRequest request, IoBuffer out)
+ throws Exception
+ {
+ // no body
+ out.putInt(0);
+ }
+
+ @Override
+ protected ConnectionFactoryCreateConnectionRequest decodeBody(
+ IoSession session, IoBuffer in) throws Exception
+ {
+ in.getInt(); // skip body length
+ return new ConnectionFactoryCreateConnectionRequest();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionResponseCodec.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+
+/**
+ * Codec of {@link ConnectionFactoryCreateConnectionResponse} packets, whose
+ * body is the ID carried by the response, written as UTF-8 bytes followed by
+ * a NULL byte.
+ *
+ * The shared encoder and decoder constants are only used to obtain the
+ * charset: a CharsetEncoder or CharsetDecoder is stateful and not safe for
+ * concurrent use, so a fresh instance is created for each string.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionResponseCodec extends
+      AbstractPacketCodec<ConnectionFactoryCreateConnectionResponse>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public ConnectionFactoryCreateConnectionResponseCodec()
+   {
+      super(RESP_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the body length followed by the ID of the response.
+    *
+    * @param session the session the packet is written to
+    * @param response the response to encode
+    * @param out the buffer to write the body into
+    * @throws Exception if the ID can not be encoded
+    */
+   @Override
+   protected void encodeBody(IoSession session,
+         ConnectionFactoryCreateConnectionResponse response, IoBuffer out)
+         throws Exception
+   {
+      String id = response.getID();
+
+      // the length must be computed with the charset used to write the ID
+      // (and not with the platform default charset)
+      int bodyLength = id.getBytes(UTF_8_ENCODER.charset().name()).length
+            + 1 /* NULL byte */;
+
+      out.putInt(bodyLength);
+      out.putString(id, UTF_8_ENCODER.charset().newEncoder());
+      out.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads the body length and the ID of the response.
+    *
+    * @param session the session the data was read from
+    * @param in the buffer to read the body from
+    * @return the decoded response, or {@code null} if the buffer does not
+    *         contain the whole body yet
+    * @throws Exception if the ID can not be decoded
+    */
+   @Override
+   protected ConnectionFactoryCreateConnectionResponse decodeBody(
+         IoSession session, IoBuffer in) throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      String id = in.getString(UTF_8_DECODER.charset().newDecoder());
+
+      return new ConnectionFactoryCreateConnectionResponse(id);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/NullPacketCodec.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+
+/**
+ * Codec of {@link NullPacket} packets, whose body is empty.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class NullPacketCodec extends AbstractPacketCodec<NullPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public NullPacketCodec()
+   {
+      super(NULL);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes a body length of 0: the packet carries no body.
+    */
+   @Override
+   protected void encodeBody(IoSession session, NullPacket packet,
+         IoBuffer out) throws Exception
+   {
+      final int noBody = 0;
+      out.putInt(noBody);
+   }
+
+   /**
+    * Consumes the body length and returns a new packet.
+    */
+   @Override
+   protected NullPacket decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      // the value is ignored, the 4 bytes only have to be consumed
+      in.getInt();
+      NullPacket packet = new NullPacket();
+      return packet;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Codec factory that registers the codecs of the supported packets with the
+ * MINA {@link DemuxingProtocolCodecFactory} it extends.
+ *
+ * The constructor registers each codec class twice: once as a message decoder
+ * and once as the message encoder for its packet class. Codecs are registered
+ * for NullPacket, TextPacket (testing only), ConnectionFactoryCreateConnectionRequest
+ * and ConnectionFactoryCreateConnectionResponse. The registrations for
+ * ClientDelivery and the ConnectionCreateSession request/response are
+ * currently commented out.
+ *
+ * NOTE(review): decoder registration order may be significant to the
+ * demuxing factory -- confirm before reordering the calls below.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketCodecFactory extends DemuxingProtocolCodecFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // FIXME: split encoder/decoder required only on client and/or server sides
+ public PacketCodecFactory()
+ {
+ super.addMessageDecoder(NullPacketCodec.class);
+ super.addMessageEncoder(NullPacket.class, NullPacketCodec.class);
+
+ // TextPacket are for testing purpose only!
+ super.addMessageDecoder(TextPacketCodec.class);
+ super.addMessageEncoder(TextPacket.class, TextPacketCodec.class);
+
+ super
+ .addMessageDecoder(ConnectionFactoryCreateConnectionRequestCodec.class);
+ super.addMessageEncoder(ConnectionFactoryCreateConnectionRequest.class,
+ ConnectionFactoryCreateConnectionRequestCodec.class);
+
+ super
+ .addMessageDecoder(ConnectionFactoryCreateConnectionResponseCodec.class);
+ super.addMessageEncoder(ConnectionFactoryCreateConnectionResponse.class,
+ ConnectionFactoryCreateConnectionResponseCodec.class);
+
+ // super.addMessageDecoder(ClientDeliveryCodec.class);
+ // super.addMessageEncoder(ClientDelivery.class,
+ // ClientDeliveryCodec.class);
+ //
+ // super.addMessageDecoder(ConnectionCreateSessionRequestCodec.class);
+ // super.addMessageEncoder(ConnectionCreateSessionRequest.class,
+ // ConnectionCreateSessionRequestCodec.class);
+ //
+ // super.addMessageDecoder(ConnectionCreateSessionResponseCodec.class);
+ // super.addMessageEncoder(ConnectionCreateSessionResponse.class,
+ // ConnectionCreateSessionResponseCodec.class);
+
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/TextPacketCodec.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.Constants.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.core.remoting.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Codec for {@link TextPacket}.
+ *
+ * The body is written as an int giving the body length in bytes, followed by
+ * the text encoded with UTF_8_ENCODER and a single NULL_BYTE terminator. The
+ * terminator is counted in the length.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TextPacketCodec extends AbstractPacketCodec<TextPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates the codec, passing the TEXT packet type to the base codec.
+    */
+   public TextPacketCodec()
+   {
+      super(TEXT);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the length-prefixed, NULL-terminated text of the packet.
+    *
+    * @param session the MINA session (unused here)
+    * @param packet the packet whose text is written
+    * @param out the buffer receiving the encoded body
+    * @throws Exception if the text can not be encoded
+    */
+   @Override
+   protected void encodeBody(IoSession session, TextPacket packet, IoBuffer out)
+         throws Exception
+   {
+      String text = packet.getText();
+
+      // The length prefix must count the bytes actually written below.
+      // The no-argument getBytes() uses the platform default charset, which
+      // yields a wrong length for non-ASCII text whenever that default is
+      // not UTF-8. UTF_8_ENCODER is assumed (from its name) to encode UTF-8.
+      int bodyLength = text.getBytes("UTF-8").length + 1; // + 1 for NULL byte
+
+      out.putInt(bodyLength);
+      out.putString(text, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads the body length, then the text decoded with UTF_8_DECODER.
+    *
+    * @param session the MINA session (unused here)
+    * @param in the buffer positioned at the start of the body
+    * @return the decoded packet, or null when the announced body length is
+    *         larger than the number of bytes remaining in the buffer
+    * @throws Exception if the text can not be decoded
+    */
+   @Override
+   protected TextPacket decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         // NOTE(review): the length int has already been consumed here;
+         // presumably the base codec checks availability beforehand -- confirm.
+         return null;
+      }
+      String text = in.getString(UTF_8_DECODER);
+
+      return new TextPacket(text);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Response;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * Client-side MINA handler which routes received packets to the
+ * {@link PacketHandler} stored in the session under the packet's target ID.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ClientHandler extends IoHandlerAdapter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // IoHandlerAdapter overrides ------------------------------------
+
+   /**
+    * Dispatches packets to their handler, ignores reqres responses and
+    * reports any other message on System.err.
+    */
+   @Override
+   public void messageReceived(IoSession session, Object message)
+         throws Exception
+   {
+      if (message instanceof AbstractPacket)
+      {
+         dispatch(session, (AbstractPacket) message);
+         return;
+      }
+
+      if (message instanceof Response)
+      {
+         // response is handled by the reqres filter: nothing to do here
+         return;
+      }
+
+      System.err.println("unhandled message: " + message);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Hands the packet to the handler registered as a session attribute under
+    * the packet's target ID, or reports it on System.err when there is none.
+    */
+   private void dispatch(IoSession session, AbstractPacket packet)
+   {
+      String targetID = packet.getTargetID();
+      PacketHandler target = (PacketHandler) session.getAttribute(targetID);
+      if (target == null)
+      {
+         System.err
+               .println("ClientHandler.messageReceived() unhandled packet: "
+                     + packet);
+         return;
+      }
+      target.handle(packet);
+   }
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+
+import org.apache.mina.filter.reqres.ResponseInspector;
+import org.apache.mina.filter.reqres.ResponseType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * {@link ResponseInspector} for the reqres filter which uses the correlation
+ * ID of an {@link AbstractPacket} as its request ID.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketInspector implements ResponseInspector
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // ResponseInspector implementation ------------------------------
+
+   /**
+    * @return the packet's correlation ID (boxed as a Long), or null when the
+    *         message is not an AbstractPacket or its correlation ID is
+    *         NO_CORRELATION_ID
+    */
+   public Object getRequestId(Object message)
+   {
+      if (message instanceof AbstractPacket)
+      {
+         long id = ((AbstractPacket) message).getCorrelationID();
+         if (id == NO_CORRELATION_ID)
+         {
+            return null;
+         }
+         return Long.valueOf(id);
+      }
+      return null;
+   }
+
+   /**
+    * @return WHOLE for any AbstractPacket, null for every other message
+    */
+   public ResponseType getResponseType(Object message)
+   {
+      return (message instanceof AbstractPacket) ? WHOLE : null;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/RemoteDispatcherImpl.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,243 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.Constants.CONNECTION_TIMEOUT;
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.filter.reqres.Request;
+import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.apache.mina.filter.reqres.RequestTimeoutException;
+import org.apache.mina.filter.reqres.Response;
+import org.apache.mina.filter.ssl.SslFilter;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherImpl implements RemoteDispatcher
+{
+
+ // Attributes ----------------------------------------------------
+
+ private IoSession session;
+ private SslFilter sslFilter;
+
+ // By default, a blocking request will timeout after 5 seconds
+ private int blockingRequestTimeout = 5;
+ private TimeUnit blockingRequestTimeUnit = SECONDS;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public RemoteDispatcherImpl()
+ {
+ }
+
+ // RemoteDispatcher implementation -------------------------------
+
+ public void connect(int port, TransportType transport) throws Exception
+ {
+ connect(port, transport, false);
+ }
+
+ public void connect(int port, TransportType transport, boolean useSSL)
+ throws Exception
+ {
+ assert port > 0;
+ assert transport != null;
+
+ NioSocketConnector connector = new NioSocketConnector();
+
+ connector.setConnectTimeout(CONNECTION_TIMEOUT);
+
+ if (useSSL)
+ addSSLSupport(connector.getFilterChain());
+
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+ if (transport == TCP)
+ {
+ connector.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
+ } else
+ {
+ assert transport == HTTP;
+
+ // TODO support HTTP
+ // URL url = new URL("http://localhost:" + port + "/");
+ // connector.getFilterChain().addLast("http_codec",
+ // new ProtocolCodecFilter(new HttpProtocolCodecFactory(url)));
+ // connector.getFilterChain().addLast("http_logger", new
+ // LoggingFilter());
+ // connector.getFilterChain().addLast("http_filter",
+ // new HTTPFilter(false));
+ }
+
+ addBlockingRequestResponseFilter(connector.getFilterChain());
+
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+
+ connector.setHandler(new ClientHandler());
+ InetSocketAddress address = new InetSocketAddress(port);
+ ConnectFuture future = connector.connect(address);
+ future.awaitUninterruptibly();
+ if (!future.isConnected())
+ {
+ throw new IOException("Cannot connect to " + address.toString());
+ }
+ this.session = future.getSession();
+ }
+
+ public boolean disconnect() throws Exception
+ {
+ assert session != null;
+
+ if (sslFilter != null)
+ {
+ sslFilter.stopSsl(session).awaitUninterruptibly();
+ // FIXME: w/o the delay, an exception is thrown:
+ // "Inbound closed before receiving peer's close_notify"
+ Thread.sleep(500);
+ }
+ CloseFuture closeFuture = session.close().awaitUninterruptibly();
+ return closeFuture.isClosed();
+ }
+
+ public void register(PacketHandler handler)
+ {
+ assert session != null;
+ assertValidID(handler.getID());
+ assert handler != null;
+
+ session.setAttribute(handler.getID(), handler);
+ }
+
+ public void unregister(String handlerID)
+ {
+ assert session != null;
+ assertValidID(handlerID);
+
+ session.removeAttribute(handlerID);
+ }
+
+ public void sendOneWay(AbstractPacket packet)
+ {
+ assert packet != null;
+ checkConnected();
+
+ session.write(packet);
+ }
+
+ public void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler)
+ {
+ assert packet != null;
+ assert callbackHandler != null;
+ checkConnected();
+
+ register(callbackHandler);
+ packet.setCallbackID(callbackHandler.getID());
+ session.write(packet);
+ }
+
+ public AbstractPacket sendBlocking(AbstractPacket packet)
+ throws TimeoutException
+ {
+ assert packet != null;
+ checkConnected();
+
+ packet.setCorrelationID(System.nanoTime());
+
+ // TODO request timeout must be configurable
+ Request req = new Request(packet.getCorrelationID(), packet,
+ blockingRequestTimeout, blockingRequestTimeUnit);
+ session.write(req);
+ Response response;
+ try
+ {
+ response = req.awaitResponse();
+ return (AbstractPacket) response.getMessage();
+ } catch (RequestTimeoutException e)
+ {
+ TimeoutException toe = new TimeoutException();
+ toe.initCause(e);
+ throw toe;
+ } catch (InterruptedException e)
+ {
+ TimeoutException toe = new TimeoutException();
+ toe.initCause(e);
+ throw toe;
+ }
+ }
+
+ public void setBlockingRequestTimeout(int timeout, TimeUnit unit)
+ {
+ this.blockingRequestTimeout = timeout;
+ this.blockingRequestTimeUnit = unit;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void addBlockingRequestResponseFilter(
+ DefaultIoFilterChainBuilder chain)
+ {
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ RequestResponseFilter filter = new RequestResponseFilter(
+ new PacketInspector(), scheduler);
+ chain.addLast("reqres", filter);
+ }
+
+ private void addSSLSupport(DefaultIoFilterChainBuilder chain)
+ throws Exception
+ {
+ // TODO support SSL
+ // this.sslFilter = new
+ // SslFilter(BogusSslContextFactory.getInstance(false));
+ // sslFilter.setUseClientMode(true);
+ // chain.addLast("sslFilter", sslFilter);
+ }
+
+ private void checkConnected()
+ {
+ if (session == null)
+ {
+ throw new IllegalStateException("RemoteDispatcher is not connected.");
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,94 @@
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_TARGET_ID;
+
+/**
+ * Base class of the packets: holds the packet type together with the
+ * correlation, target and callback identifiers.
+ *
+ * The identifiers start as NO_CORRELATION_ID, NO_TARGET_ID and
+ * NO_CALLBACK_ID until they are set.
+ */
+public class AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long correlationID = NO_CORRELATION_ID;
+
+   private String targetID = NO_TARGET_ID;
+
+   private String callbackID = NO_CALLBACK_ID;
+
+   private final PacketType type;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param type the type of the packet (asserted to be non-null)
+    */
+   public AbstractPacket(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   public PacketType getType()
+   {
+      return type;
+   }
+
+   public void setCorrelationID(long correlationID)
+   {
+      this.correlationID = correlationID;
+   }
+
+   public long getCorrelationID()
+   {
+      return correlationID;
+   }
+
+   public String getTargetID()
+   {
+      return targetID;
+   }
+
+   /**
+    * @param targetID the new target ID, checked with assertValidID before
+    *        being stored
+    */
+   public void setTargetID(String targetID)
+   {
+      assertValidID(targetID);
+
+      this.targetID = targetID;
+   }
+
+   /**
+    * @param callbackID the new callback ID, checked with assertValidID before
+    *        being stored
+    */
+   public void setCallbackID(String callbackID)
+   {
+      assertValidID(callbackID);
+
+      this.callbackID = callbackID;
+   }
+
+   public String getCallbackID()
+   {
+      return callbackID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + "]";
+   }
+
+   /**
+    * Returns the common, unterminated prefix of the string representation so
+    * that subclasses can append their own fields before the closing "]".
+    */
+   protected String getParentString()
+   {
+      // label fixed: it was previously misspelled "callabckID"
+      return "PACKET[type=" + type + ", correlationID=" + correlationID
+            + ", targetID=" + targetID + ", callbackID=" + callbackID;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Request packet of type REQ_CONNECTIONFACTORY_CREATECONNECTION. It carries
+ * no field of its own yet.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionRequest extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // TODO: add auth credentials
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a request tagged with the matching packet type.
+    */
+   public ConnectionFactoryCreateConnectionRequest()
+   {
+      super(PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Response packet of type RESP_CONNECTIONFACTORY_CREATECONNECTION, carrying
+ * a single immutable ID.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ConnectionFactoryCreateConnectionResponse extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String id;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param id the ID carried by the response, checked with assertValidID
+    */
+   public ConnectionFactoryCreateConnectionResponse(String id)
+   {
+      super(PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION);
+
+      assertValidID(id);
+
+      this.id = id;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getID()
+   {
+      return id;
+   }
+
+   /**
+    * Appends the ID to the representation built by the parent class.
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", id=").append(id);
+      description.append("]");
+      return description.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/NullPacket.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+/**
+ * Packet of type NULL, with no field beyond those of {@link AbstractPacket}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class NullPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a packet tagged with the NULL packet type.
+    */
+   public NullPacket()
+   {
+      super(PacketType.NULL);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+/**
+ * Types of packets, each associated with a byte value returned by
+ * byteValue().
+ *
+ * NOTE(review): three of the literals below do not fit in a byte and are
+ * silently narrowed by the cast; the effective values are given next to each
+ * constant. The values are still distinct, but changing them presumably
+ * changes the wire format -- confirm against the codecs before touching them.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public enum PacketType
+{
+ NULL((byte)1),
+ CLIENT_DELIVERY((byte) 2),
+ TEXT((byte) 3),
+ REQ_CONNECTIONFACTORY_CREATECONNECTION((byte) 100),
+ RESP_CONNECTIONFACTORY_CREATECONNECTION((byte) 100100), // narrows to 4
+ REQ_CONNECTION_CREATESESSION((byte) 201), // narrows to -55
+ RESP_CONNECTION_CREATESESSION((byte) 100200); // narrows to 104
+
+ private byte type; // assigned once, in the constructor
+
+ PacketType(byte type)
+ {
+ this.type = type;
+ }
+
+ public byte byteValue() // returns the byte given to the constructor
+ {
+ return type;
+ }
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/TextPacket.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+
+
+
+/**
+ * Packet of type TEXT carrying an immutable text.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TextPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String text;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param text the text carried by the packet (asserted to be non-null)
+    */
+   public TextPacket(String text)
+   {
+      super(PacketType.TEXT);
+
+      assert text != null;
+
+      this.text = text;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getText()
+   {
+      return text;
+   }
+
+   /**
+    * Appends the text to the representation built by the parent class.
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", text=").append(text);
+      description.append("]");
+      return description.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.Constants.PORT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.messaging.core.remoting.AbstractPacketHandler;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.RemoteDispatcherFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Tests of the RemoteDispatcher: one-way sends, one-way sends with a callback
+ * handler and blocking sends, with and without timeout.
+ *
+ * setUp() starts a server and a client on PORT and tearDown() disconnects the
+ * client and unbinds the server. The clientDispatcher, serverHandler,
+ * serverAcceptor, MANY_MESSAGES, TRANSPORT and reverse() members are not
+ * declared here; presumably they come from TestSupport -- confirm there.
+ *
+ * Several tests rely on Thread.sleep() to wait for asynchronous delivery and
+ * are therefore timing-dependent.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherTest extends TestSupport
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCanNotSendPacketIfNotConnected() throws Exception
+ {
+ RemoteDispatcher dispatcher = RemoteDispatcherFactory.newDispatcher();
+
+ try
+ {
+ dispatcher.sendOneWay(new NullPacket());
+ fail("can not send a packet if the dispatcher is not connected");
+ } catch (IllegalStateException e)
+ {
+ // expected: the dispatcher has never been connected
+ }
+ }
+
+ public void testSendOneWay() throws Exception
+ {
+ TextPacket packet = new TextPacket("testSendOneWay");
+ clientDispatcher.sendOneWay(packet);
+
+ Thread.sleep(300); // give the server time to receive the packet
+
+ List<AbstractPacket> messages = serverHandler.getPackets();
+ assertEquals(1, messages.size());
+ String response = ((TextPacket) messages.get(0)).getText();
+ assertEquals(packet.getText(), response);
+ }
+
+ public void testSendManyOneWay() throws Exception
+ {
+ TextPacket[] packets = new TextPacket[MANY_MESSAGES];
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ packets[i] = new TextPacket("testSendManyOneWay " + i);
+ clientDispatcher.sendOneWay(packets[i]);
+ }
+
+ List<AbstractPacket> receivedPackets = serverHandler.getPackets();
+ /* unlikely that all messages have been consumed
+ * NOTE(review): assertNotSame() compares object references. If
+ * MANY_MESSAGES is a primitive int, both arguments are autoboxed here, so
+ * this check does not reliably compare the two counts -- confirm the
+ * intent before relying on it.
+ */
+ assertNotSame(MANY_MESSAGES, receivedPackets.size());
+
+ Thread.sleep(2 * 1000); // wait for the remaining packets to be delivered
+
+ assertEquals(MANY_MESSAGES, receivedPackets.size());
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
+ assertEquals(packets[i].getText(), receivedPacket.getText());
+ }
+ }
+
+ public void testSendOneWayWithCallbackHandler() throws Exception
+ {
+ // add some lag
+ serverHandler.setSleepTime(300, MILLISECONDS);
+
+ TestClientHandler callbackHandler = new TestClientHandler();
+
+ TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+
+ clientDispatcher.sendOneWay(packet, callbackHandler);
+ assertEquals(0, callbackHandler.getPackets().size()); // nothing yet, given the server lag
+
+ Thread.sleep(600); // twice the server lag
+
+ assertEquals(1, callbackHandler.getPackets().size());
+ String response = callbackHandler.getPackets().get(0).getText();
+ assertEquals(reverse(packet.getText()), response);
+ }
+
+ public void testSendBlocking() throws Exception
+ {
+ TextPacket request = new TextPacket("testSendBlocking");
+
+ AbstractPacket receivedPacket = clientDispatcher.sendBlocking(request);
+
+ assertNotNull(receivedPacket);
+ assertTrue(receivedPacket instanceof TextPacket);
+ TextPacket response = (TextPacket) receivedPacket;
+ assertEquals(reverse(request.getText()), response.getText());
+ }
+
+ public void testSendBlockingWithTimeout() throws Exception
+ {
+ clientDispatcher.setBlockingRequestTimeout(5, SECONDS);
+ serverHandler.setSleepTime(7, SECONDS); // longer than the request timeout
+
+ AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+
+ try
+ {
+ clientDispatcher.sendBlocking(packet);
+ fail("a RequestTimeoutException should be thrown");
+ } catch (TimeoutException e)
+ { // expected: the server sleeps longer than the timeout
+ }
+ }
+
+ // TestCase implementation ---------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startServer(PORT, TRANSPORT);
+ startClient(PORT, TRANSPORT);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ clientDispatcher.disconnect();
+ serverAcceptor.unbind();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private final class TestClientHandler extends AbstractPacketHandler
+ {
+ // NOTE(review): plain ArrayList; handle() is presumably called from an I/O thread while the test thread reads it -- confirm
+ private final List<TextPacket> packets;
+
+ private TestClientHandler()
+ {
+ packets = new ArrayList<TextPacket>();
+ }
+
+ public void handle(AbstractPacket packet) // records the packet; the cast fails for non-text packets
+ {
+ packets.add((TextPacket) packet);
+ }
+
+ public List<TextPacket> getPackets() // returns the live list, not a copy
+ {
+ return packets;
+ }
+ }
+}
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Test-only MINA handler that records every packet and session it sees and
+ * answers <code>TextPacket</code> requests with the same text reversed.
+ *
+ * NOTE(review): the recorded lists are plain <code>ArrayList</code>s that are
+ * filled from the handler callbacks and read by the tests; confirm whether
+ * they need to be thread-safe.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class ReverseServerHandler extends IoHandlerAdapter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Every packet received so far, in arrival order. */
+   private final List<AbstractPacket> packets = new ArrayList<AbstractPacket>();
+
+   /** Delay, in milliseconds, applied before a reply is written. */
+   private long sleepTime = 0;
+
+   /** Sessions that have been created and not yet closed. */
+   private final List<IoSession> sessions = new ArrayList<IoSession>();
+
+   /** Transport this handler serves; HTTP gets an extra acknowledgement. */
+   private final TransportType transport;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param transport the transport type the enclosing server was started with
+    */
+   ReverseServerHandler(TransportType transport)
+   {
+      this.transport = transport;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Sets how long the handler sleeps before replying to a request.
+    *
+    * @param time the delay, expressed in <code>unit</code>
+    * @param unit the unit of <code>time</code>
+    */
+   public void setSleepTime(long time, TimeUnit unit)
+   {
+      this.sleepTime = unit.toMillis(time);
+   }
+
+   /**
+    * @return the live list of all packets received so far (not a copy)
+    */
+   public List<AbstractPacket> getPackets()
+   {
+      return packets;
+   }
+
+   /**
+    * @return the live list of the currently known sessions (not a copy)
+    */
+   public List<IoSession> getSessions()
+   {
+      return sessions;
+   }
+
+   // IoHandlerAdapter overrides ------------------------------------
+
+   /**
+    * Records the incoming packet and, for a <code>TextPacket</code> that
+    * carries a correlation ID or a callback ID, writes back a
+    * <code>TextPacket</code> holding the reversed text after sleeping for the
+    * configured time.
+    *
+    * @param session the session the message arrived on
+    * @param msg the decoded message; it is cast to <code>AbstractPacket</code>
+    */
+   @Override
+   public void messageReceived(IoSession session, Object msg)
+   {
+      AbstractPacket packet = (AbstractPacket) msg;
+      packets.add(packet);
+
+      // TODO put this logic in the real server handler
+      if (packet.getCorrelationID() == NO_CORRELATION_ID && transport == HTTP)
+      {
+         session.write(new NullPacket());
+      }
+
+      if (!(packet instanceof TextPacket))
+      {
+         return;
+      }
+
+      TextPacket incomingPacket = (TextPacket) packet;
+      if (!mustReply(incomingPacket))
+      {
+         return;
+      }
+
+      try
+      {
+         Thread.sleep(sleepTime);
+      } catch (InterruptedException e)
+      {
+         e.printStackTrace();
+         // restore the interrupt status so the calling thread can still
+         // observe the interruption; no reply is sent in that case
+         Thread.currentThread().interrupt();
+         return;
+      }
+
+      TextPacket reply = new TextPacket(reverse(incomingPacket.getText()));
+      reply.setCorrelationID(incomingPacket.getCorrelationID());
+      if (!NO_CALLBACK_ID.equals(incomingPacket.getCallbackID()))
+      {
+         // route the reply to the handler the sender asked to be called back on
+         reply.setTargetID(incomingPacket.getCallbackID());
+      }
+      session.write(reply);
+   }
+
+   /**
+    * Remembers the newly created session.
+    */
+   @Override
+   public void sessionCreated(IoSession session) throws Exception
+   {
+      sessions.add(session);
+   }
+
+   /**
+    * Forgets the closed session.
+    */
+   @Override
+   public void sessionClosed(IoSession session) throws Exception
+   {
+      super.sessionClosed(session);
+      sessions.remove(session);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * A packet needs a reply when it carries a correlation ID or a callback ID.
+    */
+   private boolean mustReply(AbstractPacket incomingPacket)
+   {
+      return incomingPacket.getCorrelationID() != NO_CORRELATION_ID
+            || !NO_CALLBACK_ID.equals(incomingPacket.getCallbackID());
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.Constants.PORT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.AbstractPacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TargetHandlerTest extends TestSupport
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testClientHandlePacketSentByServer() throws Exception
+ {
+ ClientTargetHandler targetHandler = new ClientTargetHandler();
+
+ clientDispatcher.register(targetHandler);
+
+ // send a packet to create the IoSession on the server
+ clientDispatcher.sendOneWay(new TextPacket(
+ "testClientHandlePacketSentByServer from client"));
+
+ Thread.sleep(300);
+
+ assertEquals(1, serverHandler.getSessions().size());
+ IoSession serverSession = serverHandler.getSessions().get(0);
+ TextPacket packetFromServer = new TextPacket(
+ "testClientHandlePacketSentByServer from server");
+ packetFromServer.setTargetID(targetHandler.getID());
+ serverSession.write(packetFromServer);
+
+ Thread.sleep(300);
+
+ List<TextPacket> packets = targetHandler.getPackets();
+ assertEquals(1, packets.size());
+ TextPacket packetReceivedByClient = (TextPacket) packets.get(0);
+ assertEquals(packetFromServer.getText(), packetReceivedByClient.getText());
+ }
+
+ // TestCase overrides --------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ startServer(PORT, TRANSPORT);
+ startClient(PORT, TRANSPORT);
+ }
+
+ public void tearDown() throws Exception
+ {
+ clientDispatcher.disconnect();
+ serverAcceptor.unbind();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private final class ClientTargetHandler extends AbstractPacketHandler
+ {
+ private final List<TextPacket> packets;
+
+ private ClientTargetHandler()
+ {
+ packets = new ArrayList<TextPacket>();
+ }
+
+ public void handle(AbstractPacket packet)
+ {
+ packets.add((TextPacket) packet);
+ }
+
+ public List<TextPacket> getPackets()
+ {
+ return packets;
+ }
+ }
+}
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting;
+
+import static org.jboss.messaging.core.remoting.TransportType.HTTP;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.RemoteDispatcher;
+import org.jboss.messaging.core.remoting.RemoteDispatcherFactory;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+
+/**
+ * Base class for the remoting tests: it starts a MINA socket acceptor backed
+ * by a {@link ReverseServerHandler} and a client {@link RemoteDispatcher}
+ * connected to it.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public abstract class TestSupport extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   public static final int MANY_MESSAGES = 500;
+
+   /**
+    * Configurable by system property <code>transport.type</code>, default is
+    * TCP
+    */
+   public static final TransportType TRANSPORT;
+
+   // Attributes ----------------------------------------------------
+
+   /** Acceptor created by <code>startServer</code>. */
+   NioSocketAcceptor serverAcceptor;
+
+   /** Handler installed on <code>serverAcceptor</code>. */
+   ReverseServerHandler serverHandler;
+
+   /** Dispatcher created and connected by <code>startClient</code>. */
+   RemoteDispatcher clientDispatcher;
+
+   // Static --------------------------------------------------------
+
+   static
+   {
+      // TransportType.valueOf rejects an unknown name, which fails class
+      // initialization
+      String transportType = System.getProperty("transport.type", TCP
+            .toString());
+      TRANSPORT = TransportType.valueOf(transportType);
+      info("Default transport is " + TRANSPORT);
+   }
+
+   /**
+    * Returns the characters of <code>text</code> in reverse order. Surrogate
+    * pairs are kept together (see <code>StringBuilder.reverse()</code>), so
+    * the result is always valid UTF-16 when the input is.
+    *
+    * @param text the text to reverse, must not be <code>null</code>
+    * @return the reversed text
+    */
+   static String reverse(String text)
+   {
+      return new StringBuilder(text).reverse().toString();
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Starts the test server without SSL.
+    *
+    * @param port the local port to listen on
+    * @param transport the transport type to serve
+    */
+   void startServer(int port, TransportType transport) throws Exception
+   {
+      startServer(port, transport, false);
+   }
+
+   /**
+    * Creates the acceptor and its {@link ReverseServerHandler}, builds the
+    * filter chain and binds the acceptor to <code>port</code>.
+    *
+    * @param port the local port to listen on
+    * @param transport the transport type to serve; the packet codec is only
+    *           installed for TCP, HTTP support is still a TODO
+    * @param useSSL whether to add SSL support (currently a no-op, see
+    *           <code>addSSLSupport</code>)
+    */
+   void startServer(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      serverAcceptor = new NioSocketAcceptor();
+      serverHandler = new ReverseServerHandler(transport);
+
+      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+      serverAcceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+      if (useSSL)
+      {
+         addSSLSupport(serverAcceptor.getFilterChain());
+      }
+
+      info("using " + transport);
+      if (transport == TCP)
+      {
+         serverAcceptor.getFilterChain().addLast("codec",
+               new ProtocolCodecFilter(new PacketCodecFactory()));
+      } else
+      {
+         assert transport == HTTP;
+
+         // TODO add http support
+         // serverAcceptor.getFilterChain().addLast("http_codec",
+         // new ProtocolCodecFilter(new HttpServerCodecFactory()));
+         // serverAcceptor.getFilterChain().addLast("http_filter",
+         // new HTTPFilter(true));
+      }
+      serverAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+      // Bind
+      serverAcceptor.setLocalAddress(new InetSocketAddress(port));
+      info("listening on " + port);
+      serverAcceptor.setHandler(serverHandler);
+      serverAcceptor.bind();
+   }
+
+   /**
+    * Placeholder for SSL support: currently adds nothing to the chain.
+    *
+    * @param chain the filter chain the SSL filter would be added to
+    */
+   private void addSSLSupport(DefaultIoFilterChainBuilder chain)
+         throws Exception
+   {
+      // TODO add SSL support
+      // SslFilter sslFilter = new
+      // SslFilter(BogusSslContextFactory.getInstance(true));
+      // chain.addLast("sslFilter", sslFilter);
+      // info("SSL enabled");
+   }
+
+   /**
+    * Starts the client without SSL.
+    *
+    * @param port the server port to connect to
+    * @param transport the transport type to use
+    */
+   void startClient(int port, TransportType transport) throws Exception
+   {
+      startClient(port, transport, false);
+   }
+
+   /**
+    * Creates a new dispatcher and connects it to the server.
+    *
+    * @param port the server port to connect to
+    * @param transport the transport type to use
+    * @param useSSL passed through to the dispatcher's <code>connect</code>
+    */
+   void startClient(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      clientDispatcher = RemoteDispatcherFactory.newDispatcher();
+      clientDispatcher.connect(port, transport, useSSL);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Prints <code>info</code> to standard output, left-aligned in a
+    * 50-character column between <code>###</code> markers.
+    */
+   private static void info(String info)
+   {
+      System.out.format("### %-50s ###\n", info);
+   }
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-14 10:20:05 UTC (rev 3327)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.wireformat;
+
+import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.filter.codec.ProtocolCodecSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+
+/**
+ * Round-trips each packet type through the encoder and decoder supplied by
+ * {@link PacketCodecFactory} and checks the type (and, where asserted, the
+ * fields) of the decoded packet.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketTypeTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * A <code>NullPacket</code> must keep its callback, correlation and target
+    * IDs across encoding and decoding.
+    */
+   public void testNullPacket() throws Exception
+   {
+      NullPacket packet = new NullPacket();
+      packet.setCallbackID(randomUUID().toString());
+      packet.setCorrelationID(System.currentTimeMillis());
+      packet.setTargetID(randomUUID().toString());
+
+      AbstractPacket decodedPacket = encodeAndDecode(packet);
+
+      assertTrue(decodedPacket instanceof NullPacket);
+      NullPacket p = (NullPacket) decodedPacket;
+
+      assertEquals(NULL, p.getType());
+      assertEquals(packet.getCallbackID(), p.getCallbackID());
+      assertEquals(packet.getCorrelationID(), p.getCorrelationID());
+      assertEquals(packet.getTargetID(), p.getTargetID());
+   }
+
+   /**
+    * A <code>TextPacket</code> must keep its text across encoding and
+    * decoding.
+    */
+   public void testTextPacket() throws Exception
+   {
+      TextPacket packet = new TextPacket("testTextPacket");
+
+      AbstractPacket decodedPacket = encodeAndDecode(packet);
+
+      assertTrue(decodedPacket instanceof TextPacket);
+      TextPacket p = (TextPacket) decodedPacket;
+
+      assertEquals(TEXT, p.getType());
+      assertEquals(packet.getText(), p.getText());
+   }
+
+   /**
+    * A create-connection request must decode to the same class and type.
+    */
+   public void testConnectionFactoryCreateConnectionRequest() throws Exception
+   {
+      ConnectionFactoryCreateConnectionRequest packet = new ConnectionFactoryCreateConnectionRequest();
+
+      AbstractPacket decodedPacket = encodeAndDecode(packet);
+
+      assertTrue(decodedPacket instanceof ConnectionFactoryCreateConnectionRequest);
+      assertEquals(REQ_CONNECTIONFACTORY_CREATECONNECTION, decodedPacket
+            .getType());
+   }
+
+   /**
+    * A create-connection response must decode to the same class and type.
+    */
+   public void testConnectionFactoryCreateConnectionResponse() throws Exception
+   {
+      ConnectionFactoryCreateConnectionResponse packet = new ConnectionFactoryCreateConnectionResponse(
+            randomUUID().toString());
+
+      AbstractPacket decodedPacket = encodeAndDecode(packet);
+
+      assertTrue(decodedPacket instanceof ConnectionFactoryCreateConnectionResponse);
+      assertEquals(RESP_CONNECTIONFACTORY_CREATECONNECTION, decodedPacket
+            .getType());
+   }
+
+   // TestCase overrides ---------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Encodes <code>packet</code> and decodes the resulting buffer.
+    *
+    * @param packet the packet to round-trip
+    * @return the packet produced by the decoder
+    */
+   private AbstractPacket encodeAndDecode(AbstractPacket packet)
+         throws Exception
+   {
+      IoBuffer buffer = encode(packet);
+      return decode(buffer);
+   }
+
+   /**
+    * Encodes <code>packet</code> with a fresh encoder and returns the first
+    * buffer it produced; the test fails if the encoder produced none.
+    */
+   private IoBuffer encode(AbstractPacket packet) throws Exception
+   {
+      ProtocolCodecSession session = new ProtocolCodecSession();
+      ProtocolEncoder encoder = new PacketCodecFactory().getEncoder();
+      encoder.encode(session, packet, session.getEncoderOutput());
+      IoBuffer buffer = session.getEncoderOutputQueue().poll();
+      // fail here rather than with an NPE inside the decoder
+      assertNotNull("encoder produced no output for packet of type "
+            + packet.getType(), buffer);
+      return buffer;
+   }
+
+   /**
+    * Decodes <code>buffer</code> with a fresh decoder and returns the first
+    * decoded object, which must be an <code>AbstractPacket</code>.
+    */
+   private AbstractPacket decode(IoBuffer buffer) throws Exception
+   {
+      ProtocolCodecSession session = new ProtocolCodecSession();
+      ProtocolDecoder decoder = new PacketCodecFactory().getDecoder();
+      decoder.decode(session, buffer, session.getDecoderOutput());
+
+      Object o = session.getDecoderOutputQueue().poll();
+
+      assertTrue("decoder output is not an AbstractPacket: " + o,
+            o instanceof AbstractPacket);
+
+      return (AbstractPacket) o;
+   }
+
+   // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list