[jboss-cvs] JBoss Messaging SVN: r3459 - in branches/Branch_JBMESSAGING-544: tests/src/org/jboss/test/messaging/core/remoting/integration and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 10 08:12:48 EST 2007


Author: jmesnil
Date: 2007-12-10 08:12:48 -0500 (Mon, 10 Dec 2007)
New Revision: 3459

Added:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java
Modified:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544 Replace client-server transport with NIO based transport
* refactoring + code clean up

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java	2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaConnector.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -31,7 +31,6 @@
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java	2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaService.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -15,7 +15,6 @@
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.codec.PacketCodecFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java	2007-12-10 13:11:38 UTC (rev 3458)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/MinaSession.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -6,7 +6,11 @@
  */
 package org.jboss.messaging.core.remoting.integration;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.reqres.Request;
+import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.remoting.NIOSession;
 
 /**
@@ -36,25 +40,25 @@
 
    // Public --------------------------------------------------------
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.remoting.integration.NIOSession#getID()
-    */
    public long getID()
    {
       return session.getId();
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.remoting.integration.NIOSession#write(java.lang.Object)
-    */
    public void write(Object object)
    {
       session.write(object);
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.remoting.integration.NIOSession#isConnected()
-    */
+   public Object writeAndBlock(long requestID, Object object, long timeout,
+         TimeUnit timeUnit) throws Throwable
+   {
+      Request req = new Request(requestID, object, timeout, timeUnit);
+      session.write(req);
+      Response response = req.awaitResponse();
+      return response.getMessage();
+   }
+
    public boolean isConnected()
    {
       return session.isConnected();

Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java (from rev 3446, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/integration/PacketCodecFactory.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,327 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.integration;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+import org.jboss.messaging.core.remoting.codec.AbstractPacketCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveriesRequestCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryRequestCodec;
+import org.jboss.messaging.core.remoting.codec.AcknowledgeDeliveryResponseCodec;
+import org.jboss.messaging.core.remoting.codec.AddTemporaryDestinationMessageCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserHasNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CancelDeliveriesMessageCodec;
+import org.jboss.messaging.core.remoting.codec.CancelDeliveryMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ClosingRequestCodec;
+import org.jboss.messaging.core.remoting.codec.ClosingResponseCodec;
+import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionRequestCodec;
+import org.jboss.messaging.core.remoting.codec.ConnectionFactoryCreateConnectionResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateBrowserRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateBrowserResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateConsumerRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateConsumerResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateDestinationRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateDestinationResponseCodec;
+import org.jboss.messaging.core.remoting.codec.CreateSessionRequestCodec;
+import org.jboss.messaging.core.remoting.codec.CreateSessionResponseCodec;
+import org.jboss.messaging.core.remoting.codec.DeleteTemporaryDestinationMessageCodec;
+import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
+import org.jboss.messaging.core.remoting.codec.GetClientAOPStackResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetClientIDResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetPreparedTransactionsResponseCodec;
+import org.jboss.messaging.core.remoting.codec.GetTopologyResponseCodec;
+import org.jboss.messaging.core.remoting.codec.IDBlockRequestCodec;
+import org.jboss.messaging.core.remoting.codec.IDBlockResponseCodec;
+import org.jboss.messaging.core.remoting.codec.JMSExceptionMessageCodec;
+import org.jboss.messaging.core.remoting.codec.RecoverDeliveriesMessageCodec;
+import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.codec.SendMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SendTransactionMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SetClientIDMessageCodec;
+import org.jboss.messaging.core.remoting.codec.TextPacketCodec;
+import org.jboss.messaging.core.remoting.codec.UnsubscribeMessageCodec;
+import org.jboss.messaging.core.remoting.codec.UpdateCallbackMessageCodec;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageBlockRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
+import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
+import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetPreparedTransactionsRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetPreparedTransactionsResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
+import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class PacketCodecFactory extends DemuxingProtocolCodecFactory
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // FIXME: split encoder/decoder required only on client and/or server sides
+   public PacketCodecFactory() throws Exception
+   {
+      addCodecForEmptyPacket(NULL, NullPacket.class);
+
+      addCodec(JMSExceptionMessage.class, JMSExceptionMessageCodec.class);
+
+      // TextPacket are for testing purpose only!
+      addCodec(TextPacket.class, TextPacketCodec.class);
+
+      addCodec(CreateConnectionRequest.class,
+            ConnectionFactoryCreateConnectionRequestCodec.class);
+
+      addCodec(CreateConnectionResponse.class,
+            ConnectionFactoryCreateConnectionResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_GETCLIENTAOPSTACK,
+            GetClientAOPStackRequest.class);
+
+      addCodec(GetClientAOPStackResponse.class,
+            GetClientAOPStackResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_GETTOPOLOGY,
+            GetTopologyRequest.class);
+
+      addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
+
+      addCodec(UpdateCallbackMessage.class, UpdateCallbackMessageCodec.class);
+
+      addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
+
+      addCodec(CreateSessionResponse.class, CreateSessionResponseCodec.class);
+
+      addCodec(IDBlockRequest.class, IDBlockRequestCodec.class);
+
+      addCodec(IDBlockResponse.class, IDBlockResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_GETCLIENTID,
+            GetClientIDRequest.class);
+
+      addCodec(GetClientIDResponse.class, GetClientIDResponseCodec.class);
+
+      addCodec(SetClientIDMessage.class, SetClientIDMessageCodec.class);
+
+      addCodec(SendMessage.class, SendMessageCodec.class);
+
+      addCodec(CreateConsumerRequest.class, CreateConsumerRequestCodec.class);
+
+      addCodec(CreateDestinationRequest.class,
+            CreateDestinationRequestCodec.class);
+
+      addCodec(CreateDestinationResponse.class,
+            CreateDestinationResponseCodec.class);
+
+      addCodec(CreateConsumerResponse.class, CreateConsumerResponseCodec.class);
+
+      addCodec(CreateBrowserRequest.class, CreateBrowserRequestCodec.class);
+
+      addCodec(CreateBrowserResponse.class, CreateBrowserResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.MSG_STARTCONNECTION,
+            StartConnectionMessage.class);
+
+      addCodecForEmptyPacket(PacketType.MSG_STOPCONNECTION,
+            StopConnectionMessage.class);
+
+      addCodec(ChangeRateMessage.class, ChangeRateMessageCodec.class);
+
+      addCodec(DeliverMessage.class, DeliverMessageCodec.class);
+
+      addCodec(AcknowledgeDeliveryRequest.class,
+            AcknowledgeDeliveryRequestCodec.class);
+
+      addCodec(AcknowledgeDeliveryResponse.class,
+            AcknowledgeDeliveryResponseCodec.class);
+
+      addCodec(AcknowledgeDeliveriesMessage.class,
+            AcknowledgeDeliveriesRequestCodec.class);
+
+      addCodec(RecoverDeliveriesMessage.class,
+            RecoverDeliveriesMessageCodec.class);
+
+      addCodec(CancelDeliveryMessage.class, CancelDeliveryMessageCodec.class);
+
+      addCodec(CancelDeliveriesMessage.class,
+            CancelDeliveriesMessageCodec.class);
+
+      addCodec(ClosingRequest.class, ClosingRequestCodec.class);
+
+      addCodec(ClosingResponse.class, ClosingResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.MSG_CLOSE, CloseMessage.class);
+
+      addCodec(SendTransactionMessage.class, SendTransactionMessageCodec.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_GETPREPAREDTRANSACTIONS,
+            GetPreparedTransactionsRequest.class);
+
+      addCodec(GetPreparedTransactionsResponse.class,
+            GetPreparedTransactionsResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.MSG_BROWSER_RESET,
+            BrowserResetMessage.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_BROWSER_HASNEXTMESSAGE,
+            BrowserHasNextMessageRequest.class);
+
+      addCodec(BrowserHasNextMessageResponse.class,
+            BrowserHasNextMessageResponseCodec.class);
+
+      addCodecForEmptyPacket(PacketType.REQ_BROWSER_NEXTMESSAGE,
+            BrowserNextMessageRequest.class);
+
+      addCodec(BrowserNextMessageResponse.class,
+            BrowserNextMessageResponseCodec.class);
+
+      addCodec(BrowserNextMessageBlockRequest.class,
+            BrowserNextMessageBlockRequestCodec.class);
+
+      addCodec(BrowserNextMessageBlockResponse.class,
+            BrowserNextMessageBlockResponseCodec.class);
+
+      addCodec(UnsubscribeMessage.class, UnsubscribeMessageCodec.class);
+
+      addCodec(AddTemporaryDestinationMessage.class,
+            AddTemporaryDestinationMessageCodec.class);
+
+      addCodec(DeleteTemporaryDestinationMessage.class,
+            DeleteTemporaryDestinationMessageCodec.class);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // FIXME generics definition should be in term of <P>...
+   private void addCodec(
+         Class<? extends AbstractPacket> packetClass,
+         Class<? extends AbstractPacketCodec<? extends AbstractPacket>> codecClass) throws Exception
+   {
+      AbstractPacketCodec<? extends AbstractPacket> codec = codecClass
+            .newInstance();
+      MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec(codec);
+      super.addMessageDecoder(minaCodec);
+      super.addMessageEncoder(packetClass, minaCodec);
+   }
+
+   private void addCodecForEmptyPacket(PacketType type,
+         Class<? extends AbstractPacket> packetClass)
+   {
+      AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(
+            type, packetClass);
+      MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec<AbstractPacket>(
+            codec);
+      super.addMessageDecoder(minaCodec);
+      super.addMessageEncoder(packetClass, minaCodec);
+   }
+
+   public static AbstractPacketCodec<AbstractPacket> createCodecForEmptyPacket(
+         PacketType type, final Class<? extends AbstractPacket> clazz)
+   {
+      return new CodecForEmptyPacket<AbstractPacket>(type)
+      {
+         @Override
+         protected AbstractPacket newPacket()
+         {
+            try
+            {
+               return (AbstractPacket) clazz.newInstance();
+            } catch (Throwable t)
+            {
+               return null;
+            }
+         }
+      };
+   }
+
+   // Inner classes -------------------------------------------------
+
+   abstract static class CodecForEmptyPacket<P extends AbstractPacket> extends
+         AbstractPacketCodec<P>
+   {
+
+      public CodecForEmptyPacket(PacketType type)
+      {
+         super(type);
+      }
+
+      @Override
+      protected void encodeBody(P packet, RemotingBuffer out) throws Exception
+      {
+         // no body
+         out.putInt(0);
+      }
+
+      @Override
+      protected P decodeBody(RemotingBuffer in) throws Exception
+      {
+         in.getInt(); // skip body length
+         return newPacket();
+      }
+
+      protected abstract P newPacket();
+   }
+}

Added: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/MinaClientTest.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,169 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class MinaClientTest extends TestSupport
+{
+   private ReversePacketHandler serverPacketHandler;
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testConnected() throws Exception
+   {
+      Client client = new Client(new MinaConnector());
+
+      assertFalse(client.isConnected());
+
+      client.connect("localhost", PORT, TCP);
+      assertTrue(client.isConnected());
+
+      assertTrue(client.disconnect());
+      assertFalse(client.isConnected());
+      assertFalse(client.disconnect());
+   }
+      
+   public void testSendOneWay() throws Exception
+   {
+      serverPacketHandler.expectMessage(1);
+
+      TextPacket packet = new TextPacket("testSendOneWay");
+      packet.setVersion((byte) 1);
+      packet.setTargetID(serverPacketHandler.getID());
+      client.sendOneWay(packet);
+
+      assertTrue(serverPacketHandler.await(2, SECONDS));
+
+      List<TextPacket> messages = serverPacketHandler.getPackets();
+      assertEquals(1, messages.size());
+      String response = ((TextPacket) messages.get(0)).getText();
+      assertEquals(packet.getText(), response);
+   }
+
+   public void testSendManyOneWay() throws Exception
+   {
+      serverPacketHandler.expectMessage(MANY_MESSAGES);
+
+      TextPacket[] packets = new TextPacket[MANY_MESSAGES];
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         packets[i] = new TextPacket("testSendManyOneWay " + i);
+         packets[i].setVersion((byte) 1);
+         packets[i].setTargetID(serverPacketHandler.getID());
+         client.sendOneWay(packets[i]);
+      }
+
+      assertTrue(serverPacketHandler.await(10, SECONDS));
+
+      List<TextPacket> receivedPackets = serverPacketHandler.getPackets();
+      assertEquals(MANY_MESSAGES, receivedPackets.size());
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         TextPacket receivedPacket = (TextPacket) receivedPackets.get(i);
+         assertEquals(packets[i].getText(), receivedPacket.getText());
+      }
+   }
+
+   public void testSendOneWayWithCallbackHandler() throws Exception
+   {
+      TestPacketHandler callbackHandler = new TestPacketHandler();
+      callbackHandler.expectMessage(1);
+
+      PacketDispatcher.client.register(callbackHandler);
+
+      TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+      packet.setVersion((byte) 1);
+      packet.setTargetID(serverPacketHandler.getID());
+      packet.setCallbackID(callbackHandler.getID());
+
+      client.sendOneWay(packet);
+
+      assertTrue(callbackHandler.await(5, SECONDS));
+
+      assertEquals(1, callbackHandler.getPackets().size());
+      String response = callbackHandler.getPackets().get(0).getText();
+      assertEquals(reverse(packet.getText()), response);
+   }
+
+   public void testSendBlocking() throws Exception
+   {
+      TextPacket request = new TextPacket("testSendBlocking");
+      request.setVersion((byte) 1);
+      request.setTargetID(serverPacketHandler.getID());
+
+      AbstractPacket receivedPacket = client.sendBlocking(request);
+
+      assertNotNull(receivedPacket);
+      assertTrue(receivedPacket instanceof TextPacket);
+      TextPacket response = (TextPacket) receivedPacket;
+      assertEquals(reverse(request.getText()), response.getText());
+   }
+
+   public void testSendBlockingWithTimeout() throws Exception
+   {
+      client.setBlockingRequestTimeout(500, MILLISECONDS);
+      serverPacketHandler.setSleepTime(1000, MILLISECONDS);
+
+      AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+      packet.setVersion((byte) 1);
+
+      try
+      {
+         client.sendBlocking(packet);
+         fail("a IOException should be thrown");
+      } catch (IOException e)
+      {
+      }
+   }
+
+   // TestCase implementation ---------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      startServer(TestSupport.PORT, TRANSPORT);
+      startClient(TestSupport.PORT, TRANSPORT);
+
+      serverPacketHandler = new ReversePacketHandler();
+      PacketDispatcher.server.register(serverPacketHandler);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
+      client.disconnect();
+      stopServer();
+   }
+}

Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java (from rev 3448, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReversePacketHandler.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/ReversePacketHandler.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static junit.framework.Assert.fail;
+import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
+import static org.jboss.test.messaging.core.remoting.integration.TestSupport.reverse;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class ReversePacketHandler extends TestPacketHandler
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private int sleepTime;
+   private TimeUnit timeUnit;
+   private PacketSender lastSender;
+ 
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void setSleepTime(int sleepTime, TimeUnit timeUnit)
+   {
+      this.sleepTime = sleepTime;
+      this.timeUnit = timeUnit;
+   }
+   
+   public PacketSender getLastSender()
+   {
+      return lastSender;
+   }
+   
+   // TestPacketHandler overrides -----------------------------------
+   
+   @Override
+   protected void doHandle(AbstractPacket packet, PacketSender sender)
+   {
+      Assert.assertTrue(packet instanceof TextPacket);
+
+      lastSender = sender;
+
+      if (sleepTime > 0)
+      {
+         try
+         {
+            Thread.sleep(MILLISECONDS.convert(sleepTime, timeUnit));
+         } catch (InterruptedException e)
+         {
+            fail();
+         }
+      }
+      
+      TextPacket message = (TextPacket) packet;
+      if (message.isRequest() || message.getCallbackID() != NO_ID_SET)
+      {
+         TextPacket response = new TextPacket(reverse(message.getText()));
+         response.normalize(message);
+         sender.send(response);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java (from rev 3446, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TargetHandlerTest.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.List;
+
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.test.messaging.core.remoting.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class TargetHandlerTest extends TestSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private ReversePacketHandler serverPacketHandler;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testClientHandlePacketSentByServer() throws Exception
+   {
+      TestPacketHandler clientHandler = new TestPacketHandler();
+      PacketDispatcher.client.register(clientHandler);
+
+      serverPacketHandler.expectMessage(1);
+      clientHandler.expectMessage(1);
+
+      TextPacket packet = new TextPacket(
+            "testClientHandlePacketSentByServer from client");
+      packet.setVersion((byte) 1);
+      packet.setTargetID(serverPacketHandler.getID());
+      // send a packet to create a sender when the server
+      // handles the packet
+      client.sendOneWay(packet);
+
+      assertTrue(serverPacketHandler.await(2, SECONDS));
+
+      assertNotNull(serverPacketHandler.getLastSender());
+      PacketSender sender = serverPacketHandler.getLastSender();
+      TextPacket packetFromServer = new TextPacket(
+            "testClientHandlePacketSentByServer from server");
+      packetFromServer.setVersion((byte) 1);
+      packetFromServer.setTargetID(clientHandler.getID());
+      sender.send(packetFromServer);
+
+      assertTrue(clientHandler.await(2, SECONDS));
+
+      List<TextPacket> packets = clientHandler.getPackets();
+      assertEquals(1, packets.size());
+      TextPacket packetReceivedByClient = (TextPacket) packets.get(0);
+      assertEquals(packetFromServer.getText(), packetReceivedByClient.getText());
+   }
+
+   // TestCase overrides --------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      startServer(PORT, TRANSPORT);
+      startClient(PORT, TRANSPORT);
+      
+      serverPacketHandler = new ReversePacketHandler();
+      PacketDispatcher.server.register(serverPacketHandler);
+   }
+
+   public void tearDown() throws Exception
+   {
+      PacketDispatcher.server.unregister(serverPacketHandler.getID());
+
+      client.disconnect();
+      stopServer();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java (from rev 3446, branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TestSupport.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/integration/TestSupport.java	2007-12-10 13:12:48 UTC (rev 3459)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.core.remoting.integration;
+
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.integration.MinaConnector;
+import org.jboss.messaging.core.remoting.integration.MinaService;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public abstract class TestSupport extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   public static final int MANY_MESSAGES = 500;
+
+   /**
+    * Configurable by system property <code>transport.type</code>, default is
+    * TCP
+    */
+   public final static TransportType TRANSPORT;
+
+   // Attributes ----------------------------------------------------
+
+   Client client;
+
+   private MinaService service;
+
+   public static final int PORT = 9090;
+
+   // Static --------------------------------------------------------
+
+   static
+   {
+      String transportType = System.getProperty("transport.type", TCP
+            .toString());
+      TRANSPORT = TransportType.valueOf(transportType);
+      info("Default transport is " + TRANSPORT);
+   }
+
+   static String reverse(String text)
+   {
+      // Reverse text
+      StringBuffer buf = new StringBuffer(text.length());
+      for (int i = text.length() - 1; i >= 0; i--)
+      {
+         buf.append(text.charAt(i));
+      }
+      return buf.toString();
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   void startServer(int port, TransportType transport) throws Exception
+   {
+      startServer(port, transport, false);
+   }
+
+   void startServer(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      service = new MinaService();
+      service.setPort(port);
+      service.start();
+   }
+   
+   void stopServer()
+   {
+      service.stop();
+   }
+
+   void startClient(int port, TransportType transport) throws Exception
+   {
+      startClient(port, transport, false);
+   }
+
+   void startClient(int port, TransportType transport, boolean useSSL)
+         throws Exception
+   {
+      client = new Client(new MinaConnector());
+      client.connect("localhost", port, transport, useSSL);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private static void info(String info)
+   {
+      System.out.format("### %-50s ###\n", info);
+   }
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list