[jboss-cvs] JBoss Messaging SVN: r3459 - in branches/Branch_JBMESSAGING-544: tests/src/org/jboss/test/messaging/core/remoting/integration and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 10 08:12:48 EST 2007
Author: jmesnil
Date: 2007-12-10 08:12:48 -0500 (Mon, 10 Dec 2007)
New Revision: 3459
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 Replace client-server transport with NIO based transport
* refactoring + code clean up
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java 2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -31,7 +31,6 @@
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java 2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -15,7 +15,6 @@
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java 2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -6,7 +6,11 @@
*/
package org.jboss.messaging.core.remoting.integration;
+import java.util.concurrent.TimeUnit;
+
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Request;
+import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.remoting.NIOSession;
/**
@@ -36,25 +40,25 @@
// Public --------------------------------------------------------
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.remoting.integration.NIOSession#getID()
- */
public long getID()
{
return session.getId();
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.remoting.integration.NIOSession#write(java.lang.Object)
- */
public void write(Object object)
{
session.write(object);
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.remoting.integration.NIOSession#isConnected()
- */
+ public Object writeAndBlock(long requestID, Object object, long timeout,
+ TimeUnit timeUnit) throws Throwable
+ {
+ Request req = new Request(requestID, object, timeout, timeUnit);
+ session.write(req);
+ Response response = req.awaitResponse();
+ return response.getMessage();
+ }
+
public boolean isConnected()
{
return session.isConnected();
Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java (from rev 3446, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,327 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.integration;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+import org.jboss.messaging.core.remoting.codec.AbstractPacketCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveriesRequestCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryRequestCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryResponseCodec;
+import org.jboss.messaging.core.remoting.codec.AddTemporaryDestinationMessageCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserHasNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CancelDeliveriesMessageCodec;
+import org.jboss.messaging.core.remoting.codec.CancelDeliveryMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ClosingRequestCodec;
+import org.jboss.messaging.core.remoting.codec.ClosingResponseCodec;
+import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionRequestCodec;
+import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateBrowserRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateBrowserResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateConsumerRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateConsumerResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateDestinationRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateDestinationResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateSessionRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateSessionResponseCodec;
+import org.jboss.messaging.core.remoting.codec.DeleteTemporaryDestinationMessageCodec;
+import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
+import org.jboss.messaging.core.remoting.codec.GetClientAOPStackResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetClientIDResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetPreparedTransactionsResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetTopologyResponseCodec;
+import org.jboss.messaging.core.remoting.codec.IDBlockRequestCodec;
+import org.jboss.messaging.core.remoting.codec.IDBlockResponseCodec;
+import org.jboss.messaging.core.remoting.codec.JMSExceptionMessageCodec;
+import org.jboss.messaging.core.remoting.codec.RecoverDeliveriesMessageCodec;
+import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.codec.SendMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SendTransactionMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
+import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
+import org.jboss.messaging.core.remoting.codec.UpdateCallbackMessageCodec;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageBlockRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetPreparedTransactionsRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetPreparedTransactionsResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
+import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketCodecFactory extends DemuxingProtocolCodecFactory
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // FIXME: split encoder/decoder required only on client and/or server sides
+ public PacketCodecFactory() throws Exception
+ {
+ addCodecForEmptyPacket(NULL, NullPacket.class);
+
+ addCodec(JMSExceptionMessage.class, JMSExceptionMessageCodec.class);
+
+ // TextPacket are for testing purpose only!
+ addCodec(TextPacket.class, TextPacketCodec.class);
+
+ addCodec(CreateConnectionRequest.class,
+ ConnectionFactoryCreateConnectionRequestCodec.class);
+
+ addCodec(CreateConnectionResponse.class,
+ ConnectionFactoryCreateConnectionResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_GETCLIENTAOPSTACK,
+ GetClientAOPStackRequest.class);
+
+ addCodec(GetClientAOPStackResponse.class,
+ GetClientAOPStackResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_GETTOPOLOGY,
+ GetTopologyRequest.class);
+
+ addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
+
+ addCodec(UpdateCallbackMessage.class, UpdateCallbackMessageCodec.class);
+
+ addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
+
+ addCodec(CreateSessionResponse.class, CreateSessionResponseCodec.class);
+
+ addCodec(IDBlockRequest.class, IDBlockRequestCodec.class);
+
+ addCodec(IDBlockResponse.class, IDBlockResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_GETCLIENTID,
+ GetClientIDRequest.class);
+
+ addCodec(GetClientIDResponse.class, GetClientIDResponseCodec.class);
+
+ addCodec(SetClientIDMessage.class, SetClientIDMessageCodec.class);
+
+ addCodec(SendMessage.class, SendMessageCodec.class);
+
+ addCodec(CreateConsumerRequest.class, CreateConsumerRequestCodec.class);
+
+ addCodec(CreateDestinationRequest.class,
+ CreateDestinationRequestCodec.class);
+
+ addCodec(CreateDestinationResponse.class,
+ CreateDestinationResponseCodec.class);
+
+ addCodec(CreateConsumerResponse.class, CreateConsumerResponseCodec.class);
+
+ addCodec(CreateBrowserRequest.class, CreateBrowserRequestCodec.class);
+
+ addCodec(CreateBrowserResponse.class, CreateBrowserResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.MSG_STARTCONNECTION,
+ StartConnectionMessage.class);
+
+ addCodecForEmptyPacket(PacketType.MSG_STOPCONNECTION,
+ StopConnectionMessage.class);
+
+ addCodec(ChangeRateMessage.class, ChangeRateMessageCodec.class);
+
+ addCodec(DeliverMessage.class, DeliverMessageCodec.class);
+
+ addCodec(AcknowledgeDeliveryRequest.class,
+ AcknowledgeDeliveryRequestCodec.class);
+
+ addCodec(AcknowledgeDeliveryResponse.class,
+ AcknowledgeDeliveryResponseCodec.class);
+
+ addCodec(AcknowledgeDeliveriesMessage.class,
+ AcknowledgeDeliveriesRequestCodec.class);
+
+ addCodec(RecoverDeliveriesMessage.class,
+ RecoverDeliveriesMessageCodec.class);
+
+ addCodec(CancelDeliveryMessage.class, CancelDeliveryMessageCodec.class);
+
+ addCodec(CancelDeliveriesMessage.class,
+ CancelDeliveriesMessageCodec.class);
+
+ addCodec(ClosingRequest.class, ClosingRequestCodec.class);
+
+ addCodec(ClosingResponse.class, ClosingResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.MSG_CLOSE, CloseMessage.class);
+
+ addCodec(SendTransactionMessage.class, SendTransactionMessageCodec.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_GETPREPAREDTRANSACTIONS,
+ GetPreparedTransactionsRequest.class);
+
+ addCodec(GetPreparedTransactionsResponse.class,
+ GetPreparedTransactionsResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.MSG_BROWSER_RESET,
+ BrowserResetMessage.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_BROWSER_HASNEXTMESSAGE,
+ BrowserHasNextMessageRequest.class);
+
+ addCodec(BrowserHasNextMessageResponse.class,
+ BrowserHasNextMessageResponseCodec.class);
+
+ addCodecForEmptyPacket(PacketType.REQ_BROWSER_NEXTMESSAGE,
+ BrowserNextMessageRequest.class);
+
+ addCodec(BrowserNextMessageResponse.class,
+ BrowserNextMessageResponseCodec.class);
+
+ addCodec(BrowserNextMessageBlockRequest.class,
+ BrowserNextMessageBlockRequestCodec.class);
+
+ addCodec(BrowserNextMessageBlockResponse.class,
+ BrowserNextMessageBlockResponseCodec.class);
+
+ addCodec(UnsubscribeMessage.class, UnsubscribeMessageCodec.class);
+
+ addCodec(AddTemporaryDestinationMessage.class,
+ AddTemporaryDestinationMessageCodec.class);
+
+ addCodec(DeleteTemporaryDestinationMessage.class,
+ DeleteTemporaryDestinationMessageCodec.class);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // FIXME generics definition should be in term of <P>...
+ private void addCodec(
+ Class<? extends AbstractPacket> packetClass,
+ Class<? extends AbstractPacketCodec<? extends AbstractPacket>> codecClass) throws Exception
+ {
+ AbstractPacketCodec<? extends AbstractPacket> codec = codecClass
+ .newInstance();
+ MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec(codec);
+ super.addMessageDecoder(minaCodec);
+ super.addMessageEncoder(packetClass, minaCodec);
+ }
+
+ private void addCodecForEmptyPacket(PacketType type,
+ Class<? extends AbstractPacket> packetClass)
+ {
+ AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(
+ type, packetClass);
+ MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec<AbstractPacket>(
+ codec);
+ super.addMessageDecoder(minaCodec);
+ super.addMessageEncoder(packetClass, minaCodec);
+ }
+
+ public static AbstractPacketCodec<AbstractPacket> createCodecForEmptyPacket(
+ PacketType type, final Class<? extends AbstractPacket> clazz)
+ {
+ return new CodecForEmptyPacket<AbstractPacket>(type)
+ {
+ @Override
+ protected AbstractPacket newPacket()
+ {
+ try
+ {
+ return (AbstractPacket) clazz.newInstance();
+ } catch (Throwable t)
+ {
+ return null;
+ }
+ }
+ };
+ }
+
+ // Inner classes -------------------------------------------------
+
+ abstract static class CodecForEmptyPacket<P extends AbstractPacket> extends
+ AbstractPacketCodec<P>
+ {
+
+ public CodecForEmptyPacket(PacketType type)
+ {
+ super(type);
+ }
+
+ @Override
+ protected void encodeBody(P packet, RemotingBuffer out) throws Exception
+ {
+ // no body
+ out.putInt(0);
+ }
+
+ @Override
+ protected P decodeBody(RemotingBuffer in) throws Exception
+ {
+ in.getInt(); // skip body length
+ return newPacket();
+ }
+
+ protected abstract P newPacket();
+ }
+}
Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,169 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaClientTest extends TestSupport
+{
+ private ReversePacketHandler serverPacketHandler;
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testConnected() throws Exception
+ {
+ Client client = new Client(new MinaConnector());
+
+ assertFalse(client.isConnected());
+
+ client.connect("localhost", PORT, TCP);
+ assertTrue(client.isConnected());
+
+ assertTrue(client.disconnect());
+ assertFalse(client.isConnected());
+ assertFalse(client.disconnect());
+ }
+
+ public void testSendOneWay() throws Exception
+ {
+ serverPacketHandler.expectMessage(1);
+
+ TextPacket packet = new TextPacket("testSendOneWay");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ client.sendOneWay(packet);
+
+ assertTrue(serverPacketHandler.await(2, SECONDS));
+
+ List<TextPacket> messages = serverPacketHandler.getPackets();
+ assertEquals(1, messages.size());
+ String response = ((TextPacket) messages.get(0)).getText();
+ assertEquals(packet.getText(), response);
+ }
+
+ public void testSendManyOneWay() throws Exception
+ {
+ serverPacketHandler.expectMessage(MANY_MESSAGES);
+
+ TextPacket[] packets = new TextPacket[MANY_MESSAGES];
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ packets[i] = new TextPacket("testSendManyOneWay " + i);
+ packets[i].setVersion((byte) 1);
+ packets[i].setTargetID(serverPacketHandler.getID());
+ client.sendOneWay(packets[i]);
+ }
+
+ assertTrue(serverPacketHandler.await(10, SECONDS));
+
+ List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
+ assertEquals(MANY_MESSAGES, receivedPackets.size());
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
+ assertEquals(packets[i].getText(), receivedPacket.getText());
+ }
+ }
+
+ public void testSendOneWayWithCallbackHandler() throws Exception
+ {
+ TestPacketHandler callbackHandler = new TestPacketHandler();
+ callbackHandler.expectMessage(1);
+
+ PacketDispatcher.client.register(callbackHandler);
+
+ TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ packet.setCallbackID(callbackHandler.getID());
+
+ client.sendOneWay(packet);
+
+ assertTrue(callbackHandler.await(5, SECONDS));
+
+ assertEquals(1, callbackHandler.getPackets().size());
+ String response = callbackHandler.getPackets().get(0).getText();
+ assertEquals(reverse(packet.getText()), response);
+ }
+
+ public void testSendBlocking() throws Exception
+ {
+ TextPacket request = new TextPacket("testSendBlocking");
+ request.setVersion((byte) 1);
+ request.setTargetID(serverPacketHandler.getID());
+
+ AbstractPacket receivedPacket = client.sendBlocking(request);
+
+ assertNotNull(receivedPacket);
+ assertTrue(receivedPacket instanceof TextPacket);
+ TextPacket response = (TextPacket) receivedPacket;
+ assertEquals(reverse(request.getText()), response.getText());
+ }
+
+ public void testSendBlockingWithTimeout() throws Exception
+ {
+ client.setBlockingRequestTimeout(500, MILLISECONDS);
+ serverPacketHandler.setSleepTime(1000, MILLISECONDS);
+
+ AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+ packet.setVersion((byte) 1);
+
+ try
+ {
+ client.sendBlocking(packet);
+ fail("a IOException should be thrown");
+ } catch (IOException e)
+ {
+ }
+ }
+
+ // TestCase implementation ---------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startServer(TestSupport.PORT, TRANSPORT);
+ startClient(TestSupport.PORT, TRANSPORT);
+
+ serverPacketHandler = new ReversePacketHandler();
+ PacketDispatcher.server.register(serverPacketHandler);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
+ client.disconnect();
+ stopServer();
+ }
+}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java (from rev 3448, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static junit.framework.Assert.fail;
+import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
+import static org.jboss.test.messaging.core.remoting.integration.TestSupport.reverse;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ReversePacketHandler extends TestPacketHandler
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private int sleepTime;
+ private TimeUnit timeUnit;
+ private PacketSender lastSender;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void setSleepTime(int sleepTime, TimeUnit timeUnit)
+ {
+ this.sleepTime = sleepTime;
+ this.timeUnit = timeUnit;
+ }
+
+ public PacketSender getLastSender()
+ {
+ return lastSender;
+ }
+
+ // TestPacketHandler overrides -----------------------------------
+
+ @Override
+ protected void doHandle(AbstractPacket packet, PacketSender sender)
+ {
+ Assert.assertTrue(packet instanceof TextPacket);
+
+ lastSender = sender;
+
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(MILLISECONDS.convert(sleepTime, timeUnit));
+ } catch (InterruptedException e)
+ {
+ fail();
+ }
+ }
+
+ TextPacket message = (TextPacket) packet;
+ if (message.isRequest() || message.getCallbackID() != NO_ID_SET)
+ {
+ TextPacket response = new TextPacket(reverse(message.getText()));
+ response.normalize(message);
+ sender.send(response);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java (from rev 3446, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.List;
+
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class TargetHandlerTest extends TestSupport
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private ReversePacketHandler serverPacketHandler;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testClientHandlePacketSentByServer() throws Exception
+ {
+ TestPacketHandler clientHandler = new TestPacketHandler();
+ PacketDispatcher.client.register(clientHandler);
+
+ serverPacketHandler.expectMessage(1);
+ clientHandler.expectMessage(1);
+
+ TextPacket packet = new TextPacket(
+ "testClientHandlePacketSentByServer from client");
+ packet.setVersion((byte) 1);
+ packet.setTargetID(serverPacketHandler.getID());
+ // send a packet to create a sender when the server
+ // handles the packet
+ client.sendOneWay(packet);
+
+ assertTrue(serverPacketHandler.await(2, SECONDS));
+
+ assertNotNull(serverPacketHandler.getLastSender());
+ PacketSender sender = serverPacketHandler.getLastSender();
+ TextPacket packetFromServer = new TextPacket(
+ "testClientHandlePacketSentByServer from server");
+ packetFromServer.setVersion((byte) 1);
+ packetFromServer.setTargetID(clientHandler.getID());
+ sender.send(packetFromServer);
+
+ assertTrue(clientHandler.await(2, SECONDS));
+
+ List<TextPacket> packets = clientHandler.getPackets();
+ assertEquals(1, packets.size());
+ TextPacket packetReceivedByClient = (TextPacket) packets.get(0);
+ assertEquals(packetFromServer.getText(), packetReceivedByClient.getText());
+ }
+
+ // TestCase overrides --------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ startServer(PORT, TRANSPORT);
+ startClient(PORT, TRANSPORT);
+
+ serverPacketHandler = new ReversePacketHandler();
+ PacketDispatcher.server.register(serverPacketHandler);
+ }
+
+ public void tearDown() throws Exception
+ {
+ PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
+ client.disconnect();
+ stopServer();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java (from rev 3446, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java 2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
+import org.jboss.messaging.core.remoting.integration.MinaService;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public abstract class TestSupport extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ public static final int MANY_MESSAGES = 500;
+
+ /**
+ * Configurable by system property <code>transport.type</code>, default is
+ * TCP
+ */
+ public final static TransportType TRANSPORT;
+
+ // Attributes ----------------------------------------------------
+
+ Client client;
+
+ private MinaService service;
+
+ public static final int PORT = 9090;
+
+ // Static --------------------------------------------------------
+
+ static
+ {
+ String transportType = System.getProperty("transport.type", TCP
+ .toString());
+ TRANSPORT = TransportType.valueOf(transportType);
+ info("Default transport is " + TRANSPORT);
+ }
+
+ static String reverse(String text)
+ {
+ // Reverse text
+ StringBuffer buf = new StringBuffer(text.length());
+ for (int i = text.length() - 1; i >= 0; i--)
+ {
+ buf.append(text.charAt(i));
+ }
+ return buf.toString();
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ void startServer(int port, TransportType transport) throws Exception
+ {
+ startServer(port, transport, false);
+ }
+
+ void startServer(int port, TransportType transport, boolean useSSL)
+ throws Exception
+ {
+ service = new MinaService();
+ service.setPort(port);
+ service.start();
+ }
+
+ void stopServer()
+ {
+ service.stop();
+ }
+
+ void startClient(int port, TransportType transport) throws Exception
+ {
+ startClient(port, transport, false);
+ }
+
+ void startClient(int port, TransportType transport, boolean useSSL)
+ throws Exception
+ {
+ client = new Client(new MinaConnector());
+ client.connect("localhost", port, transport, useSSL);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private static void info(String info)
+ {
+ System.out.format("### %-50s ###\n", info);
+ }
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list