[jboss-cvs] JBoss Messaging SVN: r3398 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/messaging/core/remoting/codec and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 3 09:33:46 EST 2007
Author: jmesnil
Date: 2007-12-03 09:33:46 -0500 (Mon, 03 Dec 2007)
New Revision: 3398
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java
Removed:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 Replace client-server transport with NIO based transport
* refactoring
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -26,11 +26,10 @@
import org.apache.mina.filter.reqres.RequestResponseFilter;
import org.apache.mina.filter.reqres.RequestTimeoutException;
import org.apache.mina.filter.reqres.Response;
-import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.internal.ClientHandler;
-import org.jboss.messaging.core.remoting.internal.PacketInspector;
+import org.jboss.messaging.core.remoting.internal.MinaHandler;
+import org.jboss.messaging.core.remoting.internal.MinaInspector;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -43,7 +42,6 @@
// Attributes ----------------------------------------------------
private IoSession session;
- private SslFilter sslFilter;
// By default, a blocking request will timeout after 5 seconds
private int blockingRequestTimeout = 5;
@@ -86,7 +84,7 @@
connector.getFilterChain().addLast("logger", new LoggingFilter());
- connector.setHandler(new ClientHandler());
+ connector.setHandler(new MinaHandler(PacketDispatcher.client));
connector.getSessionConfig().setKeepAlive(true);
connector.getSessionConfig().setReuseAddress(true);
InetSocketAddress address = new InetSocketAddress(host, port);
@@ -110,7 +108,7 @@
assert connector != null;
assert blockingScheduler != null;
-
+
CloseFuture closeFuture = session.close().awaitUninterruptibly();
boolean closed = closeFuture.isClosed();
@@ -128,7 +126,7 @@
}
return Long.toString(session.getId());
}
-
+
public void sendOneWay(AbstractPacket packet)
{
assert packet != null;
@@ -183,20 +181,10 @@
{
blockingScheduler = Executors.newScheduledThreadPool(1);
RequestResponseFilter filter = new RequestResponseFilter(
- new PacketInspector(), blockingScheduler);
+ new MinaInspector(), blockingScheduler);
chain.addLast("reqres", filter);
}
- private void addSSLSupport(DefaultIoFilterChainBuilder chain)
- throws Exception
- {
- // TODO support SSL
- // this.sslFilter = new
- // SslFilter(BogusSslContextFactory.getInstance(false));
- // sslFilter.setUseClientMode(true);
- // chain.addLast("sslFilter", sslFilter);
- }
-
private void checkConnected()
{
if (session == null)
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -11,6 +11,9 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
*
@@ -18,7 +21,10 @@
*/
public class PacketDispatcher
{
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PacketDispatcher.class);
+
// Attributes ----------------------------------------------------
private Map<String, PacketHandler> handlers;
@@ -42,6 +48,11 @@
assertValidID(handler.getID());
assert handler != null;
+ if (log.isDebugEnabled())
+ {
+ log.debug("register " + handler);
+ }
+
handlers.put(handler.getID(), handler);
}
@@ -50,6 +61,11 @@
assertValidID(handlerID);
handlers.remove(handlerID);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("unregister handler for " + handlerID);
+ }
}
public PacketHandler getHandler(String handlerID)
@@ -58,7 +74,23 @@
return handlers.get(handlerID);
}
+
+ public void dispatch(AbstractPacket packet, PacketSender sender)
+ {
+ String targetID = packet.getTargetID();
+ PacketHandler handler = getHandler(targetID);
+ if (handler != null)
+ {
+ if (log.isTraceEnabled())
+ log.trace(handler + " handles " + packet);
+ handler.handle(packet, sender);
+ } else
+ {
+ log.warn("Unhandled packet " + packet);
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AbstractPacketCodec.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -30,11 +30,11 @@
{
// Constants -----------------------------------------------------
- protected static final int INT_LENGTH = 4;
+ public static final int INT_LENGTH = 4;
- protected static final int FLOAT_LENGTH = 4;
+ public static final int FLOAT_LENGTH = 4;
- protected static final int LONG_LENGTH = 8;
+ public static final int LONG_LENGTH = 8;
// Attributes ----------------------------------------------------
Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
-
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
-import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class AddCallbackMessageCodec extends
- AbstractPacketCodec<UpdateCallbackMessage>
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public AddCallbackMessageCodec()
- {
- super(MSG_UPDATECALLBACK);
- }
-
- // AbstractPackedCodec overrides----------------------------------
-
- @Override
- protected void encodeBody(UpdateCallbackMessage message, RemotingBuffer out) throws Exception
- {
- String remotingSessionID = message.getRemotingSessionID();
- String clientVMID = message.getClientVMID();
- boolean add = message.isAdd();
-
- int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
-
- out.putInt(bodyLength);
- out.putNullableString(remotingSessionID);
- out.putNullableString(clientVMID);
- out.putBoolean(add);
- }
-
- @Override
- protected UpdateCallbackMessage decodeBody(RemotingBuffer in)
- throws Exception
- {
- int bodyLength = in.getInt();
- if (in.remaining() < bodyLength)
- {
- return null;
- }
- String remotingSessionID = MinaPacketCodec.getString(in);
- String clientVMID = MinaPacketCodec.getString(in);
- boolean add = in.getBoolean();
-
- return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DecoderStatus.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,7 +6,6 @@
*/
package org.jboss.messaging.core.remoting.codec;
-import org.apache.mina.filter.codec.demux.MessageDecoder;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -103,7 +103,7 @@
addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
- addCodec(UpdateCallbackMessage.class, AddCallbackMessageCodec.class);
+ addCodec(UpdateCallbackMessage.class, UpdateCallbackMessageCodec.class);
addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RemotingBuffer.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -7,8 +7,6 @@
package org.jboss.messaging.core.remoting.codec;
import java.nio.charset.CharacterCodingException;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java (from rev 3389, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UpdateCallbackMessageCodec.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class UpdateCallbackMessageCodec extends
+ AbstractPacketCodec<UpdateCallbackMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public UpdateCallbackMessageCodec()
+ {
+ super(MSG_UPDATECALLBACK);
+ }
+
+ // AbstractPackedCodec overrides----------------------------------
+
+ @Override
+ protected void encodeBody(UpdateCallbackMessage message, RemotingBuffer out) throws Exception
+ {
+ String remotingSessionID = message.getRemotingSessionID();
+ String clientVMID = message.getClientVMID();
+ boolean add = message.isAdd();
+
+ int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
+
+ out.putInt(bodyLength);
+ out.putNullableString(remotingSessionID);
+ out.putNullableString(clientVMID);
+ out.putBoolean(add);
+ }
+
+ @Override
+ protected UpdateCallbackMessage decodeBody(RemotingBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+ String remotingSessionID = MinaPacketCodec.getString(in);
+ String clientVMID = MinaPacketCodec.getString(in);
+ boolean add = in.getBoolean();
+
+ return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.internal;
-
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Response;
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class ClientHandler extends IoHandlerAdapter
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ClientHandler.class);
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // IoHandlerAdapter overrides ------------------------------------
-
- @Override
- public void messageReceived(final IoSession session, Object message)
- throws Exception
- {
- if (message instanceof AbstractPacket)
- {
- AbstractPacket packet = (AbstractPacket) message;
-
- if (log.isTraceEnabled())
- log.trace("received packet " + packet);
-
- String targetID = packet.getTargetID();
- PacketHandler handler = PacketDispatcher.client.getHandler(targetID);
- if (handler != null)
- {
- if (log.isTraceEnabled())
- log.trace(handler + " handles " + packet);
-
- handler.handle(packet, new PacketSender() {
- public void send(AbstractPacket p)
- {
- session.write(p);
- }
-
- });
- } else
- {
- log.error("unhandled packet " + packet);
- }
- } else if (message instanceof Response)
- {
- // response is handled by the reqres filter.
- // do nothing
- } else
- {
- log.error("unhandled remote message " + message);
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaHandler.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Response;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaHandler extends IoHandlerAdapter
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MinaHandler.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final PacketDispatcher dispatcher;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public MinaHandler(PacketDispatcher dispatcher)
+ {
+ this.dispatcher = dispatcher;
+ }
+
+ // Public --------------------------------------------------------
+
+ // IoHandlerAdapter overrides ------------------------------------
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause)
+ throws Exception
+ {
+ session.close();
+ }
+
+ @Override
+ public void messageReceived(final IoSession session, Object message)
+ throws Exception
+ {
+ if (message instanceof Response)
+ {
+ log.trace("received response " + message);
+ // response is handled by the reqres filter.
+ // do nothing
+ return;
+ }
+
+ if (!(message instanceof AbstractPacket))
+ {
+ throw new IllegalArgumentException("Unknown message type: " + message);
+ }
+
+ AbstractPacket packet = (AbstractPacket) message;
+ PacketSender sender = new PacketSender()
+ {
+ public void send(AbstractPacket p)
+ {
+ session.write(p);
+ }
+ };
+
+ if (log.isTraceEnabled())
+ log.trace("received packet " + packet);
+
+ dispatcher.dispatch(packet, sender);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java (from rev 3389, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaInspector.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
+
+import org.apache.mina.filter.reqres.ResponseInspector;
+import org.apache.mina.filter.reqres.ResponseType;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaInspector implements ResponseInspector
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // ResponseInspector implementation ------------------------------
+
+ public Object getRequestId(Object message)
+ {
+ if (!(message instanceof AbstractPacket))
+ {
+ return null;
+ }
+ AbstractPacket packet = (AbstractPacket) message;
+ if (packet.isRequest())
+ {
+ return packet.getCorrelationID();
+ } else
+ {
+ return null;
+ }
+ }
+
+ public ResponseType getResponseType(Object message)
+ {
+ if (!(message instanceof AbstractPacket))
+ {
+ return null;
+ }
+
+ return WHOLE;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaPacketCodec.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -59,12 +59,6 @@
private final AbstractPacketCodec<P> codec;
- public MinaPacketCodec(Class<? extends AbstractPacketCodec<P>> codecClass)
- throws Throwable
- {
- this.codec = (AbstractPacketCodec<P>) codecClass.newInstance();
- }
-
public MinaPacketCodec(AbstractPacketCodec<P> codec)
{
this.codec = codec;
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/MinaService.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.internal;
+
+import java.net.InetSocketAddress;
+import java.util.Formatter;
+
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.filter.logging.MdcInjectionFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaService
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MinaService.class);
+
+ // Attributes ----------------------------------------------------
+
+ private int port;
+
+ private NioSocketAcceptor acceptor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+
+ public MinaService()
+ {
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void start() throws Exception
+ {
+ if (acceptor == null)
+ {
+ info("Starting MINA on port " + port);
+
+ acceptor = new NioSocketAcceptor();
+
+ // Prepare the configuration
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+ acceptor.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+
+ // Bind
+ acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
+ acceptor.setReuseAddress(true);
+ acceptor.getSessionConfig().setReuseAddress(true);
+ acceptor.getSessionConfig().setKeepAlive(true);
+ acceptor.setDisconnectOnUnbind(false);
+
+ acceptor.setHandler(new MinaHandler(PacketDispatcher.server));
+ acceptor.bind();
+
+ info("MINA started");
+ }
+ }
+
+
+ public void stop()
+ {
+ if (acceptor != null)
+ {
+ acceptor.unbind();
+ acceptor.dispose();
+ acceptor = null;
+
+ info("Stopped MINA ");
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+
+ private void info(String s)
+ {
+ log.info(new Formatter().format("### %-30s ###", s).toString());
+ }
+
+ // Inner classes -------------------------------------------------
+}
Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/PacketInspector.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,69 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.internal;
-
-import static org.apache.mina.filter.reqres.ResponseType.WHOLE;
-import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
-
-import org.apache.mina.filter.reqres.ResponseInspector;
-import org.apache.mina.filter.reqres.ResponseType;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class PacketInspector implements ResponseInspector
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // ResponseInspector implementation ------------------------------
-
- public Object getRequestId(Object message)
- {
- if (!(message instanceof AbstractPacket))
- {
- return null;
- }
- long correlationID = ((AbstractPacket) message).getCorrelationID();
- if (correlationID != NO_CORRELATION_ID)
- {
- return correlationID;
- } else
- {
- return null;
- }
- }
-
- public ResponseType getResponseType(Object message)
- {
- if (!(message instanceof AbstractPacket))
- {
- return null;
- }
-
- return WHOLE;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -105,6 +105,14 @@
setTargetID(other.getCallbackID());
}
+ /**
+ * An AbstractPacket is a request if it has a target ID and a correlation ID
+ */
+ public boolean isRequest()
+ {
+ return targetID != NO_ID_SET && correlationID != NO_CORRELATION_ID;
+ }
+
@Override
public String toString()
{
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -7,7 +7,6 @@
package org.jboss.test.messaging.core.remoting;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.List;
import java.util.concurrent.TimeoutException;
@@ -25,6 +24,8 @@
*/
public class RemoteDispatcherTest extends TestSupport
{
+ private ReversePacketHandler serverPacketHandler;
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -34,7 +35,7 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testCanNotSendPacketIfNotConnected() throws Exception
{
Client client = new Client();
@@ -51,15 +52,16 @@
public void testSendOneWay() throws Exception
{
- serverHandler.expectMessage(1);
+ serverPacketHandler.expectMessage(1);
TextPacket packet = new TextPacket("testSendOneWay");
packet.setVersion((byte)1);
+ packet.setTargetID(serverPacketHandler.getID());
client.sendOneWay(packet);
- serverHandler.await();
+ serverPacketHandler.await();
- List<AbstractPacket> messages = serverHandler.getPackets();
+ List<TextPacket> messages = serverPacketHandler.getPackets();
assertEquals(1, messages.size());
String response = ((TextPacket) messages.get(0)).getText();
assertEquals(packet.getText(), response);
@@ -67,19 +69,20 @@
public void testSendManyOneWay() throws Exception
{
- serverHandler.expectMessage(MANY_MESSAGES);
+ serverPacketHandler.expectMessage(MANY_MESSAGES);
TextPacket[] packets = new TextPacket[MANY_MESSAGES];
for (int i = 0; i < MANY_MESSAGES; i++)
{
packets[i] = new TextPacket("testSendManyOneWay " + i);
packets[i].setVersion((byte)1);
+ packets[i].setTargetID(serverPacketHandler.getID());
client.sendOneWay(packets[i]);
}
- serverHandler.await();
+ serverPacketHandler.await();
- List<AbstractPacket> receivedPackets = serverHandler.getPackets();
+ List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
assertEquals(MANY_MESSAGES, receivedPackets.size());
for (int i = 0; i < MANY_MESSAGES; i++)
{
@@ -89,10 +92,7 @@
}
public void testSendOneWayWithCallbackHandler() throws Exception
- {
- // add some lag
- serverHandler.setSleepTime(300, MILLISECONDS);
-
+ {
TestPacketHandler callbackHandler = new TestPacketHandler();
callbackHandler.expectMessage(1);
@@ -100,6 +100,7 @@
TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
packet.setVersion((byte)1);
+ packet.setTargetID(serverPacketHandler.getID());
packet.setCallbackID(callbackHandler.getID());
client.sendOneWay(packet);
@@ -115,7 +116,8 @@
{
TextPacket request = new TextPacket("testSendBlocking");
request.setVersion((byte)1);
-
+ request.setTargetID(serverPacketHandler.getID());
+
AbstractPacket receivedPacket = client.sendBlocking(request);
assertNotNull(receivedPacket);
@@ -126,8 +128,8 @@
public void testSendBlockingWithTimeout() throws Exception
{
- client.setBlockingRequestTimeout(1, SECONDS);
- serverHandler.setSleepTime(2, SECONDS);
+ client.setBlockingRequestTimeout(500, MILLISECONDS);
+ serverPacketHandler.setSleepTime(1000, MILLISECONDS);
AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
packet.setVersion((byte)1);
@@ -148,12 +150,17 @@
{
startServer(TestSupport.PORT, TRANSPORT);
startClient(TestSupport.PORT, TRANSPORT);
+
+ serverPacketHandler = new ReversePacketHandler();
+ PacketDispatcher.server.register(serverPacketHandler);
}
@Override
protected void tearDown() throws Exception
{
+ PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
client.disconnect();
- serverAcceptor.unbind();
+ stopServer();
}
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,7 +6,14 @@
*/
package org.jboss.test.messaging.core.remoting;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static junit.framework.Assert.fail;
+import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
+import static org.jboss.messaging.core.remoting.Constants.NO_ID_SET;
import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
+
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import org.jboss.messaging.core.remoting.PacketSender;
@@ -15,9 +22,9 @@
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public class ReversePacketHandler extends TestPacketHandler
{
@@ -25,26 +32,57 @@
// Attributes ----------------------------------------------------
+ private int sleepTime;
+ private TimeUnit timeUnit;
+ private PacketSender lastSender;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- // TestPacketHandler overrides ---------------------------------------------------
-
+ public void setSleepTime(int sleepTime, TimeUnit timeUnit)
+ {
+ this.sleepTime = sleepTime;
+ this.timeUnit = timeUnit;
+ }
+
+ public PacketSender getLastSender()
+ {
+ return lastSender;
+ }
+
+ // TestPacketHandler overrides -----------------------------------
+
@Override
protected void doHandle(AbstractPacket packet, PacketSender sender)
{
Assert.assertTrue(packet instanceof TextPacket);
+
+ lastSender = sender;
+
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(MILLISECONDS.convert(sleepTime, timeUnit));
+ } catch (InterruptedException e)
+ {
+ fail();
+ }
+ }
TextPacket request = (TextPacket) packet;
- TextPacket response = new TextPacket(reverse(request.getText()));
- response.normalize(request);
-
- sender.send(response);
+ if (!NO_ID_SET.equals(request.getCallbackID())
+ || request.getCorrelationID() != NO_CORRELATION_ID)
+ {
+ TextPacket response = new TextPacket(reverse(request.getText()));
+ response.normalize(request);
+ sender.send(response);
+ }
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,157 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.core.remoting;
-
-import static org.jboss.messaging.core.remoting.Constants.NO_CORRELATION_ID;
-import static org.jboss.messaging.core.remoting.Constants.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.TransportType.HTTP;
-import static org.jboss.test.messaging.core.remoting.TestSupport.reverse;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.TextPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class ReverseServerHandler extends IoHandlerAdapter
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private List<AbstractPacket> packets = new ArrayList<AbstractPacket>();
-
- private long sleepTime = 0;
-
- private List<IoSession> sessions = new ArrayList<IoSession>();
-
- private TransportType transport;
-
- private CountDownLatch latch;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- ReverseServerHandler(TransportType transport)
- {
- this.transport = transport;
- }
-
- // Public --------------------------------------------------------
-
- public void setSleepTime(long time, TimeUnit unit)
- {
- this.sleepTime = unit.toMillis(time);
- }
-
- public List<AbstractPacket> getPackets()
- {
- return packets;
- }
-
- public List<IoSession> getSessions()
- {
- return sessions;
- }
-
- // IoHandlerAdapter overrides ------------------------------------
-
- @Override
- public void messageReceived(IoSession session, Object msg)
- {
- try
- {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- packets.add((AbstractPacket) msg);
-
- // TODO put this logic in the real server handler
- if (((AbstractPacket) msg).getCorrelationID() == NO_CORRELATION_ID
- && transport == HTTP)
- {
- session.write(new NullPacket());
- }
-
- if (msg instanceof TextPacket)
- {
- TextPacket incomingPacket = (TextPacket) msg;
- if (mustReply(incomingPacket))
- {
- TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
- p.setCorrelationID(incomingPacket.getCorrelationID());
- p.setVersion(incomingPacket.getVersion());
- if (!NO_ID_SET.equals(incomingPacket.getCallbackID()))
- {
- p.setTargetID(incomingPacket.getCallbackID());
- }
- session.write(p);
- }
- }
-
- if (latch != null)
- latch.countDown();
-
- }
-
- @Override
- public void sessionCreated(IoSession session) throws Exception
- {
- sessions.add(session);
- }
-
- @Override
- public void sessionClosed(IoSession session) throws Exception
- {
- super.sessionClosed(session);
- sessions.remove(session);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private boolean mustReply(AbstractPacket incomingPacket)
- {
- boolean mustReply = (incomingPacket.getCorrelationID() != NO_CORRELATION_ID)
- || (!NO_ID_SET.equals(incomingPacket.getCallbackID()));
- return mustReply;
- }
-
- public void await() throws InterruptedException
- {
- latch.await();
- }
-
- /**
- * @param manyMessages
- */
- public void expectMessage(int count)
- {
- latch = new CountDownLatch(count);
- }
-
- // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,15 +6,10 @@
*/
package org.jboss.test.messaging.core.remoting;
-
-import java.util.ArrayList;
import java.util.List;
-import org.apache.mina.common.IoSession;
-import org.jboss.messaging.core.remoting.AbstractPacketHandler;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
/**
@@ -28,6 +23,8 @@
// Attributes ----------------------------------------------------
+ private ReversePacketHandler serverPacketHandler;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -36,27 +33,29 @@
public void testClientHandlePacketSentByServer() throws Exception
{
- TestPacketHandler clientHandler = new TestPacketHandler();
+ TestPacketHandler clientHandler = new TestPacketHandler();
PacketDispatcher.client.register(clientHandler);
- serverHandler.expectMessage(1);
+ serverPacketHandler.expectMessage(1);
clientHandler.expectMessage(1);
TextPacket packet = new TextPacket(
- "testClientHandlePacketSentByServer from client");
- packet.setVersion((byte)1);
- // send a packet to create the IoSession on the server
+ "testClientHandlePacketSentByServer from client");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ // send a packet to create a sender when the server
+ // handles the packet
client.sendOneWay(packet);
- serverHandler.await();
-
- assertEquals(1, serverHandler.getSessions().size());
- IoSession serverSession = serverHandler.getSessions().get(0);
+ serverPacketHandler.await();
+
+ assertNotNull(serverPacketHandler.getLastSender());
+ PacketSender sender = serverPacketHandler.getLastSender();
TextPacket packetFromServer = new TextPacket(
"testClientHandlePacketSentByServer from server");
- packetFromServer.setVersion((byte)1);
+ packetFromServer.setVersion((byte) 1);
packetFromServer.setTargetID(clientHandler.getID());
- serverSession.write(packetFromServer);
+ sender.send(packetFromServer);
clientHandler.await();
@@ -72,12 +71,17 @@
{
startServer(PORT, TRANSPORT);
startClient(PORT, TRANSPORT);
+
+ serverPacketHandler = new ReversePacketHandler();
+ PacketDispatcher.server.register(serverPacketHandler);
}
public void tearDown() throws Exception
{
+ PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
client.disconnect();
- serverAcceptor.unbind();
+ stopServer();
}
// Package protected ---------------------------------------------
@@ -87,24 +91,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
- private final class ClientTargetHandler extends AbstractPacketHandler
- {
- private final List<TextPacket> packets;
-
- private ClientTargetHandler()
- {
- packets = new ArrayList<TextPacket>();
- }
-
- public void handle(AbstractPacket packet, PacketSender replier)
- {
- packets.add((TextPacket) packet);
- }
-
- public List<TextPacket> getPackets()
- {
- return packets;
- }
- }
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestPacketHandler.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -15,12 +15,12 @@
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
-class TestPacketHandler extends AbstractPacketHandler
+public class TestPacketHandler extends AbstractPacketHandler
{
private final List<TextPacket> packets;
private CountDownLatch latch;
- TestPacketHandler()
+ public TestPacketHandler()
{
packets = new ArrayList<TextPacket>();
}
@@ -40,10 +40,10 @@
{
packets.add((TextPacket) packet);
+ doHandle(packet, sender);
+
if (latch != null)
latch.countDown();
-
- doHandle(packet, sender);
}
protected void doHandle(AbstractPacket packet, PacketSender sender)
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,23 +6,12 @@
*/
package org.jboss.test.messaging.core.remoting;
-import static org.jboss.messaging.core.remoting.TransportType.HTTP;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
-
-import java.net.InetSocketAddress;
-
import junit.framework.TestCase;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.filter.logging.MdcInjectionFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.messaging.core.remoting.Client;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
+import org.jboss.messaging.core.remoting.internal.MinaService;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -43,14 +32,10 @@
// Attributes ----------------------------------------------------
- NioSocketAcceptor serverAcceptor;
-
- ReverseServerHandler serverHandler;
-
- ReversePacketHandler serverPacketHandler = new ReversePacketHandler();
-
Client client;
+ private MinaService service;
+
public static final int PORT = 9090;
// Static --------------------------------------------------------
@@ -81,53 +66,19 @@
void startServer(int port, TransportType transport) throws Exception
{
startServer(port, transport, false);
- PacketDispatcher.server.register(serverPacketHandler);
}
void startServer(int port, TransportType transport, boolean useSSL)
throws Exception
{
- serverAcceptor = new NioSocketAcceptor();
- serverHandler = new ReverseServerHandler(transport);
-
- MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
- serverAcceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
-
- if (useSSL)
- addSSLSupport(serverAcceptor.getFilterChain());
-
- info("using " + transport);
- if (transport == TCP)
- {
- serverAcceptor.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new PacketCodecFactory()));
- } else
- {
- assert transport == HTTP;
-
- // TODO add http support
- // serverAcceptor.getFilterChain().addLast("http_codec",
- // new ProtocolCodecFilter(new HttpServerCodecFactory()));
- // serverAcceptor.getFilterChain().addLast("http_filter",
- // new HTTPFilter(true));
- }
- serverAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
-
- // Bind
- serverAcceptor.setLocalAddress(new InetSocketAddress(port));
- info("listening on " + port);
- serverAcceptor.setHandler(serverHandler);
- serverAcceptor.bind();
+ service = new MinaService();
+ service.setPort(port);
+ service.start();
}
-
- private void addSSLSupport(DefaultIoFilterChainBuilder chain)
- throws Exception
+
+ void stopServer()
{
- // TODO add SSL support
- // SslFilter sslFilter = new
- // SslFilter(BogusSslContextFactory.getInstance(true));
- // chain.addLast("sslFilter", sslFilter);
- // info("SSL enabled");
+ service.stop();
}
void startClient(int port, TransportType transport) throws Exception
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -1,108 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.core.remoting.internal;
-
-import static java.util.UUID.randomUUID;
-import junit.framework.TestCase;
-
-import org.apache.mina.common.IoBuffer;
-import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
-import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class MinaRemotingBuffer extends TestCase
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- private RemotingBuffer wrapper;
- private IoBuffer buffer;
-
- @Override
- protected void setUp() throws Exception
- {
- buffer = IoBuffer.allocate(256);
- buffer.setAutoExpand(true);
- wrapper = new MinaPacketCodec.BufferWrapper(buffer);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- wrapper = null;
- buffer = null;
-
- }
-
- public void testNullString() throws Exception
- {
- assertNull(putAndGetNullableString(null));
- }
-
- public void testEmptyString() throws Exception
- {
- String result = putAndGetNullableString("");
-
- assertNotNull(result);
- assertEquals("", result);
- }
-
- public void testNonEmptyString() throws Exception
- {
- String junk = randomUUID().toString();
-
- String result = putAndGetNullableString(junk);
-
- assertNotNull(result);
- assertEquals(junk, result);
- }
-
- public void testPutTrueBoolean() throws Exception
- {
- wrapper.putBoolean(true);
-
- buffer.flip();
-
- assertTrue(wrapper.getBoolean());
- }
-
- public void testPutFalseBoolean() throws Exception
- {
- wrapper.putBoolean(false);
-
- buffer.flip();
-
- assertFalse(wrapper.getBoolean());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private String putAndGetNullableString(String nullableString) throws Exception
- {
- wrapper.putNullableString(nullableString);
-
- buffer.flip();
-
- return wrapper.getNullableString();
- }
- // Inner classes -------------------------------------------------
-}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java (from rev 3389, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBuffer.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/internal/MinaRemotingBufferTest.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -0,0 +1,108 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.internal;
+
+import static java.util.UUID.randomUUID;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.IoBuffer;
+import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.internal.MinaPacketCodec;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaRemotingBufferTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private RemotingBuffer wrapper;
+ private IoBuffer buffer;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ buffer = IoBuffer.allocate(256);
+ buffer.setAutoExpand(true);
+ wrapper = new MinaPacketCodec.BufferWrapper(buffer);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ wrapper = null;
+ buffer = null;
+
+ }
+
+ public void testNullString() throws Exception
+ {
+ assertNull(putAndGetNullableString(null));
+ }
+
+ public void testEmptyString() throws Exception
+ {
+ String result = putAndGetNullableString("");
+
+ assertNotNull(result);
+ assertEquals("", result);
+ }
+
+ public void testNonEmptyString() throws Exception
+ {
+ String junk = randomUUID().toString();
+
+ String result = putAndGetNullableString(junk);
+
+ assertNotNull(result);
+ assertEquals(junk, result);
+ }
+
+ public void testPutTrueBoolean() throws Exception
+ {
+ wrapper.putBoolean(true);
+
+ buffer.flip();
+
+ assertTrue(wrapper.getBoolean());
+ }
+
+ public void testPutFalseBoolean() throws Exception
+ {
+ wrapper.putBoolean(false);
+
+ buffer.flip();
+
+ assertFalse(wrapper.getBoolean());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private String putAndGetNullableString(String nullableString) throws Exception
+ {
+ wrapper.putNullableString(nullableString);
+
+ buffer.flip();
+
+ return wrapper.getNullableString();
+ }
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -6,6 +6,8 @@
*/
package org.jboss.test.messaging.core.remoting.wireformat;
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.LONG_LENGTH;
+import static org.jboss.messaging.core.remoting.codec.AbstractPacketCodec.sizeof;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_BROWSER_RESET;
@@ -93,7 +95,7 @@
import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveriesRequestCodec;
import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryRequestCodec;
import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryResponseCodec;
-import org.jboss.messaging.core.remoting.codec.AddCallbackMessageCodec;
+import org.jboss.messaging.core.remoting.codec.UpdateCallbackMessageCodec;
import org.jboss.messaging.core.remoting.codec.AddTemporaryDestinationMessageCodec;
import org.jboss.messaging.core.remoting.codec.BrowserHasNextMessageResponseCodec;
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
@@ -225,6 +227,20 @@
return randomString().getBytes();
}
+ private static void checkHeader(RemotingBuffer buffer, AbstractPacket packet)
+ throws Exception
+ {
+ assertEquals(buffer.get(), packet.getType().byteValue());
+ assertEquals(buffer.get(), packet.getVersion());
+
+ int headerLength = LONG_LENGTH + sizeof(packet.getTargetID())
+ + sizeof(packet.getCallbackID());
+ assertEquals(buffer.getInt(), headerLength);
+ assertEquals(buffer.getLong(), packet.getCorrelationID());
+ assertEquals(buffer.getNullableString(), packet.getTargetID());
+ assertEquals(buffer.getNullableString(), packet.getCallbackID());
+ }
+
private static void assertEqualsAcks(List<Ack> expected, List<Ack> actual)
{
assertEquals(expected.size(), actual.size());
@@ -329,10 +345,17 @@
packet.setCorrelationID(randomLong());
packet.setTargetID(randomString());
- AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
- NULL, NullPacket.class);
- AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+ AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
+ .createCodecForEmptyPacket(NULL, NullPacket.class);
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ // no body
+ assertEquals(0, buffer.getInt());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
assertTrue(decodedPacket instanceof NullPacket);
NullPacket p = (NullPacket) decodedPacket;
@@ -350,9 +373,13 @@
JMSExceptionMessage message = new JMSExceptionMessage(e);
addVersion(message);
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new JMSExceptionMessageCodec());
+ AbstractPacketCodec<JMSExceptionMessage> codec = new JMSExceptionMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
assertTrue(decodedPacket instanceof JMSExceptionMessage);
JMSExceptionMessage decodedMessage = (JMSExceptionMessage) decodedPacket;
@@ -365,10 +392,16 @@
{
TextPacket packet = new TextPacket("testTextPacket");
addVersion(packet);
+ AbstractPacketCodec<TextPacket> codec = new TextPacketCodec();
- AbstractPacket decodedPacket = encodeAndDecode(packet,
- new TextPacketCodec());
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ assertEquals(buffer.getInt(), sizeof(packet.getText()));
+ assertEquals(buffer.getNullableString(), packet.getText());
+ buffer.rewind();
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
assertTrue(decodedPacket instanceof TextPacket);
TextPacket p = (TextPacket) decodedPacket;
@@ -388,9 +421,12 @@
CreateConnectionRequest request = new CreateConnectionRequest(version,
remotingSessionID, clientVMID, failedNodeID, username, password);
addVersion(request);
+ AbstractPacketCodec<CreateConnectionRequest> codec = new ConnectionFactoryCreateConnectionRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new ConnectionFactoryCreateConnectionRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateConnectionRequest);
CreateConnectionRequest decodedRequest = (CreateConnectionRequest) decodedPacket;
@@ -410,9 +446,12 @@
CreateConnectionResponse response = new CreateConnectionResponse(
randomString(), 1234);
addVersion(response);
+ AbstractPacketCodec<CreateConnectionResponse> codec = new ConnectionFactoryCreateConnectionResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new ConnectionFactoryCreateConnectionResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateConnectionResponse);
@@ -430,7 +469,10 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_GETCLIENTAOPSTACK, GetClientAOPStackRequest.class);
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetClientAOPStackRequest);
assertEquals(REQ_GETCLIENTAOPSTACK, decodedPacket.getType());
@@ -442,9 +484,12 @@
GetClientAOPStackResponse response = new GetClientAOPStackResponse(stack);
addVersion(response);
+ AbstractPacketCodec codec = new GetClientAOPStackResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new GetClientAOPStackResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetClientAOPStackResponse);
GetClientAOPStackResponse decodedResponse = (GetClientAOPStackResponse) decodedPacket;
@@ -459,8 +504,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_GETTOPOLOGY, GetTopologyRequest.class);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetTopologyRequest);
assertEquals(REQ_GETTOPOLOGY, decodedPacket.getType());
@@ -479,9 +527,12 @@
new HashMap());
GetTopologyResponse response = new GetTopologyResponse(topology);
addVersion(response);
+ AbstractPacketCodec codec = new GetTopologyResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new GetTopologyResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetTopologyResponse);
GetTopologyResponse decodedResponse = (GetTopologyResponse) decodedPacket;
@@ -494,9 +545,12 @@
UpdateCallbackMessage message = new UpdateCallbackMessage(randomString(),
randomString(), true);
addVersion(message);
+ AbstractPacketCodec codec = new UpdateCallbackMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new AddCallbackMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof UpdateCallbackMessage);
@@ -512,9 +566,12 @@
{
CreateSessionRequest request = new CreateSessionRequest(true, 0, false);
addVersion(request);
+ AbstractPacketCodec codec = new CreateSessionRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new CreateSessionRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateSessionRequest);
@@ -531,9 +588,12 @@
CreateSessionResponse response = new CreateSessionResponse(
randomString(), 23, false);
addVersion(response);
+ AbstractPacketCodec codec = new CreateSessionResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new CreateSessionResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateSessionResponse);
@@ -549,9 +609,12 @@
{
IDBlockRequest request = new IDBlockRequest(23);
addVersion(request);
+ AbstractPacketCodec codec = new IDBlockRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new IDBlockRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof IDBlockRequest);
@@ -565,9 +628,12 @@
IDBlockResponse response = new IDBlockResponse(randomLong(),
randomLong() * 2);
addVersion(response);
+ AbstractPacketCodec codec = new IDBlockResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new IDBlockResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof IDBlockResponse);
@@ -582,8 +648,12 @@
SendMessage packet = new SendMessage(new JBossMessage(System
.currentTimeMillis()), true, randomLong());
addVersion(packet);
+ AbstractPacketCodec codec = new SendMessageCodec();
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ buffer.rewind();
- AbstractPacket p = encodeAndDecode(packet, new SendMessageCodec());
+ AbstractPacket p = codec.decode(buffer);
assertTrue(p instanceof SendMessage);
@@ -603,9 +673,12 @@
CreateConsumerRequest request = new CreateConsumerRequest(destination,
"color = 'red'", false, "subscription", false, false);
addVersion(request);
+ AbstractPacketCodec codec = new CreateConsumerRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new CreateConsumerRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateConsumerRequest);
@@ -627,9 +700,12 @@
CreateDestinationRequest request = new CreateDestinationRequest(
"testCreateDestinationRequest", false);
addVersion(request);
+ AbstractPacketCodec codec = new CreateDestinationRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new CreateDestinationRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateDestinationRequest);
@@ -646,9 +722,12 @@
CreateDestinationResponse response = new CreateDestinationResponse(
destination);
addVersion(response);
+ AbstractPacketCodec codec = new CreateDestinationResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new CreateDestinationResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateDestinationResponse);
@@ -665,9 +744,12 @@
CreateDestinationResponse response = new CreateDestinationResponse(
destination);
addVersion(response);
+ AbstractPacketCodec codec = new CreateDestinationResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new CreateDestinationResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateDestinationResponse);
@@ -683,9 +765,12 @@
CreateConsumerResponse response = new CreateConsumerResponse(
randomString(), 23, 42, randomLong());
addVersion(response);
+ AbstractPacketCodec codec = new CreateConsumerResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new CreateConsumerResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateConsumerResponse);
@@ -705,8 +790,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_STARTCONNECTION, StartConnectionMessage.class);
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof StartConnectionMessage);
assertEquals(MSG_STARTCONNECTION, decodedPacket.getType());
@@ -719,8 +807,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_STOPCONNECTION, StopConnectionMessage.class);
+ SimpleRemotingBuffer buffer = encode(packet, codec);
+ checkHeader(buffer, packet);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(packet, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof StopConnectionMessage);
assertEquals(MSG_STOPCONNECTION, decodedPacket.getType());
@@ -730,9 +821,12 @@
{
ChangeRateMessage message = new ChangeRateMessage(0.63f);
addVersion(message);
+ AbstractPacketCodec codec = new ChangeRateMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new ChangeRateMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ChangeRateMessage);
ChangeRateMessage decodedMessage = (ChangeRateMessage) decodedPacket;
@@ -746,9 +840,12 @@
DeliverMessage message = new DeliverMessage(msg, randomString(),
randomLong(), 23);
addVersion(message);
+ AbstractPacketCodec codec = new DeliverMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new DeliverMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof DeliverMessage);
DeliverMessage decodedMessage = (DeliverMessage) decodedPacket;
@@ -766,9 +863,12 @@
AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(
randomLong());
addVersion(request);
+ AbstractPacketCodec codec = new AcknowledgeDeliveryRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new AcknowledgeDeliveryRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof AcknowledgeDeliveryRequest);
AcknowledgeDeliveryRequest decodedRequest = (AcknowledgeDeliveryRequest) decodedPacket;
@@ -786,9 +886,12 @@
AcknowledgeDeliveriesMessage request = new AcknowledgeDeliveriesMessage(
acks);
addVersion(request);
+ AbstractPacketCodec codec = new AcknowledgeDeliveriesRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new AcknowledgeDeliveriesRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof AcknowledgeDeliveriesMessage);
AcknowledgeDeliveriesMessage decodedRequest = (AcknowledgeDeliveriesMessage) decodedPacket;
@@ -801,9 +904,12 @@
AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(
true);
addVersion(response);
+ AbstractPacketCodec codec = new AcknowledgeDeliveryResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new AcknowledgeDeliveryResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof AcknowledgeDeliveryResponse);
AcknowledgeDeliveryResponse decodedResponse = (AcknowledgeDeliveryResponse) decodedPacket;
@@ -816,9 +922,12 @@
{
ClosingRequest request = new ClosingRequest(randomLong());
addVersion(request);
+ AbstractPacketCodec codec = new ClosingRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new ClosingRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ClosingRequest);
ClosingRequest decodedRequest = (ClosingRequest) decodedPacket;
@@ -830,9 +939,12 @@
{
ClosingResponse response = new ClosingResponse(System.currentTimeMillis());
addVersion(response);
+ AbstractPacketCodec codec = new ClosingResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new ClosingResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ClosingResponse);
ClosingResponse decodedRequest = (ClosingResponse) decodedPacket;
@@ -847,8 +959,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_CLOSE, CloseMessage.class);
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CloseMessage);
CloseMessage decodedMessage = (CloseMessage) decodedPacket;
@@ -864,9 +979,12 @@
SendTransactionMessage message = new SendTransactionMessage(tr, true);
addVersion(message);
+ AbstractPacketCodec codec = new SendTransactionMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new SendTransactionMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SendTransactionMessage);
SendTransactionMessage decodedMessage = (SendTransactionMessage) decodedPacket;
@@ -888,8 +1006,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_GETPREPAREDTRANSACTIONS, GetPreparedTransactionsRequest.class);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetPreparedTransactionsRequest);
assertEquals(REQ_GETPREPAREDTRANSACTIONS, decodedPacket.getType());
@@ -904,9 +1025,12 @@
GetPreparedTransactionsResponse response = new GetPreparedTransactionsResponse(
xids);
addVersion(response);
+ AbstractPacketCodec codec = new GetPreparedTransactionsResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new GetPreparedTransactionsResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetPreparedTransactionsResponse);
GetPreparedTransactionsResponse decodedResponse = (GetPreparedTransactionsResponse) decodedPacket;
@@ -921,8 +1045,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_GETCLIENTID, GetClientIDRequest.class);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetClientIDRequest);
assertEquals(REQ_GETCLIENTID, decodedPacket.getType());
@@ -932,9 +1059,12 @@
{
GetClientIDResponse response = new GetClientIDResponse(randomString());
addVersion(response);
+ AbstractPacketCodec codec = new GetClientIDResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new GetClientIDResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof GetClientIDResponse);
GetClientIDResponse decodedResponse = (GetClientIDResponse) decodedPacket;
@@ -946,9 +1076,12 @@
{
SetClientIDMessage message = new SetClientIDMessage(randomString());
addVersion(message);
+ AbstractPacketCodec codec = new SetClientIDMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new SetClientIDMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SetClientIDMessage);
SetClientIDMessage decodedMessage = (SetClientIDMessage) decodedPacket;
@@ -969,9 +1102,12 @@
RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(
deliveries, randomString());
addVersion(message);
+ AbstractPacketCodec codec = new RecoverDeliveriesMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new RecoverDeliveriesMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof RecoverDeliveriesMessage);
RecoverDeliveriesMessage decodedMessage = (RecoverDeliveriesMessage) decodedPacket;
@@ -986,9 +1122,12 @@
Cancel cancel = new DefaultCancel(randomLong(), 23, true, false);
CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
addVersion(message);
+ AbstractPacketCodec codec = new CancelDeliveryMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new CancelDeliveryMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CancelDeliveryMessage);
CancelDeliveryMessage decodedMessage = (CancelDeliveryMessage) decodedPacket;
@@ -1010,9 +1149,12 @@
CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
addVersion(message);
+ AbstractPacketCodec codec = new CancelDeliveriesMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new CancelDeliveriesMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CancelDeliveriesMessage);
CancelDeliveriesMessage decodedMessage = (CancelDeliveriesMessage) decodedPacket;
@@ -1027,9 +1169,12 @@
CreateBrowserRequest request = new CreateBrowserRequest(destination,
"color = 'red'");
addVersion(request);
+ AbstractPacketCodec codec = new CreateBrowserRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request,
- new CreateBrowserRequestCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateBrowserRequest);
@@ -1043,9 +1188,12 @@
{
CreateBrowserResponse response = new CreateBrowserResponse(randomString());
addVersion(response);
+ AbstractPacketCodec codec = new CreateBrowserResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new CreateBrowserResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CreateBrowserResponse);
@@ -1061,8 +1209,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
MSG_BROWSER_RESET, BrowserResetMessage.class);
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserResetMessage);
assertEquals(MSG_BROWSER_RESET, decodedPacket.getType());
@@ -1075,8 +1226,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_BROWSER_HASNEXTMESSAGE, BrowserHasNextMessageRequest.class);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserHasNextMessageRequest);
assertEquals(REQ_BROWSER_HASNEXTMESSAGE, decodedPacket.getType());
@@ -1087,9 +1241,12 @@
BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(
false);
addVersion(response);
+ AbstractPacketCodec codec = new BrowserHasNextMessageResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new BrowserHasNextMessageResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserHasNextMessageResponse);
@@ -1105,8 +1262,11 @@
AbstractPacketCodec codec = PacketCodecFactory.createCodecForEmptyPacket(
REQ_BROWSER_NEXTMESSAGE, BrowserNextMessageRequest.class);
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(request, codec);
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserNextMessageRequest);
assertEquals(REQ_BROWSER_NEXTMESSAGE, decodedPacket.getType());
@@ -1117,9 +1277,12 @@
JBossMessage msg = new JBossMessage(randomLong());
BrowserNextMessageResponse response = new BrowserNextMessageResponse(msg);
addVersion(response);
+ AbstractPacketCodec codec = new BrowserNextMessageResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new BrowserNextMessageResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserNextMessageResponse);
BrowserNextMessageResponse decodedResponse = (BrowserNextMessageResponse) decodedPacket;
@@ -1133,9 +1296,13 @@
BrowserNextMessageBlockRequest request = new BrowserNextMessageBlockRequest(
randomLong());
addVersion(request);
-
- AbstractPacket decodedPacket = encodeAndDecode(request, new BrowserNextMessageBlockRequestCodec());
+ AbstractPacketCodec codec = new BrowserNextMessageBlockRequestCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ buffer.rewind();
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
assertTrue(decodedPacket instanceof BrowserNextMessageBlockRequest);
BrowserNextMessageBlockRequest decodedRequest = (BrowserNextMessageBlockRequest) decodedPacket;
assertEquals(REQ_BROWSER_NEXTMESSAGEBLOCK, decodedPacket.getType());
@@ -1150,9 +1317,12 @@
BrowserNextMessageBlockResponse response = new BrowserNextMessageBlockResponse(
messages);
addVersion(response);
+ AbstractPacketCodec codec = new BrowserNextMessageBlockResponseCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(response,
- new BrowserNextMessageBlockResponseCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BrowserNextMessageBlockResponse);
BrowserNextMessageBlockResponse decodedResponse = (BrowserNextMessageBlockResponse) decodedPacket;
@@ -1168,9 +1338,12 @@
UnsubscribeMessage message = new UnsubscribeMessage(
"testUnsubscribeMessage");
addVersion(message);
+ AbstractPacketCodec codec = new UnsubscribeMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new UnsubscribeMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof UnsubscribeMessage);
UnsubscribeMessage decodedMessage = (UnsubscribeMessage) decodedPacket;
@@ -1186,9 +1359,12 @@
AddTemporaryDestinationMessage message = new AddTemporaryDestinationMessage(
destination);
addVersion(message);
+ AbstractPacketCodec codec = new AddTemporaryDestinationMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new AddTemporaryDestinationMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof AddTemporaryDestinationMessage);
@@ -1204,9 +1380,12 @@
DeleteTemporaryDestinationMessage message = new DeleteTemporaryDestinationMessage(
destination);
addVersion(message);
+ AbstractPacketCodec codec = new DeleteTemporaryDestinationMessageCodec();
+ SimpleRemotingBuffer buffer = encode(message, codec);
+ checkHeader(buffer, message);
+ buffer.rewind();
- AbstractPacket decodedPacket = encodeAndDecode(message,
- new DeleteTemporaryDestinationMessageCodec());
+ AbstractPacket decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof DeleteTemporaryDestinationMessage);
@@ -1221,18 +1400,17 @@
// Private -------------------------------------------------------
- private AbstractPacket encodeAndDecode(AbstractPacket packet,
+ private SimpleRemotingBuffer encode(AbstractPacket packet,
AbstractPacketCodec codec) throws Exception
{
SimpleRemotingBuffer buf = new SimpleRemotingBuffer();
-
- codec.encode(packet, buf);
+
+ codec.encode(packet, buf);
buf.flip();
- return codec.decode(buf);
+
+ return buf;
}
- // Inner classes -------------------------------------------------
-
private final class SimpleRemotingBuffer implements RemotingBuffer
{
private static final byte NON_NULL_STRING = (byte) 0;
@@ -1251,8 +1429,14 @@
dos = null;
dais = new DataInputStream(
new ByteArrayInputStream(baos.toByteArray()));
+ dais.mark(1024);
}
+ public void rewind() throws IOException
+ {
+ dais.reset();
+ }
+
public byte get()
{
try
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-12-03 13:56:39 UTC (rev 3397)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-12-03 14:33:46 UTC (rev 3398)
@@ -24,7 +24,6 @@
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -33,7 +32,6 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Formatter;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
@@ -58,10 +56,6 @@
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.filter.logging.MdcInjectionFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.hsqldb.Server;
import org.hsqldb.persist.HsqlProperties;
import org.jboss.jms.jndi.JMSProviderAdapter;
@@ -69,8 +63,7 @@
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.server.ServerHandler;
+import org.jboss.messaging.core.remoting.internal.MinaService;
import org.jboss.messaging.util.JNDIUtil;
import org.jboss.messaging.util.XMLUtil;
import org.jboss.remoting.InvokerLocator;
@@ -238,7 +231,7 @@
// so we don't start up multiple servers with services running on the same port
private int serverIndex;
- private NioSocketAcceptor acceptor;
+ private MinaService minaService;
// Static ---------------------------------------------------------------------------------------
@@ -538,7 +531,8 @@
stopService(REMOTING_OBJECT_NAME);
- stopMINAServer();
+ if (minaService != null)
+ minaService.stop();
if (httpConnectionFactory)
{
@@ -1415,58 +1409,12 @@
if (!transport.equals("http"))
{
- startMINAServer(locator.getPort() + 1000);
+ minaService = new MinaService();
+ minaService.setPort(locator.getPort() + 1000);
+ minaService.start();
}
}
-
- private void startMINAServer(int port) throws Exception
- {
- if (acceptor == null)
- {
- info("Starting MINA on port " + port);
-
- acceptor = new NioSocketAcceptor();
-
- // Prepare the configuration
- MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
- acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
- acceptor.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new PacketCodecFactory()));
- acceptor.getFilterChain().addLast("logger", new LoggingFilter());
-
- // Bind
- acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
- acceptor.setReuseAddress(true);
- acceptor.getSessionConfig().setReuseAddress(true);
- acceptor.getSessionConfig().setKeepAlive(true);
- acceptor.setDisconnectOnUnbind(false);
-
- acceptor.setHandler(new ServerHandler());
- acceptor.bind();
-
- info("MINA started");
- }
- }
-
- private void stopMINAServer()
- {
- if (acceptor != null)
- {
- acceptor.unbind();
- acceptor.dispose();
- acceptor = null;
-
- info("Stopped MINA ");
- }
- }
-
- private void info(String s)
- {
- log.info(new Formatter().format("### %-30s ###\n", s).toString());
- }
-
-
private void startSecurityManager() throws Exception
{
MockJBossSecurityManager sm = new MockJBossSecurityManager();
More information about the jboss-cvs-commits
mailing list