[jboss-cvs] JBoss Messaging SVN: r3376 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/delegate and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 28 08:24:40 EST 2007


Author: jmesnil
Date: 2007-11-28 08:24:40 -0500 (Wed, 28 Nov 2007)
New Revision: 3376

Modified:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
* replaced MINA's IoSession with our own PacketReplier interface in the new remoting API
* tweaked MINA port to be able to run remote-tests

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -23,7 +23,6 @@
 
 import javax.jms.MessageListener;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.delegate.DelegateSupport;
@@ -40,6 +39,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -130,7 +130,7 @@
             return consumerID;
          }
 
-         public void handle(AbstractPacket packet, IoSession session)
+         public void handle(AbstractPacket packet, PacketReplier replier)
          {
             try
             {

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -21,6 +21,7 @@
  */
 package org.jboss.jms.client.delegate;
 
+import static org.jboss.messaging.core.remoting.Constants.PORT;
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
 
 import java.io.DataInputStream;
@@ -39,7 +40,6 @@
 import org.jboss.jms.exception.MessagingNetworkFailureException;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.wireformat.JMSWireFormat;
-import org.jboss.messaging.core.remoting.Constants;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
 import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
@@ -365,9 +365,10 @@
       
       try
       {
+         System.err.println("### " + serverLocatorURI);
          InvokerLocator locator = new InvokerLocator(serverLocatorURI);
          client = new org.jboss.messaging.core.remoting.Client();
-         client.connect(locator.getHost(), Constants.PORT, TCP);
+         client.connect(locator.getHost(), locator.getPort() + 1000, TCP);
       }
       catch (Exception e)
       {

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -34,7 +34,6 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.jms.delegate.BrowserEndpoint;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.selector.Selector;
@@ -44,6 +43,7 @@
 import org.jboss.messaging.core.contract.Filter;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
@@ -273,7 +273,7 @@
          return id;
       }
       
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          try
          {
@@ -282,20 +282,20 @@
             {
                BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(hasNextMessage());
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_BROWSER_NEXTMESSAGE)
             {
                JBossMessage message = nextMessage();
                BrowserNextMessageResponse response = new BrowserNextMessageResponse(message);
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_BROWSER_RESET)
             {
                reset();
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_CLOSING)
             {
                ClosingRequest request = (ClosingRequest) packet;
@@ -303,14 +303,14 @@
                
                ClosingResponse response = new ClosingResponse(id);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_CLOSE)
                {
                   close();
                   
                   NullPacket response = new NullPacket();
                   response.normalize(packet);
-                  session.write(response);
+                  replier.reply(response);
             } else {
                log.error("Unsupported packet for browser: " + packet);
             }
@@ -318,7 +318,7 @@
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);
             message.normalize(packet);
-            session.write(message);
+            replier.reply(message);
          }
       } 
       

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -29,7 +29,6 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.jms.delegate.ConsumerEndpoint;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.JBossMessage;
@@ -51,6 +50,7 @@
 import org.jboss.messaging.core.impl.SimpleDelivery;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
@@ -645,22 +645,21 @@
       sessionEndpoint.promptDelivery(messageQueue);
    }
    
-   
-   private IoSession session;
+   private PacketReplier replier;
 
-   private void setSession(IoSession session)
+   private void setReplier(PacketReplier replier)
    {
-      this.session = session;
+      this.replier = replier;
    }   
 
    public void deliver(DeliverMessage message)
    {
-      if (session != null)
+      if (replier != null)
       {
          message.setTargetID(id);
-         session.write(message);
+         replier.reply(message);
       } else {
-         log.error("No session to deliver message");
+         log.error("No replier to deliver message to consumer");
       }
    }
 
@@ -673,14 +672,15 @@
          return id;
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
-         setSession(session);
          try
          {
             PacketType type = packet.getType();
             if (type == MSG_CHANGERATE)
             {
+               setReplier(replier);
+
                ChangeRateMessage message = (ChangeRateMessage) packet;
                changeRate(message.getRate());
             } else if (type == REQ_CLOSING)
@@ -689,14 +689,16 @@
                long id = closing(request.getSequence());
                ClosingResponse response = new ClosingResponse(id);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_CLOSE)
             {
                close();
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
+               
+               setReplier(null);
             } else {
                log.error("unsupported packet by server consumer endpoint: "  + packet);
             }
@@ -704,7 +706,7 @@
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);
             message.normalize(packet);
-            session.write(message);
+            replier.reply(message);
          }
       }
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -35,15 +35,14 @@
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.delegate.IDBlock;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.jms.tx.TransactionRequest;
-import org.jboss.messaging.core.remoting.Assert;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
@@ -177,7 +176,7 @@
          return id;
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          try
          {
@@ -188,14 +187,14 @@
                ClientSessionDelegate sessionDelegate = (ClientSessionDelegate) createSessionDelegate(request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
                CreateSessionResponse response = new CreateSessionResponse(sessionDelegate.getID(), sessionDelegate.getDupsOKBatchSize(), sessionDelegate.isStrictTck());
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_IDBLOCK)
             {
                IDBlockRequest request = (IDBlockRequest) packet;
                IDBlock idBlock = getIdBlock(request.getSize());
                IDBlockResponse response = new IDBlockResponse(idBlock.getLow(), idBlock.getHigh());
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_STARTCONNECTION)
             {
                start();
@@ -205,46 +204,46 @@
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);               
+               replier.reply(response);               
             } else if (type == REQ_CLOSING)
             {
                org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
                long id = closing(request.getSequence());
                ClosingResponse response = new ClosingResponse(id);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_CLOSE)
             {
                close();
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);               
+               replier.reply(response);               
             } else if (type == MSG_SENDTRANSACTION)
             {
                SendTransactionMessage message = (SendTransactionMessage) packet;
                sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
                NullPacket response = new NullPacket();
                response.normalize(message);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_GETPREPAREDTRANSACTIONS)
             {
                MessagingXid[] xids = getPreparedTransactions();
                GetPreparedTransactionsResponse response = new GetPreparedTransactionsResponse(xids);
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_GETCLIENTID)
             {
                GetClientIDResponse response = new GetClientIDResponse(getClientID());
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_SETCLIENTID)
             {
                SetClientIDMessage message = (SetClientIDMessage) packet;
                setClientID(message.getClientID());
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else 
             {
                System.err.println("Unsupported packet for connection: " + packet);
@@ -253,7 +252,7 @@
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);
             message.normalize(packet);
-            session.write(message);
+            replier.reply(message);
          }
       }
       

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -27,7 +27,6 @@
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
 import org.jboss.jms.delegate.CreateConnectionResult;
 import org.jboss.jms.delegate.TopologyResult;
@@ -35,6 +34,7 @@
 import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.messaging.core.remoting.Assert;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
@@ -165,7 +165,7 @@
          return id;
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          try
          {
@@ -178,21 +178,21 @@
 
                CreateConnectionResponse response = new CreateConnectionResponse(del.getDelegate().getID(), del.getDelegate().getServerID());
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_GETCLIENTAOPSTACK)
             {
                byte[] stack = getClientAOPStack();
 
                GetClientAOPStackResponse response = new GetClientAOPStackResponse(stack);
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_GETTOPOLOGY)
             {
                TopologyResult topology = getTopology();
 
                GetTopologyResponse response = new GetTopologyResponse(topology);
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else
             {
                System.err.println("unhandled packet:" + packet);
@@ -201,7 +201,7 @@
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);
             message.normalize(packet);
-            session.write(message);
+            replier.reply(message);
          }
       }
       

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -40,7 +40,6 @@
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.delegate.Ack;
@@ -56,6 +55,7 @@
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.jms.server.endpoint.SessionInternalEndpoint;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
@@ -227,7 +227,7 @@
          return id;
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          try
          {
@@ -239,14 +239,14 @@
 
                NullPacket response = new NullPacket();
                response.normalize(message);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_CREATECONSUMER)
             {
                CreateConsumerRequest request = (CreateConsumerRequest) packet;
                ClientConsumerDelegate consumer = (ClientConsumerDelegate) createConsumerDelegate(request.getDestination(), request.getSelector(), request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer(), request.isAutoFlowControl());
                CreateConsumerResponse response = new CreateConsumerResponse(consumer.getID(), consumer.getBufferSize(), consumer.getMaxDeliveries(), consumer.getRedeliveryDelay());
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_CREATEDESTINATION)
             {
                CreateDestinationRequest request = (CreateDestinationRequest) packet;
@@ -260,21 +260,21 @@
                }
                CreateDestinationResponse response = new CreateDestinationResponse(destination);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_CREATEBROWSER)
             {
                CreateBrowserRequest request = (CreateBrowserRequest) packet;
                ClientBrowserDelegate browser = (ClientBrowserDelegate) createBrowserDelegate(request.getDestination(), request.getSelector());
                CreateBrowserResponse response = new CreateBrowserResponse(browser.getID());
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == REQ_ACKDELIVERY)
             {
                AcknowledgeDeliveryRequest request = (AcknowledgeDeliveryRequest) packet;
                boolean acknowledged = acknowledgeDelivery(new DefaultAck(request.getDeliveryID()));
                AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(acknowledged);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_ACKDELIVERIES)
             {
                AcknowledgeDeliveriesMessage message = (AcknowledgeDeliveriesMessage) packet;
@@ -282,7 +282,7 @@
                
                NullPacket p = new NullPacket();
                p.normalize(message);
-               session.write(p);
+               replier.reply(p);
             } else if (type == MSG_RECOVERDELIVERIES)
             {
                RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
@@ -290,7 +290,7 @@
 
                NullPacket p = new NullPacket();
                p.normalize(message);
-               session.write(p);
+               replier.reply(p);
             } else if (type == MSG_CANCELDELIVERY)
             {
                CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
@@ -298,7 +298,7 @@
                
                NullPacket p = new NullPacket();
                p.normalize(message);
-               session.write(p);
+               replier.reply(p);
             } else if (type == MSG_CANCELDELIVERIES)
             {
                CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
@@ -306,21 +306,21 @@
                
                NullPacket p = new NullPacket();
                p.normalize(message);
-               session.write(p);
+               replier.reply(p);
             } else if (type == REQ_CLOSING)
             {
                org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
                long id = closing(request.getSequence());
                ClosingResponse response = new ClosingResponse(id);
                response.normalize(request);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_CLOSE)
             {
                close();
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_UNSUBSCRIBE)
             {
                UnsubscribeMessage message = (UnsubscribeMessage) packet;
@@ -328,7 +328,7 @@
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);
+               replier.reply(response);
             } else if (type == MSG_ADDTEMPORARYDESTINATION)
             {
                AddTemporaryDestinationMessage message = (AddTemporaryDestinationMessage) packet;
@@ -336,7 +336,7 @@
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);     
+               replier.reply(response);     
             } else if (type == MSG_DELETETEMPORARYDESTINATION)
             {
                DeleteTemporaryDestinationMessage message = (DeleteTemporaryDestinationMessage) packet;
@@ -344,7 +344,7 @@
                
                NullPacket response = new NullPacket();
                response.normalize(packet);
-               session.write(response);     
+               replier.reply(response);     
             } else {
                //log.error("Unsupported packet for session: " + packet);
             }
@@ -352,7 +352,7 @@
          {
             JMSExceptionMessage message = new JMSExceptionMessage(e);
             message.normalize(packet);
-            session.write(message);
+            replier.reply(message);
          }
       }
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -10,7 +10,6 @@
 
 import java.util.UUID;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
@@ -55,7 +54,7 @@
       return id;
    }
 
-   public abstract void handle(AbstractPacket packet, IoSession session);
+   public abstract void handle(AbstractPacket packet, PacketReplier replier);
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -74,6 +74,7 @@
       assert port > 0;
       assert transport != null;
 
+      System.err.println("### connect to " + host + ":" + port + " ###");
       connector = new NioSocketConnector();
 
       MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -6,7 +6,6 @@
  */
 package org.jboss.messaging.core.remoting;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
@@ -36,5 +35,5 @@
     */
    String getID();
 
-   void handle(AbstractPacket packet, IoSession session);
+   void handle(AbstractPacket packet, PacketReplier replier);
 }

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -11,6 +11,7 @@
 import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
@@ -33,7 +34,7 @@
    // IoHandlerAdapter overrides ------------------------------------
 
    @Override
-   public void messageReceived(IoSession session, Object message)
+   public void messageReceived(final IoSession session, Object message)
          throws Exception
    {
       if (message instanceof AbstractPacket)
@@ -45,7 +46,13 @@
          if (handler != null)
          {
             System.err.println("ClientHandler.messageReceived() handler: " + handler);
-            handler.handle(packet, session);
+            handler.handle(packet, new PacketReplier() {
+               public void reply(AbstractPacket p)
+               {
+                  session.write(p);
+               }
+               
+            });
          } else
          {
             System.err

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -11,6 +11,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
@@ -56,7 +57,12 @@
       PacketHandler handler = PacketDispatcher.server.getHandler(targetID);
       if (handler != null)
       {
-         handler.handle(packet, session);
+         handler.handle(packet, new PacketReplier() {
+            public void reply(AbstractPacket p)
+            {
+               session.write(p);
+            }
+         });
          return;
       } else
       {

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -14,9 +14,9 @@
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.mina.common.IoSession;
 import org.jboss.messaging.core.remoting.AbstractPacketHandler;
 import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -55,6 +55,7 @@
    public void testSendOneWay() throws Exception
    {
       TextPacket packet = new TextPacket("testSendOneWay");
+      packet.setVersion((byte)1);
       client.sendOneWay(packet);
 
       Thread.sleep(300);
@@ -71,6 +72,7 @@
       for (int i = 0; i < MANY_MESSAGES; i++)
       {
          packets[i] = new TextPacket("testSendManyOneWay " + i);
+         packets[i].setVersion((byte)1);
          client.sendOneWay(packets[i]);
       }
 
@@ -96,6 +98,7 @@
       TestClientHandler callbackHandler = new TestClientHandler();
 
       TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+      packet.setVersion((byte)1);
 
       client.sendOneWay(packet, callbackHandler);
       assertEquals(0, callbackHandler.getPackets().size());
@@ -110,6 +113,7 @@
    public void testSendBlocking() throws Exception
    {
       TextPacket request = new TextPacket("testSendBlocking");
+      request.setVersion((byte)1);
 
       AbstractPacket receivedPacket = client.sendBlocking(request);
 
@@ -125,6 +129,7 @@
       serverHandler.setSleepTime(7, SECONDS);
       
       AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+      packet.setVersion((byte)1);
 
       try
       {
@@ -168,7 +173,7 @@
          packets = new ArrayList<TextPacket>();
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          packets.add((TextPacket) packet);
       }

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -91,6 +91,7 @@
                Thread.sleep(sleepTime);
                TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
                p.setCorrelationID(incomingPacket.getCorrelationID());
+               p.setVersion(incomingPacket.getVersion());
                if (!NO_ID_SET.equals(incomingPacket.getCallbackID()))
                {
                   p.setTargetID(incomingPacket.getCallbackID());

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -14,6 +14,7 @@
 import org.apache.mina.common.IoSession;
 import org.jboss.messaging.core.remoting.AbstractPacketHandler;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketReplier;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 
@@ -40,9 +41,11 @@
 
       PacketDispatcher.client.register(targetHandler);
 
+      TextPacket packet = new TextPacket(
+      "testClientHandlePacketSentByServer from client");
+      packet.setVersion((byte)1);
       // send a packet to create the IoSession on the server
-      client.sendOneWay(new TextPacket(
-            "testClientHandlePacketSentByServer from client"));
+      client.sendOneWay(packet);
 
       Thread.sleep(300);
 
@@ -50,6 +53,7 @@
       IoSession serverSession = serverHandler.getSessions().get(0);
       TextPacket packetFromServer = new TextPacket(
             "testClientHandlePacketSentByServer from server");
+      packetFromServer.setVersion((byte)1);
       packetFromServer.setTargetID(targetHandler.getID());
       serverSession.write(packetFromServer);
 
@@ -92,7 +96,7 @@
          packets = new ArrayList<TextPacket>();
       }
 
-      public void handle(AbstractPacket packet, IoSession session)
+      public void handle(AbstractPacket packet, PacketReplier replier)
       {
          packets.add((TextPacket) packet);
       }

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2007-11-28 13:24:40 UTC (rev 3376)
@@ -21,9 +21,6 @@
 */
 package org.jboss.test.messaging.tools.container;
 
-import static org.jboss.messaging.core.remoting.Constants.PORT;
-import static org.jboss.remoting.transport.PortUtil.findFreePort;
-
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.InputStream;
@@ -99,8 +96,6 @@
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
-import sun.management.StringFlag;
-
 import com.arjuna.ats.arjuna.recovery.RecoveryManager;
 
 /**
@@ -1418,14 +1413,19 @@
 
       log.debug("started " + objectName);
       
-      startMINAServer();
+      if (!transport.equals("http"))
+      {
+         startMINAServer(locator.getPort() + 1000);
+      }
    }
 
 
-   private void startMINAServer() throws Exception
+   private void startMINAServer(int port) throws Exception
    {
       if (acceptor == null)
       {
+         info("Starting MINA on port " + port);
+
          acceptor = new NioSocketAcceptor();
 
          // Prepare the configuration
@@ -1436,7 +1436,7 @@
          acceptor.getFilterChain().addLast("logger", new LoggingFilter());
 
          // Bind
-         acceptor.setLocalAddress(new InetSocketAddress(PORT));
+         acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
          acceptor.setReuseAddress(true);
          acceptor.getSessionConfig().setReuseAddress(true);
          acceptor.getSessionConfig().setKeepAlive(true);
@@ -1445,7 +1445,7 @@
          acceptor.setHandler(new ServerHandler());
          acceptor.bind();
 
-         info("Started MINA on port " + PORT);
+         info("MINA started");
       }
    }
 
@@ -1455,6 +1455,7 @@
       {
          acceptor.unbind();
          acceptor.dispose();
+         acceptor = null;
          
          info("Stopped MINA ");
       }    




More information about the jboss-cvs-commits mailing list