[jboss-cvs] JBoss Messaging SVN: r3303 - in projects/jbm-mina: etc and 21 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 9 11:09:13 EST 2007
Author: jmesnil
Date: 2007-11-09 11:09:12 -0500 (Fri, 09 Nov 2007)
New Revision: 3303
Added:
projects/jbm-mina/.classpath
projects/jbm-mina/.project
projects/jbm-mina/etc/
projects/jbm-mina/etc/log4j.properties
projects/jbm-mina/examples/
projects/jbm-mina/examples/org/
projects/jbm-mina/examples/org/jboss/
projects/jbm-mina/examples/org/jboss/messaging/
projects/jbm-mina/examples/org/jboss/messaging/remoting/
projects/jbm-mina/examples/org/jboss/messaging/remoting/client/
projects/jbm-mina/examples/org/jboss/messaging/remoting/client/Client.java
projects/jbm-mina/src/
projects/jbm-mina/src/org/
projects/jbm-mina/src/org/jboss/
projects/jbm-mina/src/org/jboss/messaging/
projects/jbm-mina/src/org/jboss/messaging/remoting/
projects/jbm-mina/src/org/jboss/messaging/remoting/AbstractPacketHandler.java
projects/jbm-mina/src/org/jboss/messaging/remoting/PacketHandler.java
projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcher.java
projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcherFactory.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Assert.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/ClientHandler.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Constants.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/RemoteDispatcherImpl.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/TransportType.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/Server.java
projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/ServerHandler.java
projects/jbm-mina/src/org/jboss/messaging/remoting/transport/
projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/
projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/Base64.java
projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/HTTPFilter.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacket.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacketCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDelivery.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDeliveryCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequest.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequestCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponse.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponseCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequestCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponseCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacket.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacketCodec.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketCodecFactory.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketInspector.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketType.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacket.java
projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacketCodec.java
projects/jbm-mina/test/
projects/jbm-mina/test/org/
projects/jbm-mina/test/org/jboss/
projects/jbm-mina/test/org/jboss/messaging/
projects/jbm-mina/test/org/jboss/messaging/remoting/
projects/jbm-mina/test/org/jboss/messaging/remoting/HandlerTest.java
projects/jbm-mina/test/org/jboss/messaging/remoting/PacketTypeTest.java
projects/jbm-mina/test/org/jboss/messaging/remoting/SendPacketTest.java
projects/jbm-mina/test/org/jboss/messaging/remoting/TestServerHandler.java
projects/jbm-mina/test/org/jboss/messaging/remoting/TestSupport.java
projects/jbm-mina/test/org/jboss/messaging/remoting/TransportTest.java
Log:
Initial import of prototype using MINA to implement communication between JBoss Messaging clients and servers
Added: projects/jbm-mina/.classpath
===================================================================
--- projects/jbm-mina/.classpath (rev 0)
+++ projects/jbm-mina/.classpath 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="src" path="test"/>
+ <classpathentry kind="src" path="etc"/>
+ <classpathentry kind="src" path="examples"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/mina-core"/>
+ <classpathentry kind="var" path="SLF4J_HOME/slf4j-api-1.4.3.jar"/>
+ <classpathentry kind="var" path="SLF4J_HOME/slf4j-log4j12-1.4.3.jar"/>
+ <classpathentry kind="lib" path="/jboss-messaging/thirdparty/apache-log4j/lib/log4j.jar"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/asyncweb-core"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/mina-filter-codec-http"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
Added: projects/jbm-mina/.project
===================================================================
--- projects/jbm-mina/.project (rev 0)
+++ projects/jbm-mina/.project 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>jbm-mina</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
Added: projects/jbm-mina/etc/log4j.properties
===================================================================
--- projects/jbm-mina/etc/log4j.properties (rev 0)
+++ projects/jbm-mina/etc/log4j.properties 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,10 @@
+# Please don't modify the log level until we reach acceptable test coverage.
+# It's very useful when I test examples manually.
+log4j.rootCategory=TRACE, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d{HH:mm:ss}] %p [%X{remoteAddress}] [%c] - %m%n
+
+# you could use this pattern to test the MDC with the Chat server
+# log4j.appender.stdout.layout.ConversionPattern=[%d{HH:mm:ss}] %t %p %X{name} [%X{user}] [%X{remoteAddress}] [%c] - %m%n
\ No newline at end of file
Added: projects/jbm-mina/examples/org/jboss/messaging/remoting/client/Client.java
===================================================================
--- projects/jbm-mina/examples/org/jboss/messaging/remoting/client/Client.java (rev 0)
+++ projects/jbm-mina/examples/org/jboss/messaging/remoting/client/Client.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,68 @@
+package org.jboss.messaging.remoting.client;
+
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+import static org.jboss.messaging.remoting.internal.TransportType.TCP;
+import static org.jboss.messaging.remoting.wireformat.PacketType.RESP_CONNECTION_CREATESESSION;
+
+import org.jboss.messaging.remoting.PacketHandler;
+import org.jboss.messaging.remoting.RemoteDispatcher;
+import org.jboss.messaging.remoting.RemoteDispatcherFactory;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionResponse;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+
+public class Client
+{
+ // Example client: connects over TCP, creates a connection, then requests a session on it.
+ public static void main(String[] args) throws Exception
+ {
+ final RemoteDispatcher dispatcher = RemoteDispatcherFactory
+ .newDispatcher();
+ dispatcher.connect(PORT, TCP);
+ // Blocking round-trip: the reply is cast to the create-connection response.
+ ConnectionFactoryCreateConnectionRequest req = new ConnectionFactoryCreateConnectionRequest();
+ final ConnectionFactoryCreateConnectionResponse resp = (ConnectionFactoryCreateConnectionResponse) dispatcher
+ .sendBlocking(req);
+ // Handler for packets addressed to the connection; its ID is the one carried by the response.
+ PacketHandler connectionHandler = new PacketHandler()
+ {
+ public String getID()
+ {
+ return resp.getID();
+ }
+ // On a create-session response, registers a nested handler under that response's ID.
+ public void handle(AbstractPacket packet)
+ {
+ System.err.println("for connection:" + packet);
+ if (packet.getType() == RESP_CONNECTION_CREATESESSION)
+ {
+ final ConnectionCreateSessionResponse response = (ConnectionCreateSessionResponse) packet;
+ PacketHandler sessionHandler = new PacketHandler()
+ {
+
+ public String getID()
+ {
+ return response.getID();
+ }
+
+ public void handle(AbstractPacket packet)
+ {
+ System.err.println("for Session:" + packet); // only prints the packet
+ }
+ };
+ dispatcher.register(sessionHandler);
+ }
+ }
+ };
+ dispatcher.register(connectionHandler);
+ // The request targets the connection's ID; sendOneWay is also given the handler as callback.
+ ConnectionCreateSessionRequest request = new ConnectionCreateSessionRequest(
+ true, 1, false);
+ request.setTargetID(connectionHandler.getID());
+ dispatcher.sendOneWay(request, connectionHandler);
+
+ // dispatcher.disconnect();
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/AbstractPacketHandler.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/AbstractPacketHandler.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/AbstractPacketHandler.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,29 @@
+package org.jboss.messaging.remoting;
+
+import java.util.UUID;
+
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+
+public abstract class AbstractPacketHandler implements PacketHandler
+{
+ // Base PacketHandler that supplies its own ID; subclasses only implement handle().
+ private final String id;
+
+ public AbstractPacketHandler()
+ {
+ this.id = UUID.randomUUID().toString(); // ID is a random UUID string, fixed at construction
+ }
+
+ public String getID()
+ {
+ return id;
+ }
+
+ public abstract void handle(AbstractPacket packet);
+
+ @Override
+ public String toString()
+ {
+ return "AbstractPacketHandler[id=" + id + "]";
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/PacketHandler.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/PacketHandler.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/PacketHandler.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,10 @@
+package org.jboss.messaging.remoting;
+
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+
+public interface PacketHandler
+{
+ String getID(); // key under which RemoteDispatcherImpl.register stores the handler on the session
+
+ void handle(AbstractPacket packet); // called with each packet dispatched to this handler
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcher.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcher.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcher.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,21 @@
+package org.jboss.messaging.remoting;
+
+import org.jboss.messaging.remoting.internal.TransportType;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+
+public interface RemoteDispatcher
+{
+ public void sendOneWay(AbstractPacket packet); // sends the packet without waiting for a reply
+ // Sends without waiting; RemoteDispatcherImpl registers the handler and sets it as the packet's callback ID.
+ public void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler);
+ // Sends the packet and returns the packet carried by the matching response.
+ public AbstractPacket sendBlocking(AbstractPacket packet) throws Exception;
+ // Opens the session used by every other method, on the given port with the given transport.
+ public void connect(int port, TransportType transport) throws Exception;
+
+ public void disconnect() throws Exception;
+ // Makes the handler reachable under handler.getID().
+ public void register(PacketHandler handler);
+ // Removes the handler previously registered under handlerID.
+ public void unregister(String handlerID);
+}
\ No newline at end of file
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcherFactory.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcherFactory.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/RemoteDispatcherFactory.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,38 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.remoting;
+
+import org.jboss.messaging.remoting.internal.RemoteDispatcherImpl;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class RemoteDispatcherFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ // Creates a new dispatcher backed by RemoteDispatcherImpl; the caller still has to connect() it.
+ public static RemoteDispatcher newDispatcher()
+ {
+ return new RemoteDispatcherImpl();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Assert.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Assert.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Assert.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,10 @@
+package org.jboss.messaging.remoting.internal;
+
+public class Assert
+{
+ public final static void assertValidID(String id) // checks that an ID is neither null nor empty
+ {
+ assert id != null; // Java assert: only evaluated when the JVM runs with assertions enabled (-ea)
+ assert id.length() != 0;
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/ClientHandler.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/ClientHandler.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/ClientHandler.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,39 @@
+package org.jboss.messaging.remoting.internal;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Response;
+import org.jboss.messaging.remoting.PacketHandler;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+
+public class ClientHandler extends IoHandlerAdapter
+{
+ @Override
+ public void messageReceived(IoSession session, Object message) // routes incoming packets to registered handlers
+ throws Exception
+ {
+ if (message instanceof AbstractPacket)
+ {
+ AbstractPacket packet = (AbstractPacket) message;
+ String targetID = packet.getTargetID();
+ PacketHandler handler = (PacketHandler) session.getAttribute(targetID); // handlers are stored as session attributes keyed by ID
+ if (handler != null)
+ {
+ handler.handle(packet);
+ } else
+ {
+ System.err // no handler under this target ID: the packet is only reported
+ .println("ClientHandler.messageReceived() unhandled packet: "
+ + packet);
+ }
+ } else if (message instanceof Response)
+ {
+ // response is handled by the reqres filter.
+ // do nothing
+ }
+ else
+ {
+ System.err.println("unhandled message: " + message); // neither a packet nor a reqres Response
+ }
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Constants.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Constants.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/Constants.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,44 @@
+package org.jboss.messaging.remoting.internal;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+
+public class Constants
+{
+ public static final int CONNECTION_TIMEOUT = 20 * 1000; // NOTE(review): unit not stated; passed to connector.setConnectTimeout - confirm
+
+ public static final byte NULL_BYTE = (byte) 0;
+ // NOTE(review): CharsetEncoder/CharsetDecoder instances are stateful and not safe for concurrent use; confirm how these shared statics are used.
+ public static final CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8")
+ .newEncoder();
+
+ public static final CharsetDecoder UTF_8_DECODER = Charset.forName("UTF-8")
+ .newDecoder();
+ // Sentinel values; ServerHandler compares a packet's correlation ID against NO_CORRELATION_ID.
+ public static final long NO_CORRELATION_ID = -1L;
+
+ public static final String NO_TARGET_ID = "NO_TARGET_ID_SET";
+
+ public static final String NO_CALLBACK_ID = "NO_CALLBACK_ID_SET";
+ // Sizes in bytes of an int and a long.
+ public static final int INT_LENGTH = 4;
+
+ public static final int LONG_LENGTH = 8;
+ // NOTE(review): TRUE is 0 and FALSE is 1, the reverse of the usual convention - confirm this is intended.
+ public static final byte TRUE = (byte) 0;
+
+ public static final byte FALSE = (byte) 1;
+ // Port used by both the example Client and the Server main class.
+ public static final int PORT = 8080;
+ // Names for the packet header fields; presumably used by the HTTP transport - verify against HTTPFilter.
+ public static final String PACKET_TYPE_HEADER = "PACKET_TYPE";
+
+ public static final String TARGET_ID_HEADER = "TARGET_ID";
+
+ public static final String CALLBACK_ID_HEADER = "CALLBACK_ID";
+
+ public static final String CORRELATION_ID_HEADER = "CORRELATION_ID";
+
+ public static final String PACKET_HTTP_PARAMETER = "packet";
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/RemoteDispatcherImpl.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/RemoteDispatcherImpl.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/RemoteDispatcherImpl.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,138 @@
+package org.jboss.messaging.remoting.internal;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.remoting.internal.Assert.assertValidID;
+import static org.jboss.messaging.remoting.internal.Constants.CONNECTION_TIMEOUT;
+import static org.jboss.messaging.remoting.internal.TransportType.HTTP;
+import static org.jboss.messaging.remoting.internal.TransportType.TCP;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.http.HttpProtocolCodecFactory;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.filter.reqres.Request;
+import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.apache.mina.filter.reqres.Response;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.messaging.remoting.PacketHandler;
+import org.jboss.messaging.remoting.RemoteDispatcher;
+import org.jboss.messaging.remoting.transport.http.HTTPFilter;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.PacketCodecFactory;
+import org.jboss.messaging.remoting.wireformat.PacketInspector;
+
+public class RemoteDispatcherImpl implements RemoteDispatcher
+{
+ // Session opened by connect(); every other method asserts that it is set.
+ private IoSession session;
+
+ public RemoteDispatcherImpl()
+ {
+ }
+ // Builds the filter chain for the requested transport, connects to the port and stores the session.
+ public void connect(int port, TransportType transport) throws Exception
+ {
+ NioSocketConnector connector = new NioSocketConnector();
+ // NOTE(review): CONNECTION_TIMEOUT is 20 * 1000; confirm the unit setConnectTimeout expects.
+ connector.setConnectTimeout(CONNECTION_TIMEOUT);
+
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
+
+ if (transport == TCP)
+ {
+ connector.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
+ } else
+ {
+ assert transport == HTTP;
+
+ // FIXME
+ URL url = new URL("http://localhost:" + port + "/");
+ connector.getFilterChain().addLast("http_codec",
+ new ProtocolCodecFilter(new HttpProtocolCodecFactory(url)));
+ connector.getFilterChain().addLast("http_logger", new LoggingFilter());
+ connector.getFilterChain().addLast("http_filter", new HTTPFilter(false));
+ }
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ RequestResponseFilter filter = new RequestResponseFilter(
+ new PacketInspector(), scheduler);
+ connector.getFilterChain().addLast("reqres", filter);
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+
+ connector.setHandler(new ClientHandler());
+ InetSocketAddress address = new InetSocketAddress(port);
+ ConnectFuture future = connector.connect(address);
+ future.awaitUninterruptibly();
+ if (!future.isConnected())
+ {
+ throw new IOException("Cannot connect to " + address.toString());
+ }
+
+ this.session = future.getSession();
+ }
+ // Closes the session and waits for the close to complete.
+ public void disconnect() throws Exception
+ {
+ assert session != null;
+
+ session.close().await();
+ }
+ // Stores the handler as a session attribute keyed by its ID, where ClientHandler looks it up.
+ public void register(PacketHandler handler)
+ {
+ assert session != null;
+ assert handler != null; // checked before handler.getID(), which would otherwise throw an NPE first
+ assertValidID(handler.getID());
+
+ session.setAttribute(handler.getID(), handler);
+ }
+ // Removes the session attribute stored under handlerID.
+ public void unregister(String handlerID)
+ {
+ assert session != null;
+ assertValidID(handlerID);
+
+ session.removeAttribute(handlerID);
+ }
+ // Writes the packet to the session without waiting for any reply.
+ public void sendOneWay(AbstractPacket packet)
+ {
+ assert session != null;
+ assert packet != null;
+
+ session.write(packet);
+ }
+ // Registers callbackHandler, stamps its ID on the packet as callback ID, then writes the packet.
+ public void sendOneWay(AbstractPacket packet, PacketHandler callbackHandler)
+ {
+ assert session != null;
+ assert packet != null;
+ assert callbackHandler != null;
+
+ register(callbackHandler);
+ packet.setCallbackID(callbackHandler.getID());
+ session.write(packet);
+ }
+ // Wraps the packet in a reqres Request (5 second timeout), awaits the response and returns its message.
+ public AbstractPacket sendBlocking(AbstractPacket packet) throws Exception
+ {
+ assert session != null;
+ assert packet != null;
+ // The correlation ID is taken from System.nanoTime().
+ packet.setCorrelationID(System.nanoTime());
+
+ Request req = new Request(packet.getCorrelationID(), packet, 5, SECONDS);
+ session.write(req);
+ Response response = req.awaitResponse();
+ return (AbstractPacket) response.getMessage();
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/TransportType.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/TransportType.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/TransportType.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,15 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.remoting.internal;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public enum TransportType
+{
+ TCP, HTTP; // RemoteDispatcherImpl.connect uses the packet codec for TCP and the HTTP codec plus HTTPFilter for HTTP
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/Server.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/Server.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/Server.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,33 @@
+package org.jboss.messaging.remoting.internal.server;
+
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.remoting.wireformat.PacketCodecFactory;
+
+public class Server
+{
+ public static void main(String[] args) throws Exception // starts a TCP acceptor on PORT served by ServerHandler
+ {
+ NioSocketAcceptor acceptor = new NioSocketAcceptor();
+
+ // Prepare the configuration
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+ acceptor.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory())); // same codec factory the client uses for TCP
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+ // Bind
+ acceptor.setLocalAddress(new InetSocketAddress(PORT));
+ acceptor.setHandler(new ServerHandler());
+ acceptor.bind();
+
+ System.out.println("Listening on port " + PORT);
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/ServerHandler.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/ServerHandler.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/internal/server/ServerHandler.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.remoting.internal.server;
+
+import static org.jboss.messaging.remoting.wireformat.PacketType.REQ_CONNECTION_CREATESESSION;
+
+import java.util.UUID;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.remoting.AbstractPacketHandler;
+import org.jboss.messaging.remoting.PacketHandler;
+import org.jboss.messaging.remoting.internal.Constants;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.ClientDelivery;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionResponse;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.remoting.wireformat.NullPacket;
+import org.jboss.messaging.remoting.wireformat.PacketType;
+
+public class ServerHandler extends IoHandlerAdapter
+{
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause)
+ {
+ System.err.println("closing session after unexpected exception: " + cause); // report the cause instead of dropping it
+ session.close();
+ }
+ // Dispatches a packet to the handler registered under its target ID, or creates a connection on request.
+ @Override
+ public void messageReceived(final IoSession session, Object msg)
+ {
+ if (!(msg instanceof AbstractPacket))
+ {
+ // FIXME: error handling
+ System.err.println("unsupported message: " + msg);
+ return;
+ }
+
+ AbstractPacket packet = (AbstractPacket) msg;
+ // NOTE(review): a NullPacket is written back when the packet carries no correlation ID - confirm the intent.
+ if(packet.getCorrelationID() == Constants.NO_CORRELATION_ID)
+ {
+ session.write(new NullPacket());
+ }
+
+ String targetID = packet.getTargetID();
+ PacketHandler handler = (PacketHandler) session.getAttribute(targetID); // handlers are session attributes keyed by ID
+ if (handler != null)
+ {
+ handler.handle(packet);
+ return;
+ } else
+ {
+ PacketType type = packet.getType();
+ if (type == PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION)
+ {
+ System.err.println("for connection factory:" + packet);
+ ConnectionFactoryCreateConnectionResponse response = createConnection(
+ session, (ConnectionFactoryCreateConnectionRequest) packet);
+ session.write(response);
+ } else
+ {
+ System.err.println("unhandled packet:" + packet);
+ }
+ }
+ }
+ // Registers a per-connection handler on the session and returns a response carrying that handler's ID.
+ private ConnectionFactoryCreateConnectionResponse createConnection(
+ final IoSession session,
+ ConnectionFactoryCreateConnectionRequest request)
+ {
+ PacketHandler connectionHandler = new AbstractPacketHandler()
+ {
+ @Override
+ public void handle(AbstractPacket packet)
+ {
+ System.err.println("for connection: " + packet);
+ PacketType type = packet.getType();
+ if (type == REQ_CONNECTION_CREATESESSION)
+ {
+ ConnectionCreateSessionRequest request = (ConnectionCreateSessionRequest) packet;
+ final String sessionID = UUID.randomUUID().toString();
+ final ConnectionCreateSessionResponse response = new ConnectionCreateSessionResponse(
+ sessionID, 10);
+ transferHead(request, response);
+ session.write(response);
+ // Follow up with a delivery addressed to the new session ID.
+ sendDeliveryLater(sessionID);
+ }
+ }
+ // Starts a thread that sleeps two seconds, then writes a ClientDelivery targeting sessionID.
+ private void sendDeliveryLater(final String sessionID)
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(2 * 1000);
+ } catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt(); // restore the interrupt flag rather than only printing the trace
+ }
+
+ AbstractPacket delivery = new ClientDelivery(sessionID,
+ System.currentTimeMillis(), 0);
+ delivery.setTargetID(sessionID);
+ session.write(delivery);
+ }
+ };
+ new Thread(runnable).start();
+ }
+ };
+ session.setAttribute(connectionHandler.getID(), connectionHandler);
+
+ ConnectionFactoryCreateConnectionResponse response = new ConnectionFactoryCreateConnectionResponse(
+ connectionHandler.getID());
+ transferHead(request, response);
+ return response;
+ }
+ // Copies the correlation ID to the reply and targets the reply at the request's callback ID.
+ void transferHead(AbstractPacket in, AbstractPacket out)
+ {
+ out.setCorrelationID(in.getCorrelationID());
+ out.setTargetID(in.getCallbackID());
+ }
+
+}
\ No newline at end of file
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/Base64.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/Base64.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/Base64.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,235 @@
+/*
+ * @(#)Base64.java 1.5 03/12/19
+ *
+ * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
+ * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
+ */
+
+package org.jboss.messaging.remoting.transport.http;
+
+/**
+ * Static helpers that translate between byte arrays and Base64 strings, in
+ * both the standard RFC 2045 alphabet and an "alternate" alphabet that
+ * contains no uppercase letters.
+ */
+class Base64 {
+    /**
+     * Translates the specified byte array into a Base64 string as per
+     * Preferences.put(byte[]).
+     */
+    static String byteArrayToBase64(byte[] a) {
+        return byteArrayToBase64(a, false);
+    }
+
+    /**
+     * Translates the specified byte array into an "alternate representation"
+     * Base64 string.  This non-standard variant uses an alphabet that does
+     * not contain the uppercase alphabetic characters, which makes it
+     * suitable for use in situations where case-folding occurs.
+     */
+    static String byteArrayToAltBase64(byte[] a) {
+        return byteArrayToBase64(a, true);
+    }
+
+    /**
+     * Shared encoder: emits four output characters per three input bytes and
+     * pads a trailing partial group with '='.
+     */
+    private static String byteArrayToBase64(byte[] a, boolean alternate) {
+        int aLen = a.length;
+        int numFullGroups = aLen/3;
+        int numBytesInPartialGroup = aLen - 3*numFullGroups;
+        int resultLen = 4*((aLen + 2)/3);
+        // The builder never leaves this method, so no synchronization is needed.
+        StringBuilder result = new StringBuilder(resultLen);
+        char[] intToAlpha = (alternate ? intToAltBase64 : intToBase64);
+
+        // Translate all full groups from byte array elements to Base64
+        int inCursor = 0;
+        for (int i=0; i<numFullGroups; i++) {
+            int byte0 = a[inCursor++] & 0xff;
+            int byte1 = a[inCursor++] & 0xff;
+            int byte2 = a[inCursor++] & 0xff;
+            result.append(intToAlpha[byte0 >> 2]);
+            result.append(intToAlpha[(byte0 << 4)&0x3f | (byte1 >> 4)]);
+            result.append(intToAlpha[(byte1 << 2)&0x3f | (byte2 >> 6)]);
+            result.append(intToAlpha[byte2 & 0x3f]);
+        }
+
+        // Translate partial group if present
+        if (numBytesInPartialGroup != 0) {
+            int byte0 = a[inCursor++] & 0xff;
+            result.append(intToAlpha[byte0 >> 2]);
+            if (numBytesInPartialGroup == 1) {
+                result.append(intToAlpha[(byte0 << 4) & 0x3f]);
+                result.append("==");
+            } else {
+                // assert numBytesInPartialGroup == 2;
+                int byte1 = a[inCursor++] & 0xff;
+                result.append(intToAlpha[(byte0 << 4)&0x3f | (byte1 >> 4)]);
+                result.append(intToAlpha[(byte1 << 2)&0x3f]);
+                result.append('=');
+            }
+        }
+        // assert inCursor == a.length;
+        // assert result.length() == resultLen;
+        return result.toString();
+    }
+
+    /**
+     * This array is a lookup table that translates 6-bit positive integer
+     * index values into their "Base64 Alphabet" equivalents as specified
+     * in Table 1 of RFC 2045.
+     */
+    private static final char intToBase64[] = {
+        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'
+    };
+
+    /**
+     * This array is a lookup table that translates 6-bit positive integer
+     * index values into their "Alternate Base64 Alphabet" equivalents.
+     * This is NOT the real Base64 Alphabet as per in Table 1 of RFC 2045.
+     * This alternate alphabet does not use the capital letters.  It is
+     * designed for use in environments where "case folding" occurs.
+     */
+    private static final char intToAltBase64[] = {
+        '!', '"', '#', '$', '%', '&', '\'', '(', ')', ',', '-', '.', ':',
+        ';', '<', '>', '@', '[', ']', '^',  '`', '_', '{', '|', '}', '~',
+        'a', 'b', 'c', 'd', 'e', 'f', 'g',  'h', 'i', 'j', 'k', 'l', 'm',
+        'n', 'o', 'p', 'q', 'r', 's', 't',  'u', 'v', 'w', 'x', 'y', 'z',
+        '0', '1', '2', '3', '4', '5', '6',  '7', '8', '9', '+', '?'
+    };
+
+    /**
+     * Translates the specified Base64 string (as per Preferences.get(byte[]))
+     * into a byte array.
+     *
+     * @throws IllegalArgumentException if <tt>s</tt> is not a valid Base64
+     *         string.
+     */
+    static byte[] base64ToByteArray(String s) {
+        return base64ToByteArray(s, false);
+    }
+
+    /**
+     * Translates the specified "alternate representation" Base64 string
+     * into a byte array.
+     *
+     * @throws IllegalArgumentException if <tt>s</tt> is not a valid
+     *         alternate representation Base64 string.
+     */
+    static byte[] altBase64ToByteArray(String s) {
+        return base64ToByteArray(s, true);
+    }
+
+    /**
+     * Shared decoder: the input length must be a multiple of four; up to two
+     * trailing '=' characters mark bytes missing from the last group.
+     */
+    private static byte[] base64ToByteArray(String s, boolean alternate) {
+        byte[] alphaToInt = (alternate ?  altBase64ToInt : base64ToInt);
+        int sLen = s.length();
+        int numGroups = sLen/4;
+        if (4*numGroups != sLen)
+            throw new IllegalArgumentException(
+                "String length must be a multiple of four.");
+        int missingBytesInLastGroup = 0;
+        int numFullGroups = numGroups;
+        if (sLen != 0) {
+            if (s.charAt(sLen-1) == '=') {
+                missingBytesInLastGroup++;
+                numFullGroups--;
+            }
+            if (s.charAt(sLen-2) == '=')
+                missingBytesInLastGroup++;
+        }
+        byte[] result = new byte[3*numGroups - missingBytesInLastGroup];
+
+        // Translate all full groups from base64 to byte array elements
+        int inCursor = 0, outCursor = 0;
+        for (int i=0; i<numFullGroups; i++) {
+            int ch0 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            int ch1 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            int ch2 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            int ch3 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            result[outCursor++] = (byte) ((ch0 << 2) | (ch1 >> 4));
+            result[outCursor++] = (byte) ((ch1 << 4) | (ch2 >> 2));
+            result[outCursor++] = (byte) ((ch2 << 6) | ch3);
+        }
+
+        // Translate partial group, if present
+        if (missingBytesInLastGroup != 0) {
+            int ch0 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            int ch1 = base64toInt(s.charAt(inCursor++), alphaToInt);
+            result[outCursor++] = (byte) ((ch0 << 2) | (ch1 >> 4));
+
+            if (missingBytesInLastGroup == 1) {
+                int ch2 = base64toInt(s.charAt(inCursor++), alphaToInt);
+                result[outCursor++] = (byte) ((ch1 << 4) | (ch2 >> 2));
+            }
+        }
+        // assert inCursor == s.length()-missingBytesInLastGroup;
+        // assert outCursor == result.length;
+        return result;
+    }
+
+    /**
+     * Translates the specified character, which is assumed to be in the
+     * "Base 64 Alphabet" into its equivalent 6-bit positive integer.
+     *
+     * @throws IllegalArgumentException if c is not in the Base64 Alphabet,
+     *         including characters that lie beyond the end of the lookup
+     *         table.
+     */
+    private static int base64toInt(char c, byte[] alphaToInt) {
+        // Characters past the end of the table are just as illegal as the
+        // ones mapped to -1; report both the same way instead of letting an
+        // ArrayIndexOutOfBoundsException escape.
+        int result = (c < alphaToInt.length) ? alphaToInt[c] : -1;
+        if (result < 0)
+            throw new IllegalArgumentException("Illegal character " + c);
+        return result;
+    }
+
+    /**
+     * This array is a lookup table that translates unicode characters
+     * drawn from the "Base64 Alphabet" (as specified in Table 1 of RFC 2045)
+     * into their 6-bit positive integer equivalents.  Characters that
+     * are not in the Base64 alphabet but fall within the bounds of the
+     * array are translated to -1.
+     */
+    private static final byte base64ToInt[] = {
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, 62, -1, -1, -1, 63, 52, 53, 54,
+        55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4,
+        5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
+        24, 25, -1, -1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34,
+        35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51
+    };
+
+    /**
+     * This array is the analogue of base64ToInt, but for the nonstandard
+     * variant that avoids the use of uppercase alphabetic characters.
+     */
+    private static final byte altBase64ToInt[] = {
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1,
+        2, 3, 4, 5, 6, 7, 8, -1, 62, 9, 10, 11, -1 , 52, 53, 54, 55, 56, 57,
+        58, 59, 60, 61, 12, 13, 14, -1, 15, 63, 16, -1, -1, -1, -1, -1, -1,
+        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
+        -1, -1, -1, 17, -1, 18, 19, 21, 20, 26, 27, 28, 29, 30, 31, 32, 33,
+        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
+        51, 22, 23, 24, 25
+    };
+
+    /**
+     * Round-trip self test: for each run, encodes and decodes random arrays
+     * of every length below <tt>numBytes</tt> with both alphabets and reports
+     * any mismatch on standard output.
+     *
+     * @param args <tt>args[0]</tt> is the number of runs, <tt>args[1]</tt>
+     *             the exclusive upper bound on the array length
+     */
+    public static void main(String args[]) {
+        if (args.length < 2) {
+            System.err.println("Usage: Base64 <numRuns> <numBytes>");
+            return;
+        }
+        int numRuns  = Integer.parseInt(args[0]);
+        int numBytes = Integer.parseInt(args[1]);
+        java.util.Random rnd = new java.util.Random();
+        for (int i=0; i<numRuns; i++) {
+            for (int j=0; j<numBytes; j++) {
+                byte[] arr = new byte[j];
+                for (int k=0; k<j; k++)
+                    arr[k] = (byte)rnd.nextInt();
+
+                String s = byteArrayToBase64(arr);
+                byte [] b = base64ToByteArray(s);
+                if (!java.util.Arrays.equals(arr, b))
+                    System.out.println("Dismal failure!");
+
+                s = byteArrayToAltBase64(arr);
+                b = altBase64ToByteArray(s);
+                if (!java.util.Arrays.equals(arr, b))
+                    System.out.println("Alternate dismal failure!");
+            }
+        }
+    }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/HTTPFilter.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/HTTPFilter.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/HTTPFilter.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,147 @@
+package org.jboss.messaging.remoting.transport.http;
+
+import static org.jboss.messaging.remoting.internal.Constants.PACKET_HTTP_PARAMETER;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.filter.codec.ProtocolCodecSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.http.HttpRequestMessage;
+import org.apache.mina.filter.codec.http.HttpResponseMessage;
+import org.apache.mina.filter.util.WriteRequestFilter;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.PacketCodecFactory;
+import org.safehaus.asyncweb.common.DefaultHttpRequest;
+import org.safehaus.asyncweb.common.DefaultHttpResponse;
+import org.safehaus.asyncweb.common.content.StringContent;
+
+public class HTTPFilter extends WriteRequestFilter
+{
+
+ private boolean server;
+
+ public HTTPFilter(boolean server)
+ {
+ this.server = server;
+ }
+
+ @Override
+ public void messageReceived(NextFilter nextFilter, IoSession session,
+ Object message) throws Exception
+ {
+ if (server)
+ {
+ assert message instanceof DefaultHttpRequest;
+
+ DefaultHttpRequest request = (DefaultHttpRequest) message;
+
+ String s = request.getParameter(PACKET_HTTP_PARAMETER);
+ byte[] bytes = Base64.base64ToByteArray(s);
+ IoBuffer in = IoBuffer.wrap(bytes);
+
+ ProtocolCodecSession httpCodecSession = new ProtocolCodecSession();
+
+ ProtocolDecoder decoder = new PacketCodecFactory().getDecoder();
+ decoder.decode(httpCodecSession, in, httpCodecSession
+ .getDecoderOutput());
+
+ AbstractPacket packet = (AbstractPacket) httpCodecSession
+ .getDecoderOutputQueue().poll();
+
+ nextFilter.messageReceived(session, packet);
+ } else
+ {
+ assert message instanceof HttpResponseMessage;
+
+ HttpResponseMessage response = (HttpResponseMessage) message;
+ byte[] bytes = response.getContent();
+ String s = new String(bytes);
+
+ System.err.println(s);
+
+ byte[] b = Base64.base64ToByteArray(s);
+
+ IoBuffer in = IoBuffer.wrap(b);
+
+ ProtocolCodecSession httpCodecSession = new ProtocolCodecSession();
+
+ ProtocolDecoder decoder = new PacketCodecFactory().getDecoder();
+ decoder.decode(httpCodecSession, in, httpCodecSession
+ .getDecoderOutput());
+
+ AbstractPacket packet = (AbstractPacket) httpCodecSession
+ .getDecoderOutputQueue().poll();
+ nextFilter.messageReceived(session, packet);
+ }
+ }
+
+ @Override
+ protected Object doFilterWrite(NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest) throws IOException
+ {
+ assert writeRequest.getMessage() instanceof AbstractPacket;
+ AbstractPacket packet = (AbstractPacket) writeRequest.getMessage();
+
+ if (server)
+ {
+
+ ProtocolCodecSession httpCodecSession = new ProtocolCodecSession();
+ try
+ {
+ DefaultHttpResponse response = new DefaultHttpResponse();
+ ProtocolEncoder encoder = new PacketCodecFactory().getEncoder();
+ encoder.encode(httpCodecSession, packet, httpCodecSession
+ .getEncoderOutput());
+
+ IoBuffer buffer = httpCodecSession.getEncoderOutputQueue().poll();
+
+ int limit = buffer.limit();
+ byte[] bytes = new byte[limit];
+ buffer.get(bytes, 0, limit);
+ String packetAsBase64 = Base64.byteArrayToBase64(bytes);
+
+ System.err.println(packetAsBase64);
+
+ response.setContent(new StringContent(packetAsBase64));
+ response.setHeader("Content-Length", Integer
+ .toString(packetAsBase64.getBytes().length));
+ return response;
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ } else
+ {
+ ProtocolCodecSession httpCodecSession = new ProtocolCodecSession();
+ try
+ {
+ HttpRequestMessage request = new HttpRequestMessage(UUID
+ .randomUUID().toString());
+
+ ProtocolEncoder encoder = new PacketCodecFactory().getEncoder();
+ encoder.encode(httpCodecSession, packet, httpCodecSession
+ .getEncoderOutput());
+
+ IoBuffer buffer = httpCodecSession.getEncoderOutputQueue().poll();
+ int limit = buffer.limit();
+ byte[] bytes = new byte[limit];
+ buffer.get(bytes, 0, limit);
+ String packetAsBase64 = Base64.byteArrayToBase64(bytes);
+
+ request.setParameter(PACKET_HTTP_PARAMETER, packetAsBase64);
+
+ return request;
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ }
+ }
+}
Property changes on: projects/jbm-mina/src/org/jboss/messaging/remoting/transport/http/HTTPFilter.java
___________________________________________________________________
Name: svn:executable
+ *
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacket.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacket.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacket.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,75 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Assert.assertValidID;
+import static org.jboss.messaging.remoting.internal.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.remoting.internal.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.remoting.internal.Constants.NO_TARGET_ID;
+
+/**
+ * Base class of all packets: holds the packet type plus the routing header
+ * (correlation ID, target ID and callback ID) shared by every packet.
+ * <p>
+ * The three header fields start out as <code>NO_CORRELATION_ID</code>,
+ * <code>NO_TARGET_ID</code> and <code>NO_CALLBACK_ID</code>.
+ */
+public abstract class AbstractPacket
+{
+   private long correlationID = NO_CORRELATION_ID;
+
+   private String targetID = NO_TARGET_ID;
+
+   private String callbackID = NO_CALLBACK_ID;
+
+   private final PacketType type;
+
+   /**
+    * @param type the kind of packet; must not be <code>null</code> (checked
+    *             with an assertion only)
+    */
+   public AbstractPacket(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   public PacketType getType()
+   {
+      return type;
+   }
+
+   public void setCorrelationID(long correlationID)
+   {
+      this.correlationID = correlationID;
+   }
+
+   public long getCorrelationID()
+   {
+      return correlationID;
+   }
+
+   public String getTargetID()
+   {
+      return targetID;
+   }
+
+   /**
+    * @param targetID ID of the packet's target; checked with
+    *                 <code>assertValidID</code> before being stored
+    */
+   public void setTargetID(String targetID)
+   {
+      assertValidID(targetID);
+
+      this.targetID = targetID;
+   }
+
+   /**
+    * @param callbackID ID a reply should be addressed to; checked with
+    *                   <code>assertValidID</code> before being stored
+    */
+   public void setCallbackID(String callbackID)
+   {
+      assertValidID(callbackID);
+
+      this.callbackID = callbackID;
+   }
+
+   public String getCallbackID()
+   {
+      return callbackID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + "]";
+   }
+
+   /**
+    * Returns the common, unterminated prefix of the string form; subclasses
+    * append their own fields and the closing bracket.
+    */
+   String getParentString()
+   {
+      return "PACKET[type=" + type + ", correlationID=" + correlationID
+            + ", targetID=" + targetID + ", callbackID=" + callbackID;
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacketCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacketCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/AbstractPacketCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,147 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.LONG_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.INT_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.NULL_BYTE;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_ENCODER;
+
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoder;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+
+/**
+ * Encoder/decoder for the header shared by all packets; subclasses supply
+ * the body.
+ * <p>
+ * Layout written by {@link #encode}: one type byte, an int header length,
+ * the long correlation ID, the target ID followed by a NULL byte, the
+ * callback ID followed by a NULL byte, then the body produced by
+ * {@link #encodeBody}. {@link #decodable} expects the body to start with an
+ * int body length.
+ */
+public abstract class AbstractPacketCodec<P extends AbstractPacket> implements
+      MessageEncoder<P>, MessageDecoder
+{
+   private final PacketType type;
+
+   /**
+    * @param type packet type this codec handles; must not be
+    *             <code>null</code> (checked with an assertion only)
+    */
+   protected AbstractPacketCodec(PacketType type)
+   {
+      assert type != null;
+
+      this.type = type;
+   }
+
+   /**
+    * Writes the packet header followed by the subclass-specific body and
+    * passes the resulting buffer to <code>out</code>.
+    */
+   public void encode(IoSession session, P packet, ProtocolEncoderOutput out)
+         throws Exception
+   {
+      long correlationID = packet.getCorrelationID();
+      String targetID = packet.getTargetID();
+      String callbackID = packet.getCallbackID();
+
+      // The IDs are written with the UTF-8 encoder below, so their lengths
+      // must be measured in UTF-8 too; the platform default charset can give
+      // a different byte count for non-ASCII characters.
+      int headerLength = LONG_LENGTH + targetID.getBytes("UTF-8").length
+            + callbackID.getBytes("UTF-8").length + 2 /* NULL bytes */;
+
+      IoBuffer buf = IoBuffer.allocate(256);
+
+      // Enable auto-expand for easier encoding
+      buf.setAutoExpand(true);
+
+      buf.put(packet.getType().byteValue());
+      buf.putInt(headerLength);
+      buf.putLong(correlationID);
+      buf.putString(targetID, UTF_8_ENCODER);
+      buf.put(NULL_BYTE);
+      buf.putString(callbackID, UTF_8_ENCODER);
+      buf.put(NULL_BYTE);
+
+      encodeBody(session, packet, buf);
+
+      buf.flip();
+      out.write(buf);
+   }
+
+   /**
+    * Appends the packet body to <code>buf</code>.
+    */
+   protected abstract void encodeBody(IoSession session, P packet, IoBuffer buf)
+         throws Exception;
+
+   /**
+    * Checks whether <code>in</code> starts with a complete packet of this
+    * codec's type.
+    *
+    * @return <code>NOT_OK</code> if the type byte differs or a header string
+    *         cannot be decoded, <code>NEED_DATA</code> if the header or body
+    *         is not fully available yet, <code>OK</code> otherwise
+    */
+   public MessageDecoderResult decodable(IoSession session, IoBuffer in)
+   {
+      byte t = in.get();
+      if (t != type.byteValue())
+      {
+         return NOT_OK;
+      }
+      if (in.remaining() < INT_LENGTH)
+      {
+         System.out.println("need more data to read header length");
+         // can not read next int
+         return NEED_DATA;
+      }
+      int headerLength = in.getInt();
+      if (in.remaining() < headerLength)
+      {
+         System.out.println("need more data to read header");
+         return NEED_DATA;
+      }
+      in.getLong(); // correlation ID
+      try
+      {
+         in.getString(UTF_8_DECODER); // target ID
+      } catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
+      try
+      {
+         in.getString(UTF_8_DECODER); // callback ID
+      } catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
+
+      if (in.remaining() < INT_LENGTH)
+      {
+         System.out.println("need more data to read body length");
+         // can not read next int
+         return NEED_DATA;
+      }
+      int bodyLength = in.getInt();
+      if (bodyLength == 0)
+      {
+         return OK;
+      }
+      if (in.remaining() < bodyLength)
+      {
+         System.out.println("need more data to read body");
+         return NEED_DATA;
+      }
+      return OK;
+   }
+
+   /**
+    * Reads the header, delegates the body to {@link #decodeBody} and writes
+    * the completed packet to <code>out</code>.
+    *
+    * @return <code>NEED_DATA</code> if the subclass could not build a packet
+    *         yet, <code>OK</code> once the packet has been written
+    */
+   public MessageDecoderResult decode(IoSession session, IoBuffer in,
+         ProtocolDecoderOutput out) throws Exception
+   {
+      in.get(); // skip message type
+      in.getInt(); // skip header length
+      long correlationID = in.getLong();
+      String targetID = in.getString(UTF_8_DECODER);
+      String callbackID = in.getString(UTF_8_DECODER);
+
+      P packet = decodeBody(session, in);
+
+      if (packet == null)
+      {
+         return MessageDecoderResult.NEED_DATA;
+      }
+      packet.setTargetID(targetID);
+      packet.setCorrelationID(correlationID);
+      packet.setCallbackID(callbackID);
+      out.write(packet);
+
+      return OK;
+   }
+
+   /**
+    * Reads the packet body from <code>in</code>.
+    *
+    * @return the decoded packet, or <code>null</code> if the body is not
+    *         fully available
+    */
+   protected abstract P decodeBody(IoSession session, IoBuffer in)
+         throws Exception;
+
+   /** Nothing to finish: this codec keeps no state between calls. */
+   public void finishDecode(IoSession session, ProtocolDecoderOutput out)
+         throws Exception
+   {
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDelivery.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDelivery.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDelivery.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,45 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import org.jboss.messaging.remoting.internal.Assert;
+
+/**
+ * Packet of type <code>CLIENT_DELIVERY</code>: identifies a consumer together
+ * with a delivery ID and a delivery count. Instances are immutable apart from
+ * the header inherited from {@link AbstractPacket}.
+ */
+public class ClientDelivery extends AbstractPacket
+{
+   private final String consumerID;
+
+   private final long deliveryID;
+
+   private final int deliveryCount;
+
+   /**
+    * @param consumerID    ID of the consumer; validated with
+    *                      <code>Assert.assertValidID</code>
+    * @param deliveryID    ID of this delivery
+    * @param deliveryCount delivery count carried by the packet
+    */
+   public ClientDelivery(String consumerID, long deliveryID, int deliveryCount)
+   {
+      super(PacketType.CLIENT_DELIVERY);
+
+      Assert.assertValidID(consumerID);
+
+      this.consumerID = consumerID;
+      this.deliveryID = deliveryID;
+      this.deliveryCount = deliveryCount;
+   }
+
+   public String getConsumerID()
+   {
+      return this.consumerID;
+   }
+
+   public long getDeliveryID()
+   {
+      return this.deliveryID;
+   }
+
+   public int getDeliveryCount()
+   {
+      return this.deliveryCount;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder(getParentString());
+      text.append(", consumerID=").append(consumerID);
+      text.append(", deliveryID=").append(deliveryID);
+      text.append(", deliveryCount=").append(deliveryCount);
+      text.append("]");
+      return text.toString();
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDeliveryCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDeliveryCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ClientDeliveryCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,53 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.INT_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.LONG_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.NULL_BYTE;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_ENCODER;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Codec for the body of {@link ClientDelivery} packets.
+ * <p>
+ * Body layout: int body length, consumer ID followed by a NULL byte, long
+ * delivery ID, int delivery count.
+ */
+public class ClientDeliveryCodec extends AbstractPacketCodec<ClientDelivery>
+{
+
+   public ClientDeliveryCodec()
+   {
+      super(PacketType.CLIENT_DELIVERY);
+   }
+
+   /**
+    * Writes the delivery's body, prefixed by its length in bytes.
+    */
+   @Override
+   protected void encodeBody(IoSession session, ClientDelivery delivery,
+         IoBuffer out) throws Exception
+   {
+      String consumerID = delivery.getConsumerID();
+      long deliveryID = delivery.getDeliveryID();
+      int deliveryCount = delivery.getDeliveryCount();
+
+      // Measure the ID in UTF-8 because that is the encoding it is written
+      // with; the platform default charset may yield a different length.
+      int bodyLength = consumerID.getBytes("UTF-8").length + 1 /* NULL byte */
+            + LONG_LENGTH + INT_LENGTH;
+
+      out.putInt(bodyLength);
+      out.putString(consumerID, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+      out.putLong(deliveryID);
+      out.putInt(deliveryCount);
+   }
+
+   /**
+    * Reads a delivery body.
+    *
+    * @return the decoded packet, or <code>null</code> if fewer bytes remain
+    *         than the announced body length
+    */
+   @Override
+   protected ClientDelivery decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      String consumerID = in.getString(UTF_8_DECODER);
+      long deliveryID = in.getLong();
+      int deliveryCount = in.getInt();
+
+      return new ClientDelivery(consumerID, deliveryID, deliveryCount);
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequest.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequest.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,46 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.wireformat.PacketType.REQ_CONNECTION_CREATESESSION;
+
+/**
+ * Packet of type <code>REQ_CONNECTION_CREATESESSION</code>: carries the
+ * transacted flag, acknowledgement mode and XA flag for the session to be
+ * created.
+ */
+public class ConnectionCreateSessionRequest extends AbstractPacket
+{
+
+   private final boolean transacted;
+
+   private final int acknowledgmentMode;
+
+   private final boolean xa;
+
+   /**
+    * @param transacted          the transacted flag
+    * @param acknowledgementMode the acknowledgement mode, stored as given
+    * @param xa                  the XA flag
+    */
+   public ConnectionCreateSessionRequest(boolean transacted,
+         int acknowledgementMode, boolean xa)
+   {
+      super(PacketType.REQ_CONNECTION_CREATESESSION);
+
+      this.transacted = transacted;
+      this.acknowledgmentMode = acknowledgementMode;
+      this.xa = xa;
+   }
+
+   public boolean isTransacted()
+   {
+      return this.transacted;
+   }
+
+   public int getAcknowledgmentMode()
+   {
+      return this.acknowledgmentMode;
+   }
+
+   public boolean isXa()
+   {
+      return this.xa;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder(getParentString());
+      text.append(", transacted=").append(transacted);
+      text.append(", acknowledgementMode=").append(acknowledgmentMode);
+      text.append(", xa=").append(xa);
+      text.append("]");
+      return text.toString();
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequestCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequestCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionRequestCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,52 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.FALSE;
+import static org.jboss.messaging.remoting.internal.Constants.INT_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.TRUE;
+import static org.jboss.messaging.remoting.wireformat.PacketType.REQ_CONNECTION_CREATESESSION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+public class ConnectionCreateSessionRequestCodec extends
+ AbstractPacketCodec<ConnectionCreateSessionRequest>
+{
+
+ public ConnectionCreateSessionRequestCodec()
+ {
+ super(REQ_CONNECTION_CREATESESSION);
+ }
+
+ @Override
+ protected void encodeBody(IoSession session,
+ ConnectionCreateSessionRequest request, IoBuffer out) throws Exception
+ {
+ int bodyLength = 1 + INT_LENGTH + 1;
+
+ byte transacted = request.isTransacted() ? TRUE : FALSE;
+ int ackMode = request.getAcknowledgmentMode();
+ byte xa = request.isXa() ? TRUE : FALSE;
+
+ out.putInt(bodyLength);
+ out.put(transacted);
+ out.putInt(ackMode);
+ out.put(xa);
+ }
+
+ @Override
+ protected ConnectionCreateSessionRequest decodeBody(IoSession session,
+ IoBuffer in) throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (bodyLength > in.remaining())
+ {
+ return null;
+ }
+ boolean transacted = (in.get() == TRUE);
+ int ackMode = in.getInt();
+ boolean xa = (in.get() == TRUE);
+
+ return new ConnectionCreateSessionRequest(transacted, ackMode, xa);
+ }
+
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponse.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponse.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponse.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,39 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Assert.assertValidID;
+import static org.jboss.messaging.remoting.wireformat.PacketType.RESP_CONNECTION_CREATESESSION;
+
+/**
+ * Packet of type <code>RESP_CONNECTION_CREATESESSION</code>: carries the ID
+ * of the created session and a dups-OK batch size.
+ */
+public class ConnectionCreateSessionResponse extends AbstractPacket
+{
+
+   private final String id;
+
+   private final int dupsOKBatchSize;
+
+   /**
+    * @param id              the session ID; validated with
+    *                        <code>assertValidID</code>
+    * @param dupsOKBatchSize the dups-OK batch size, stored as given
+    */
+   public ConnectionCreateSessionResponse(String id, int dupsOKBatchSize)
+   {
+      super(PacketType.RESP_CONNECTION_CREATESESSION);
+
+      assertValidID(id);
+
+      this.id = id;
+      this.dupsOKBatchSize = dupsOKBatchSize;
+   }
+
+   public String getID()
+   {
+      return this.id;
+   }
+
+   public int getDupsOKBatchSize()
+   {
+      return this.dupsOKBatchSize;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder(getParentString());
+      text.append(", id=").append(id);
+      text.append(", dupsOKBatchSize=").append(dupsOKBatchSize);
+      text.append("]");
+      return text.toString();
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponseCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponseCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionCreateSessionResponseCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,52 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.INT_LENGTH;
+import static org.jboss.messaging.remoting.internal.Constants.NULL_BYTE;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.remoting.wireformat.PacketType.RESP_CONNECTION_CREATESESSION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Codec for the body of {@link ConnectionCreateSessionResponse} packets.
+ * <p>
+ * Body layout: int body length, session ID followed by a NULL byte, int
+ * dups-OK batch size.
+ */
+public class ConnectionCreateSessionResponseCodec extends
+      AbstractPacketCodec<ConnectionCreateSessionResponse>
+{
+
+   public ConnectionCreateSessionResponseCodec()
+   {
+      super(RESP_CONNECTION_CREATESESSION);
+   }
+
+   /**
+    * Writes the response body, prefixed by its length in bytes.
+    */
+   @Override
+   protected void encodeBody(IoSession session,
+         ConnectionCreateSessionResponse response, IoBuffer out)
+         throws Exception
+   {
+      String id = response.getID();
+      int dupsOKBatchSize = response.getDupsOKBatchSize();
+
+      // Measure the ID in UTF-8 because that is the encoding it is written
+      // with; the platform default charset may yield a different length.
+      int bodyLength = id.getBytes("UTF-8").length + 1 /* NULL byte */
+            + INT_LENGTH;
+
+      out.putInt(bodyLength);
+      out.putString(id, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+      out.putInt(dupsOKBatchSize);
+   }
+
+   /**
+    * Reads a response body.
+    *
+    * @return the decoded response, or <code>null</code> if fewer bytes
+    *         remain than the announced body length
+    */
+   @Override
+   protected ConnectionCreateSessionResponse decodeBody(IoSession session,
+         IoBuffer in) throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      String id = in.getString(UTF_8_DECODER);
+      int dupsOKBatchSize = in.getInt();
+
+      return new ConnectionCreateSessionResponse(id, dupsOKBatchSize);
+   }
+
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,14 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Packet of type <code>REQ_CONNECTIONFACTORY_CREATECONNECTION</code>. It has
+ * no fields of its own beyond the header inherited from
+ * {@link AbstractPacket}.
+ */
+public class ConnectionFactoryCreateConnectionRequest extends AbstractPacket
+{
+
+   // TODO: auth credentials
+
+   /** Creates a request that carries only the common packet header. */
+   public ConnectionFactoryCreateConnectionRequest()
+   {
+      super(PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequestCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequestCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionRequestCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,34 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.wireformat.PacketType.REQ_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+public class ConnectionFactoryCreateConnectionRequestCodec extends
+ AbstractPacketCodec<ConnectionFactoryCreateConnectionRequest>
+{
+
+ public ConnectionFactoryCreateConnectionRequestCodec()
+ {
+ super(REQ_CONNECTIONFACTORY_CREATECONNECTION);
+ }
+
+ @Override
+ protected void encodeBody(IoSession session,
+ ConnectionFactoryCreateConnectionRequest request, IoBuffer out)
+ throws Exception
+ {
+ // no body
+ out.putInt(0);
+ }
+
+ @Override
+ protected ConnectionFactoryCreateConnectionRequest decodeBody(
+ IoSession session, IoBuffer in) throws Exception
+ {
+ in.getInt(); // skip body length
+ return new ConnectionFactoryCreateConnectionRequest();
+ }
+
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponse.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,29 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Assert.assertValidID;
+import static org.jboss.messaging.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+/**
+ * Packet of type <code>RESP_CONNECTIONFACTORY_CREATECONNECTION</code> that
+ * carries a single String identifier.
+ */
+public class ConnectionFactoryCreateConnectionResponse extends AbstractPacket
+{
+   private final String id;
+
+   /**
+    * @param id identifier carried by this response; it is checked with
+    *           <code>Assert.assertValidID</code> before being stored
+    */
+   public ConnectionFactoryCreateConnectionResponse(String id)
+   {
+      super(RESP_CONNECTIONFACTORY_CREATECONNECTION);
+      assertValidID(id);
+      this.id = id;
+   }
+
+   /**
+    * @return the identifier given at construction time
+    */
+   public String getID()
+   {
+      return id;
+   }
+
+   /**
+    * @return the parent's string form followed by <code>, id=...]</code>
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", id=").append(id).append("]");
+      return description.toString();
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponseCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponseCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/ConnectionFactoryCreateConnectionResponseCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,48 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.NULL_BYTE;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.remoting.wireformat.PacketType.RESP_CONNECTIONFACTORY_CREATECONNECTION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Codec for {@link ConnectionFactoryCreateConnectionResponse}.
+ * <p>
+ * Body layout: an int body length, the ID encoded with
+ * <code>UTF_8_ENCODER</code>, then one <code>NULL_BYTE</code>.
+ */
+public class ConnectionFactoryCreateConnectionResponseCodec extends
+      AbstractPacketCodec<ConnectionFactoryCreateConnectionResponse>
+{
+
+   public ConnectionFactoryCreateConnectionResponseCodec()
+   {
+      super(RESP_CONNECTIONFACTORY_CREATECONNECTION);
+   }
+
+   /**
+    * Writes the body length followed by the ID and a terminating NULL byte.
+    *
+    * @param session  not used by this codec
+    * @param response packet whose ID is written
+    * @param out      buffer receiving the encoded body
+    * @throws Exception if the ID cannot be encoded
+    */
+   @Override
+   protected void encodeBody(IoSession session,
+         ConnectionFactoryCreateConnectionResponse response, IoBuffer out)
+         throws Exception
+   {
+      String id = response.getID();
+
+      // The length must be measured in the same charset that putString()
+      // uses below. String.getBytes() without an argument uses the platform
+      // default charset, which can give a different byte count for
+      // non-ASCII text.
+      String charsetName = UTF_8_ENCODER.charset().name();
+      int bodyLength = id.getBytes(charsetName).length + 1 /* NULL byte */;
+
+      out.putInt(bodyLength);
+      out.putString(id, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads the body length and, if that many bytes are available, the ID.
+    *
+    * @param session not used by this codec
+    * @param in      buffer positioned at the body length int
+    * @return the decoded response, or <code>null</code> when the buffer
+    *         holds fewer bytes than the announced body length
+    * @throws Exception if the ID cannot be decoded
+    */
+   @Override
+   protected ConnectionFactoryCreateConnectionResponse decodeBody(
+         IoSession session, IoBuffer in) throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         // body not fully received yet
+         return null;
+      }
+      String id = in.getString(UTF_8_DECODER);
+
+      return new ConnectionFactoryCreateConnectionResponse(id);
+   }
+
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacket.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacket.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacket.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,28 @@
+package org.jboss.messaging.remoting.wireformat;
+
+/**
+ * Packet of type {@link PacketType#NULL}; it declares no fields of its own.
+ */
+public class NullPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates the packet, handing <code>PacketType.NULL</code> to the
+    * superclass.
+    */
+   public NullPacket()
+   {
+      super(PacketType.NULL);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacketCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacketCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/NullPacketCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,29 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+public class NullPacketCodec extends AbstractPacketCodec<NullPacket>
+{
+
+ public NullPacketCodec()
+ {
+ super(PacketType.NULL);
+ }
+
+ @Override
+ protected void encodeBody(IoSession session, NullPacket packet,
+ IoBuffer out) throws Exception
+ {
+ // no body
+ out.putInt(0);
+ }
+
+ @Override
+ protected NullPacket decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ in.getInt(); // skip body length
+ return new NullPacket();
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketCodecFactory.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketCodecFactory.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketCodecFactory.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,39 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+
+/**
+ * Codec factory that registers, for every packet class listed in the
+ * constructor, its codec class both as a message decoder and as the message
+ * encoder for that packet class.
+ */
+public class PacketCodecFactory extends DemuxingProtocolCodecFactory
+{
+
+ // FIXME: split encoder/decoder required only on client and/or server sides
+ /**
+  * Registers the codecs of all known packet types.
+  * <p>
+  * NOTE(review): decoders are registered in a fixed sequence; whether the
+  * demuxing decoder is sensitive to that order is not visible here, so
+  * confirm before reordering.
+  */
+ public PacketCodecFactory()
+ {
+ // generic packets
+ super.addMessageDecoder(NullPacketCodec.class);
+ super.addMessageEncoder(NullPacket.class, NullPacketCodec.class);
+
+ super.addMessageDecoder(TextPacketCodec.class);
+ super.addMessageEncoder(TextPacket.class, TextPacketCodec.class);
+
+ // connection factory: create connection request/response
+ super
+ .addMessageDecoder(ConnectionFactoryCreateConnectionRequestCodec.class);
+ super.addMessageEncoder(ConnectionFactoryCreateConnectionRequest.class,
+ ConnectionFactoryCreateConnectionRequestCodec.class);
+
+ super
+ .addMessageDecoder(ConnectionFactoryCreateConnectionResponseCodec.class);
+ super.addMessageEncoder(ConnectionFactoryCreateConnectionResponse.class,
+ ConnectionFactoryCreateConnectionResponseCodec.class);
+
+ super.addMessageDecoder(ClientDeliveryCodec.class);
+ super.addMessageEncoder(ClientDelivery.class, ClientDeliveryCodec.class);
+
+ // connection: create session request/response
+ super.addMessageDecoder(ConnectionCreateSessionRequestCodec.class);
+ super.addMessageEncoder(ConnectionCreateSessionRequest.class,
+ ConnectionCreateSessionRequestCodec.class);
+
+ super.addMessageDecoder(ConnectionCreateSessionResponseCodec.class);
+ super.addMessageEncoder(ConnectionCreateSessionResponse.class,
+ ConnectionCreateSessionResponseCodec.class);
+
+ }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketInspector.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketInspector.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketInspector.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,35 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import org.apache.mina.filter.reqres.ResponseInspector;
+import org.apache.mina.filter.reqres.ResponseType;
+import org.jboss.messaging.remoting.internal.Constants;
+
+/**
+ * {@link ResponseInspector} for {@link AbstractPacket} messages: the request
+ * id is the packet's correlation ID, and every packet is reported as a
+ * whole response.
+ */
+public class PacketInspector implements ResponseInspector
+{
+
+   /**
+    * @param message any object passing through the filter
+    * @return the packet's correlation ID (boxed), or <code>null</code> when
+    *         the message is not an {@link AbstractPacket} or its correlation
+    *         ID equals <code>Constants.NO_CORRELATION_ID</code>
+    */
+   public Object getRequestId(Object message)
+   {
+      if (message instanceof AbstractPacket)
+      {
+         long id = ((AbstractPacket) message).getCorrelationID();
+         if (id == Constants.NO_CORRELATION_ID)
+         {
+            return null;
+         }
+         return id;
+      }
+      return null;
+   }
+
+   /**
+    * @param message any object passing through the filter
+    * @return <code>ResponseType.WHOLE</code> for an {@link AbstractPacket},
+    *         <code>null</code> for anything else
+    */
+   public ResponseType getResponseType(Object message)
+   {
+      if (message instanceof AbstractPacket)
+      {
+         return ResponseType.WHOLE;
+      }
+      return null;
+   }
+}
\ No newline at end of file
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketType.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketType.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/PacketType.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,24 @@
+package org.jboss.messaging.remoting.wireformat;
+
+/**
+ * Packet types, each identified by a single byte.
+ * <p>
+ * Three constants were originally declared with int literals that do not fit
+ * in a byte (<code>(byte) 100100</code>, <code>(byte) 201</code>,
+ * <code>(byte) 100200</code>). The narrowing cast keeps only the low 8 bits,
+ * so the byte values actually produced were 4, -55 and 104. Those effective
+ * values are spelled out below so they are visible and stay unchanged. Any
+ * new constant must use a value in the byte range (-128..127) that is not
+ * already taken.
+ */
+public enum PacketType
+{
+   NULL((byte) 1),
+   CLIENT_DELIVERY((byte) 2),
+   TEXT((byte) 3),
+   REQ_CONNECTIONFACTORY_CREATECONNECTION((byte) 100),
+   // was written as (byte) 100100, which truncates to 4
+   RESP_CONNECTIONFACTORY_CREATECONNECTION((byte) 4),
+   // was written as (byte) 201, which wraps to -55
+   REQ_CONNECTION_CREATESESSION((byte) -55),
+   // was written as (byte) 100200, which truncates to 104
+   RESP_CONNECTION_CREATESESSION((byte) 104);
+
+   private final byte type;
+
+   PacketType(byte type)
+   {
+      this.type = type;
+   }
+
+   /**
+    * @return the byte identifying this packet type
+    */
+   public byte byteValue()
+   {
+      return type;
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacket.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacket.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacket.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,26 @@
+package org.jboss.messaging.remoting.wireformat;
+
+/**
+ * Packet of type {@link PacketType#TEXT} that carries a single String.
+ */
+public class TextPacket extends AbstractPacket
+{
+   private final String text;
+
+   /**
+    * @param text the text to carry; asserted to be non-null (only checked
+    *             when assertions are enabled)
+    */
+   public TextPacket(String text)
+   {
+      super(PacketType.TEXT);
+      assert text != null;
+      this.text = text;
+   }
+
+   /**
+    * @return the text given at construction time
+    */
+   public String getText()
+   {
+      return text;
+   }
+
+   /**
+    * @return the parent's string form followed by <code>, text=...]</code>
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", text=").append(text).append("]");
+      return description.toString();
+   }
+}
Added: projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacketCodec.java
===================================================================
--- projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacketCodec.java (rev 0)
+++ projects/jbm-mina/src/org/jboss/messaging/remoting/wireformat/TextPacketCodec.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,44 @@
+package org.jboss.messaging.remoting.wireformat;
+
+import static org.jboss.messaging.remoting.internal.Constants.NULL_BYTE;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_DECODER;
+import static org.jboss.messaging.remoting.internal.Constants.UTF_8_ENCODER;
+import static org.jboss.messaging.remoting.wireformat.PacketType.TEXT;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * Codec for {@link TextPacket}.
+ * <p>
+ * Body layout: an int body length, the text encoded with
+ * <code>UTF_8_ENCODER</code>, then one <code>NULL_BYTE</code>.
+ */
+public class TextPacketCodec extends AbstractPacketCodec<TextPacket>
+{
+
+   public TextPacketCodec()
+   {
+      super(TEXT);
+   }
+
+   /**
+    * Writes the body length followed by the text and a terminating NULL
+    * byte.
+    *
+    * @param session not used by this codec
+    * @param packet  packet whose text is written
+    * @param out     buffer receiving the encoded body
+    * @throws Exception if the text cannot be encoded
+    */
+   @Override
+   protected void encodeBody(IoSession session, TextPacket packet, IoBuffer out)
+         throws Exception
+   {
+      String text = packet.getText();
+
+      // The length must be measured in the same charset that putString()
+      // uses below. String.getBytes() without an argument uses the platform
+      // default charset, which can give a different byte count for
+      // non-ASCII text.
+      String charsetName = UTF_8_ENCODER.charset().name();
+      int bodyLength = text.getBytes(charsetName).length + 1; // for NULL byte
+
+      out.putInt(bodyLength);
+      out.putString(text, UTF_8_ENCODER);
+      out.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads the body length and, if that many bytes are available, the text.
+    *
+    * @param session not used by this codec
+    * @param in      buffer positioned at the body length int
+    * @return the decoded packet, or <code>null</code> when the buffer holds
+    *         fewer bytes than the announced body length
+    * @throws Exception if the text cannot be decoded
+    */
+   @Override
+   protected TextPacket decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         // body not fully received yet
+         return null;
+      }
+      String text = in.getString(UTF_8_DECODER);
+
+      return new TextPacket(text);
+   }
+
+}
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/HandlerTest.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/HandlerTest.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/HandlerTest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,86 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.TextPacket;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a packet written by the server on its session reaches the
+ * handler registered on the client dispatcher under the packet's target ID.
+ */
+public class HandlerTest extends TestSupport
+{
+   /**
+    * Handler identified by a random UUID string that stores every packet it
+    * is given, cast to {@link TextPacket}.
+    */
+   private final class TestClientHandler implements PacketHandler
+   {
+      private final String id = UUID.randomUUID().toString();
+      private final List<TextPacket> packets = new ArrayList<TextPacket>();
+
+      private TestClientHandler()
+      {
+      }
+
+      public void handle(AbstractPacket packet)
+      {
+         TextPacket textPacket = (TextPacket) packet;
+         packets.add(textPacket);
+      }
+
+      public String getID()
+      {
+         return id;
+      }
+
+      public List<TextPacket> getPackets()
+      {
+         return packets;
+      }
+   }
+
+   @Before
+   public void setUp() throws Exception
+   {
+      startServer(PORT, TRANSPORT);
+      startClient(PORT, TRANSPORT);
+   }
+
+   @After
+   public void tearDown() throws Exception
+   {
+      clientDispatcher.disconnect();
+      serverAcceptor.unbind();
+   }
+
+   @Test
+   public void testHandlerPacketSentByServer() throws Exception
+   {
+      TestClientHandler handler = new TestClientHandler();
+      clientDispatcher.register(handler);
+
+      // this first packet only serves to create the IoSession on the server
+      TextPacket fromClient = new TextPacket(
+            "testHandlerPacketSentByServer from client");
+      clientDispatcher.sendOneWay(fromClient);
+
+      Thread.sleep(300);
+
+      List<IoSession> serverSessions = serverHandler.getSessions();
+      assertEquals(1, serverSessions.size());
+      IoSession serverSession = serverSessions.get(0);
+
+      // address the server packet to the handler registered on the client
+      TextPacket fromServer = new TextPacket(
+            "testHandlerPacketSentByServer from server");
+      fromServer.setTargetID(handler.getID());
+      serverSession.write(fromServer);
+
+      Thread.sleep(300);
+
+      List<TextPacket> received = handler.getPackets();
+      assertEquals(1, received.size());
+      String receivedText = received.get(0).getText();
+      assertEquals(fromServer.getText(), receivedText);
+   }
+}
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/PacketTypeTest.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/PacketTypeTest.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/PacketTypeTest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,140 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.ClientDelivery;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionCreateSessionResponse;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionRequest;
+import org.jboss.messaging.remoting.wireformat.ConnectionFactoryCreateConnectionResponse;
+import org.jboss.messaging.remoting.wireformat.NullPacket;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PacketTypeTest extends TestSupport
+{
+ @Before
+ public void setUp() throws Exception
+ {
+ startServer(PORT, TRANSPORT);
+ startClient(PORT, TRANSPORT);
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ clientDispatcher.disconnect();
+ serverAcceptor.unbind();
+ }
+
+ @Test
+ public void testNullPacket() throws Exception
+ {
+ NullPacket packet = new NullPacket();
+ clientDispatcher.sendOneWay(packet);
+
+ Thread.sleep(200);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket p = packets.get(0);
+ assertTrue(p instanceof NullPacket);
+ }
+
+ @Test
+ public void testConnectionFactoryCreateConnectionRequest() throws Exception
+ {
+ ConnectionFactoryCreateConnectionRequest request = new ConnectionFactoryCreateConnectionRequest();
+ clientDispatcher.sendOneWay(request);
+
+ Thread.sleep(200);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket packet = packets.get(0);
+ assertTrue(packet instanceof ConnectionFactoryCreateConnectionRequest);
+ }
+
+ @Test
+ public void testConnectionFactoryCreateConnectionResponse() throws Exception
+ {
+ ConnectionFactoryCreateConnectionResponse response = new ConnectionFactoryCreateConnectionResponse(
+ UUID.randomUUID().toString());
+ clientDispatcher.sendOneWay(response);
+
+ Thread.sleep(200);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket packet = packets.get(0);
+ assertTrue(packet instanceof ConnectionFactoryCreateConnectionResponse);
+ ConnectionFactoryCreateConnectionResponse receivedResponse = (ConnectionFactoryCreateConnectionResponse) packet;
+ assertEquals(response.getID(), receivedResponse.getID());
+ }
+
+ @Test
+ public void testConnectionCreateSessionRequest() throws Exception
+ {
+ ConnectionCreateSessionRequest request = new ConnectionCreateSessionRequest(
+ true, 1, false);
+ clientDispatcher.sendOneWay(request);
+
+ Thread.sleep(200);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket packet = packets.get(0);
+ assertTrue(packet instanceof ConnectionCreateSessionRequest);
+ ConnectionCreateSessionRequest receivedRequest = (ConnectionCreateSessionRequest) packet;
+ assertEquals(request.isTransacted(), receivedRequest.isTransacted());
+ assertEquals(request.getAcknowledgmentMode(), receivedRequest
+ .getAcknowledgmentMode());
+ assertEquals(request.isXa(), receivedRequest.isXa());
+ }
+
+ @Test
+ public void testConnectionCreateSessionResponse() throws Exception
+ {
+ ConnectionCreateSessionResponse response = new ConnectionCreateSessionResponse(
+ "testSendConnectionCreateSessionResponse", 1234);
+ clientDispatcher.sendOneWay(response);
+
+ Thread.sleep(200);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket packet = packets.get(0);
+ assertTrue(packet instanceof ConnectionCreateSessionResponse);
+ ConnectionCreateSessionResponse receivedResponse = (ConnectionCreateSessionResponse) packet;
+ assertEquals(response.getID(), receivedResponse.getID());
+ assertEquals(response.getDupsOKBatchSize(), receivedResponse
+ .getDupsOKBatchSize());
+ }
+
+ @Test
+ public void testClientDelivery() throws Exception
+ {
+ ClientDelivery delivery = new ClientDelivery("testSendClientDelivery",
+ System.currentTimeMillis(), 23);
+ clientDispatcher.sendOneWay(delivery);
+
+ Thread.sleep(300);
+
+ List<AbstractPacket> packets = serverHandler.getPackets();
+ assertEquals(1, packets.size());
+ AbstractPacket packet = packets.get(0);
+ assertTrue(packet instanceof ClientDelivery);
+ ClientDelivery receivedDelivery = (ClientDelivery) packet;
+ assertEquals(delivery.getConsumerID(), receivedDelivery.getConsumerID());
+ assertEquals(delivery.getDeliveryID(), receivedDelivery.getDeliveryID());
+ assertEquals(delivery.getDeliveryCount(), receivedDelivery
+ .getDeliveryCount());
+ }
+}
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/SendPacketTest.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/SendPacketTest.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/SendPacketTest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,148 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.internal.Constants.CONNECTION_TIMEOUT;
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.mina.filter.reqres.RequestTimeoutException;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.TextPacket;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the client dispatcher's send variants (one-way, one-way with a
+ * callback handler, blocking, blocking with timeout) against the test
+ * server handler, which replies with the reversed text.
+ */
+public class SendPacketTest extends TestSupport
+{
+   /**
+    * Handler identified by a random UUID string that stores every packet it
+    * is given, cast to {@link TextPacket}.
+    */
+   private final class TestClientHandler implements PacketHandler
+   {
+      private final List<TextPacket> packets;
+      private final String id = UUID.randomUUID().toString();
+
+      private TestClientHandler()
+      {
+         packets = new ArrayList<TextPacket>();
+      }
+
+      public void handle(AbstractPacket packet)
+      {
+         packets.add((TextPacket) packet);
+      }
+
+      public String getID()
+      {
+         return id;
+      }
+
+      public List<TextPacket> getPackets()
+      {
+         return packets;
+      }
+   }
+
+   @Before
+   public void setUp() throws Exception
+   {
+      startServer(PORT, TRANSPORT);
+      startClient(PORT, TRANSPORT);
+   }
+
+   @After
+   public void tearDown() throws Exception
+   {
+      clientDispatcher.disconnect();
+      serverAcceptor.unbind();
+   }
+
+   @Test
+   public void testSendOneWay() throws Exception
+   {
+      TextPacket packet = new TextPacket("testOneWayPacket");
+      clientDispatcher.sendOneWay(packet);
+
+      Thread.sleep(300);
+
+      List<AbstractPacket> messages = serverHandler.getPackets();
+      assertEquals(1, messages.size());
+      String response = ((TextPacket) messages.get(0)).getText();
+      assertEquals(packet.getText(), response);
+   }
+
+   @Test
+   public void testSendManyOneWay() throws Exception
+   {
+      final int NUM_OF_MESSAGES = 500;
+
+      for (int i = 0; i < NUM_OF_MESSAGES; i++)
+      {
+         TextPacket packet = new TextPacket("testSendManyOneWay " + i);
+         clientDispatcher.sendOneWay(packet);
+      }
+
+      List<AbstractPacket> messages = serverHandler.getPackets();
+      // Delivery is asynchronous, so right after sending only an upper bound
+      // can be asserted without racing the server. An identity check such as
+      // assertNotSame on two boxed ints would pass regardless of the values.
+      Assert.assertTrue(messages.size() <= NUM_OF_MESSAGES);
+
+      Thread.sleep(2 * 1000);
+
+      // same (live) list as above: by now every message must have arrived,
+      // in sending order
+      assertEquals(NUM_OF_MESSAGES, messages.size());
+      for (int i = 0; i < NUM_OF_MESSAGES; i++)
+      {
+         assertEquals("testSendManyOneWay " + i,
+               ((TextPacket) messages.get(i)).getText());
+      }
+   }
+
+   @Test
+   public void testSendOneWayWithCallbackHandler() throws Exception
+   {
+      // the server waits before replying, so no reply can be there right
+      // after sending
+      serverHandler.setSleepTime(300);
+
+      TestClientHandler callbackHandler = new TestClientHandler();
+
+      TextPacket packet = new TextPacket("testSendOneWayWithHandler");
+
+      clientDispatcher.sendOneWay(packet, callbackHandler);
+      assertEquals(0, callbackHandler.getPackets().size());
+
+      Thread.sleep(600);
+
+      assertEquals(1, callbackHandler.getPackets().size());
+      String response = callbackHandler.getPackets().get(0).getText();
+      assertEquals(TestSupport.reverse(packet.getText()), response);
+   }
+
+   @Test
+   public void testSendBlocking() throws Exception
+   {
+      TextPacket packet = new TextPacket("testSendBlocking");
+
+      AbstractPacket response = clientDispatcher.sendBlocking(packet);
+
+      Assert.assertNotNull(response);
+      Assert.assertTrue(response instanceof TextPacket);
+      Assert.assertEquals(TestSupport.reverse(packet.getText()),
+            ((TextPacket) response).getText());
+   }
+
+   @Test
+   public void testSendBlockingWithTimeout() throws Exception
+   {
+      // make the server reply later than the connection timeout
+      serverHandler.setSleepTime(CONNECTION_TIMEOUT + 2000);
+
+      AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+
+      try
+      {
+         clientDispatcher.sendBlocking(packet);
+         fail("a RequestTimeoutException should be thrown");
+      } catch (RequestTimeoutException expected)
+      {
+         // intentionally empty: the timeout is the outcome under test
+      }
+   }
+}
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/TestServerHandler.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/TestServerHandler.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/TestServerHandler.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,92 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.TestSupport.reverse;
+import static org.jboss.messaging.remoting.internal.Constants.NO_CALLBACK_ID;
+import static org.jboss.messaging.remoting.internal.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.remoting.internal.TransportType.HTTP;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.NullPacket;
+import org.jboss.messaging.remoting.wireformat.TextPacket;
+
+public class TestServerHandler extends IoHandlerAdapter
+{
+ private List<AbstractPacket> packets = new ArrayList<AbstractPacket>();
+ private long sleepTime = 0;
+ private List<IoSession> sessions = new ArrayList<IoSession>();
+
+ public void setSleepTime(long sleepTime)
+ {
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object msg)
+ {
+ packets.add((AbstractPacket) msg);
+
+ // TODO put this logic in the real server handler
+ if (((AbstractPacket) msg).getCorrelationID() == NO_CORRELATION_ID
+ && TestSupport.TRANSPORT == HTTP)
+ {
+ session.write(new NullPacket());
+ }
+
+ if (msg instanceof TextPacket)
+ {
+ TextPacket incomingPacket = (TextPacket) msg;
+ if (mustReply(incomingPacket))
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
+ p.setCorrelationID(incomingPacket.getCorrelationID());
+ if (!NO_CALLBACK_ID.equals(incomingPacket.getCallbackID()))
+ {
+ p.setTargetID(incomingPacket.getCallbackID());
+ }
+ session.write(p);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private boolean mustReply(AbstractPacket incomingPacket)
+ {
+ boolean mustReply = (incomingPacket.getCorrelationID() != NO_CORRELATION_ID)
+ || (!NO_CALLBACK_ID.equals(incomingPacket.getCallbackID()));
+ return mustReply;
+ }
+
+ List<AbstractPacket> getPackets()
+ {
+ return packets;
+ }
+
+ @Override
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ sessions.add(session);
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ super.sessionClosed(session);
+ sessions.remove(session);
+ }
+
+ public List<IoSession> getSessions()
+ {
+ return sessions;
+ }
+}
\ No newline at end of file
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/TestSupport.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/TestSupport.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/TestSupport.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,104 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.internal.TransportType.HTTP;
+import static org.jboss.messaging.remoting.internal.TransportType.TCP;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.remoting.internal.TransportType;
+import org.jboss.messaging.remoting.transport.http.HTTPFilter;
+import org.jboss.messaging.remoting.wireformat.PacketCodecFactory;
+import org.safehaus.asyncweb.codec.HttpServerCodecFactory;
+
+/**
+ * Base class of the remoting tests: starts a MINA acceptor backed by a
+ * {@link TestServerHandler} and connects a client {@link RemoteDispatcher}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class TestSupport
+{
+   // Constants -----------------------------------------------------
+
+   /**
+    * Configurable by system property <code>transport.type</code>, default is TCP
+    */
+   public final static TransportType TRANSPORT;
+
+   // Attributes ----------------------------------------------------
+
+   NioSocketAcceptor serverAcceptor;
+
+   TestServerHandler serverHandler;
+
+   RemoteDispatcher clientDispatcher;
+
+   // Static --------------------------------------------------------
+
+   static
+   {
+      // valueOf() matches on the constant's name(), so the default is taken
+      // from name() rather than toString()
+      String transportType = System.getProperty("transport.type", TCP.name());
+      TRANSPORT = TransportType.valueOf(transportType);
+      System.out.println("Test using transport type: " + TRANSPORT);
+   }
+
+   /**
+    * Reverses the given text. Surrogate pairs are kept together, so the
+    * result is always valid UTF-16 when the input is.
+    *
+    * @param text text to reverse, must not be null
+    * @return the reversed text
+    */
+   static String reverse(String text)
+   {
+      return new StringBuilder(text).reverse().toString();
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   /**
+    * Creates the acceptor and its handler, installs the filter chain for the
+    * given transport and binds to the port.
+    *
+    * @param port      local port to bind
+    * @param transport TCP installs the packet codec; anything else is
+    *                  asserted to be HTTP and installs the HTTP codec and
+    *                  filter
+    * @throws IOException if the acceptor cannot be bound
+    */
+   void startServer(int port, TransportType transport) throws IOException
+   {
+      serverAcceptor = new NioSocketAcceptor();
+      serverHandler = new TestServerHandler();
+
+      MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+      serverAcceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+      if (transport == TCP)
+      {
+         serverAcceptor.getFilterChain().addLast("codec",
+               new ProtocolCodecFilter(new PacketCodecFactory()));
+      } else
+      {
+         assert transport == HTTP;
+
+         serverAcceptor.getFilterChain().addLast("http_codec",
+               new ProtocolCodecFilter(new HttpServerCodecFactory()));
+         serverAcceptor.getFilterChain().addLast("http_filter",
+               new HTTPFilter(true));
+      }
+      serverAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+      // Bind
+      serverAcceptor.setLocalAddress(new InetSocketAddress(port));
+      serverAcceptor.setHandler(serverHandler);
+      serverAcceptor.bind();
+   }
+
+   /**
+    * Creates a new dispatcher and connects it to the given port.
+    *
+    * @param port      port to connect to
+    * @param transport transport used for the connection
+    * @throws Exception if the connection fails
+    */
+   void startClient(int port, TransportType transport) throws Exception
+   {
+      clientDispatcher = RemoteDispatcherFactory.newDispatcher();
+      clientDispatcher.connect(port, transport);
+   }
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: projects/jbm-mina/test/org/jboss/messaging/remoting/TransportTest.java
===================================================================
--- projects/jbm-mina/test/org/jboss/messaging/remoting/TransportTest.java (rev 0)
+++ projects/jbm-mina/test/org/jboss/messaging/remoting/TransportTest.java 2007-11-09 16:09:12 UTC (rev 3303)
@@ -0,0 +1,59 @@
+package org.jboss.messaging.remoting;
+
+import static org.jboss.messaging.remoting.internal.Constants.PORT;
+import static org.jboss.messaging.remoting.internal.TransportType.HTTP;
+import static org.jboss.messaging.remoting.internal.TransportType.TCP;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.jboss.messaging.remoting.internal.TransportType;
+import org.jboss.messaging.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.remoting.wireformat.TextPacket;
+import org.junit.After;
+import org.junit.Test;
+
+public class TransportTest extends TestSupport
+{
+
+ public void setUp(TransportType transport) throws Exception
+ {
+ startServer(PORT, transport);
+ startClient(PORT, transport);
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ serverAcceptor.unbind();
+ clientDispatcher.disconnect();
+ }
+
+ public void sendUsingTransport(TransportType transport, String text)
+ throws Exception
+ {
+ setUp(transport);
+
+ TextPacket packet = new TextPacket(text);
+ clientDispatcher.sendOneWay(packet);
+
+ Thread.sleep(300);
+
+ List<AbstractPacket> messages = serverHandler.getPackets();
+ assertEquals(1, messages.size());
+ String response = ((TextPacket) messages.get(0)).getText();
+ assertEquals(packet.getText(), response);
+ }
+
+ @Test
+ public void testHTTP() throws Exception
+ {
+ sendUsingTransport(HTTP, "testHTTP");
+ }
+
+ @Test
+ public void testTCP() throws Exception
+ {
+ sendUsingTransport(TCP, "testTCP");
+ }
+}
More information about the jboss-cvs-commits
mailing list