[jboss-cvs] JBoss Messaging SVN: r3348 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/server/endpoint and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 20 10:22:55 EST 2007


Author: jmesnil
Date: 2007-11-20 10:22:54 -0500 (Tue, 20 Nov 2007)
New Revision: 3348

Added:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java
Modified:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO-based transport
* Added packets and codecs for the new wire format so that the tests in ConnectionFactoryTest and AcknowledgementTest pass

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -43,21 +43,21 @@
 import org.jboss.jms.tx.ResourceManagerFactory;
 import org.jboss.jms.tx.TransactionRequest;
 import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
-import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
 import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
-import org.jboss.jms.wireformat.ConnectionSetClientIDRequest;
 import org.jboss.jms.wireformat.ConnectionStartRequest;
 import org.jboss.jms.wireformat.ConnectionStopRequest;
 import org.jboss.jms.wireformat.RequestSupport;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.Client;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
 import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
 import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
 import org.jboss.messaging.util.Version;
 
@@ -89,8 +89,6 @@
 
    private transient Version versionToUse;
 
-   private Client newclient;
-
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
@@ -143,7 +141,7 @@
 
       client = thisState.getRemotingConnection().getRemotingClient();
       
-      newclient = thisState.getRemotingConnection().getNewRemotingClient();
+      newClient = thisState.getRemotingConnection().getNewRemotingClient();
 
       serverID = newDelegate.getServerID();
    }
@@ -154,29 +152,36 @@
 
 //      client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
       
-      newclient =  ((ConnectionState)state).getRemotingConnection().getNewRemotingClient();
+      newClient =  ((ConnectionState)state).getRemotingConnection().getNewRemotingClient();
    }
 
    // Closeable implementation ---------------------------------------------------------------------
 
    public void close() throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
       
       CloseMessage message = new CloseMessage();
       message.setTargetID(id);
-      newclient.sendOneWay(message);
+      try
+      {
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
    }
 
    public long closing(long sequence) throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
 
       try
       {
          org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
          request.setTargetID(id);
-         ClosingResponse response = (ClosingResponse) newclient.sendBlocking(request);
+         ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
          return response.getID();
       } catch (TimeoutException e)
       {
@@ -204,13 +209,13 @@
                                                 int acknowledgmentMode,
                                                 boolean isXA) throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
 
       CreateSessionRequest request = new CreateSessionRequest(transacted, acknowledgmentMode, isXA);
       request.setTargetID(id);
       
       try {
-         CreateSessionResponse response = (CreateSessionResponse) newclient.sendBlocking(request);         
+         CreateSessionResponse response = (CreateSessionResponse) newClient.sendBlocking(request);         
          ClientSessionDelegate delegate = new ClientSessionDelegate(response.getSessionID(), response.getDupsOKBatchSize(), response.isStrictTCK());
          return delegate;
       } catch (Exception e) {
@@ -227,9 +232,23 @@
 
    public String getClientID() throws JMSException
    {
-      RequestSupport req = new ConnectionGetClientIDRequest(id, version);
+      assert newClient != null;
 
-      return (String)doInvoke(client, req);
+      try
+      {
+         GetClientIDRequest request = new GetClientIDRequest();
+         request.setTargetID(id);
+         GetClientIDResponse response = (GetClientIDResponse) newClient.sendBlocking(request);
+         return response.getClientID();
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+         return null;
+      }
+      
+//      RequestSupport req = new ConnectionGetClientIDRequest(id, version);
+//
+//      return (String)doInvoke(client, req);
    }
 
    /**
@@ -253,11 +272,18 @@
    public void sendTransaction(TransactionRequest tr,
                                boolean checkForDuplicates) throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
 
-      SendTransactionMessage request = new SendTransactionMessage(tr, checkForDuplicates);
-      request.setTargetID(id);
-      newclient.sendOneWay(request);
+      try
+      {
+         SendTransactionMessage request = new SendTransactionMessage(tr, checkForDuplicates);
+         request.setTargetID(id);
+         newClient.sendBlocking(request);
+      } catch (TimeoutException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
 //      RequestSupport req =
 //         new ConnectionSendTransactionRequest(id, version, request, checkForDuplicates);
 //
@@ -266,9 +292,20 @@
 
    public void setClientID(String clientID) throws JMSException
    {
-      RequestSupport req = new ConnectionSetClientIDRequest(id, version, clientID);
+      assert newClient != null;
 
-      doInvoke(client, req);
+      try
+      {
+         SetClientIDMessage message = new SetClientIDMessage(clientID);
+         message.setTargetID(id);
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+      }
+//      RequestSupport req = new ConnectionSetClientIDRequest(id, version, clientID);
+//
+//      doInvoke(client, req);
    }
 
    /**
@@ -282,11 +319,11 @@
 
    public void start() throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
 
       StartConnectionMessage message = new StartConnectionMessage();
       message.setTargetID(id);
-      newclient.sendOneWay(message);
+      newClient.sendOneWay(message);
 //      RequestSupport req = new ConnectionStartRequest(id, version);
 //
 //      doInvoke(client, req);
@@ -331,13 +368,13 @@
    
    public IDBlock getIdBlock(int size) throws JMSException
    {
-      assert newclient != null;
+      assert newClient != null;
 
       IDBlockRequest request = new IDBlockRequest(size);
       request.setTargetID(id);
       try
       {
-         IDBlockResponse response = (IDBlockResponse) newclient.sendBlocking(request);
+         IDBlockResponse response = (IDBlockResponse) newClient.sendBlocking(request);
          return new IDBlock(response.getLow(), response.getHigh());
       } catch (TimeoutException e)
       {

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -25,6 +25,7 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -41,6 +42,8 @@
 import org.jboss.messaging.core.remoting.Client;
 import org.jboss.messaging.core.remoting.Constants;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 
 /**
  * The client-side Consumer delegate class.
@@ -128,16 +131,42 @@
 
    public void close() throws JMSException
    {
-      RequestSupport req = new CloseRequest(id, version);
+      assert newClient != null;
+      
+      CloseMessage message = new CloseMessage();
+      message.setTargetID(id);
+      try
+      {
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
 
-      doInvoke(client, req);
+//      RequestSupport req = new CloseRequest(id, version);
+//
+//      doInvoke(client, req);
    }
 
    public long closing(long sequence) throws JMSException
    {
-      RequestSupport req = new ClosingRequest(sequence, id, version);
+      assert newClient != null;
 
-      return ((Long)doInvoke(client, req)).longValue();
+      try
+      {
+         org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
+         request.setTargetID(id);
+         ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
+         return response.getID();
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+         return -1;
+      }
+//      RequestSupport req = new ClosingRequest(sequence, id, version);
+//
+//      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // ConsumerDelegate implementation --------------------------------------------------------------

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -52,26 +52,26 @@
 import org.jboss.jms.message.ObjectMessageProxy;
 import org.jboss.jms.message.StreamMessageProxy;
 import org.jboss.jms.message.TextMessageProxy;
-import org.jboss.jms.wireformat.CloseRequest;
-import org.jboss.jms.wireformat.ClosingRequest;
 import org.jboss.jms.wireformat.RequestSupport;
 import org.jboss.jms.wireformat.SessionAddTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionCancelDeliveriesRequest;
-import org.jboss.jms.wireformat.SessionCancelDeliveryRequest;
 import org.jboss.jms.wireformat.SessionCreateBrowserDelegateRequest;
 import org.jboss.jms.wireformat.SessionCreateQueueRequest;
 import org.jboss.jms.wireformat.SessionCreateTopicRequest;
 import org.jboss.jms.wireformat.SessionDeleteTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
 import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 
 /**
@@ -162,16 +162,45 @@
 
    public void close() throws JMSException
    {
-      RequestSupport req = new CloseRequest(id, version);
-
-      doInvoke(client, req);
+      assert newClient != null;
+      
+      CloseMessage message = new CloseMessage();
+      message.setTargetID(id);
+      try
+      {
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
+//
+//      RequestSupport req = new CloseRequest(id, version);
+//
+//      doInvoke(client, req);
    }
 
    public long closing(long sequence) throws JMSException
-   {   	   	
-      RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
+   {   	   
+      assert newClient != null;
 
-      return ((Long)doInvoke(client, req)).longValue();
+      long seq = ((SessionState)state).getNPSendSequence();
+      
+      try
+      {
+         org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(seq);
+         request.setTargetID(id);
+         ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
+         return response.getID();
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+         return -1;
+      }
+      
+//      RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
+//
+//      return ((Long)doInvoke(client, req)).longValue();
    }
 
    // SessionDelegate implementation ---------------------------------------------------------------
@@ -529,7 +558,12 @@
          SendMessage message = new SendMessage(m, checkForDuplicates, seq);
          message.setTargetID(id);
          
-         newClient.sendOneWay(message);
+         if (seq == -1)
+         {
+            newClient.sendBlocking(message);
+         } else {
+            newClient.sendOneWay(message);
+         }
       } catch (Exception e)
       {
          // TODO Auto-generated catch block
@@ -551,23 +585,62 @@
 
    public void cancelDeliveries(List cancels) throws JMSException
    {
-      RequestSupport req = new SessionCancelDeliveriesRequest(id, version, cancels);
-
-      doInvoke(client, req);
+      assert newClient != null;
+      
+      CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
+      message.setTargetID(id);
+      
+      try
+      {
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+      }
+      
+//      RequestSupport req = new SessionCancelDeliveriesRequest(id, version, cancels);
+//
+//      doInvoke(client, req);
    }
 
    public void cancelDelivery(Cancel cancel) throws JMSException
    {
-      RequestSupport req = new SessionCancelDeliveryRequest(id, version, cancel);
-
-      doInvoke(client, req);
+      assert newClient != null;
+      
+      CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
+      message.setTargetID(id);
+      
+      try
+      {
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         e.printStackTrace();
+      }
+      
+//      RequestSupport req = new SessionCancelDeliveryRequest(id, version, cancel);
+//
+//      doInvoke(client, req);
    }
 
-   public void recoverDeliveries(List acks, String sessionID) throws JMSException
+   public void recoverDeliveries(List deliveries, String sessionID) throws JMSException
    {
-      RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
-
-      doInvoke(client, req);
+      assert newClient != null;
+      
+      try
+      {
+         RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(deliveries, sessionID);
+         message.setTargetID(id);
+         newClient.sendBlocking(message);
+      } catch (TimeoutException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
+      
+//      RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
+//
+//      doInvoke(client, req);
    }
 
    // Streamable overrides -------------------------------------------------------------------------

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -23,9 +23,11 @@
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
 
 import java.util.ArrayList;
@@ -77,10 +79,13 @@
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.Util;
@@ -877,10 +882,29 @@
             } else if (type == MSG_CLOSE)
             {
                close();
+               
+               NullPacket response = new NullPacket();
+               response.normalize(packet);
+               session.write(response);               
             } else if (type == MSG_SENDTRANSACTION)
             {
                SendTransactionMessage message = (SendTransactionMessage) packet;
                sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
+               NullPacket response = new NullPacket();
+               response.normalize(message);
+               session.write(response);
+            } else if (type == REQ_GETCLIENTID)
+            {
+               GetClientIDResponse response = new GetClientIDResponse(getClientID());
+               response.normalize(packet);
+               session.write(response);
+            } else if (type == MSG_SETCLIENTID)
+            {
+               SetClientIDMessage message = (SetClientIDMessage) packet;
+               setClientID(message.getClientID());
+               NullPacket response = new NullPacket();
+               response.normalize(packet);
+               session.write(response);
             } else 
             {
                log.error("Unsupported packet for connection: " + packet);

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -22,6 +22,8 @@
 package org.jboss.jms.server.endpoint;
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
@@ -51,7 +53,9 @@
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.util.ExceptionUtil;
 
@@ -678,6 +682,20 @@
             {
                ChangeRateMessage message = (ChangeRateMessage) packet;
                changeRate(message.getRate());
+            } else if (type == REQ_CLOSING)
+            {
+               org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+               long id = closing(request.getSequence());
+               ClosingResponse response = new ClosingResponse(id);
+               response.normalize(request);
+               session.write(response);
+            } else if (type == MSG_CLOSE)
+            {
+               close();
+               
+               NullPacket response = new NullPacket();
+               response.normalize(packet);
+               session.write(response);
             } else {
                log.error("unsupported packet by server consumer endpoint: "  + packet);
             }

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -22,8 +22,13 @@
 package org.jboss.jms.server.endpoint;
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
 
 import java.lang.ref.WeakReference;
@@ -95,11 +100,15 @@
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
 import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
@@ -2437,6 +2446,12 @@
             {
                SendMessage message = (SendMessage) packet;
                send(message.getMessage(), message.checkForDuplicates(), message.getSequence());
+               if (message.getSequence() == -1)
+               {
+                  NullPacket response = new NullPacket();
+                  response.normalize(message);
+                  session.write(response);
+               }
             } else if (type == REQ_CREATECONSUMER)
             {
                CreateConsumerRequest request = (CreateConsumerRequest) packet;
@@ -2459,6 +2474,44 @@
                NullPacket p = new NullPacket();
                p.normalize(message);
                session.write(p);
+            } else if (type == MSG_RECOVERDELIVERIES)
+            {
+               RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
+               recoverDeliveries(message.getDeliveries(), message.getSessionID());
+
+               NullPacket p = new NullPacket();
+               p.normalize(message);
+               session.write(p);
+            } else if (type == MSG_CANCELDELIVERY)
+            {
+               CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
+               cancelDelivery(message.getCancel());
+               
+               NullPacket p = new NullPacket();
+               p.normalize(message);
+               session.write(p);
+            } else if (type == MSG_CANCELDELIVERIES)
+            {
+               CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
+               cancelDeliveries(message.getCancels());
+               
+               NullPacket p = new NullPacket();
+               p.normalize(message);
+               session.write(p);
+            } else if (type == REQ_CLOSING)
+            {
+               org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+               long id = closing(request.getSequence());
+               ClosingResponse response = new ClosingResponse(id);
+               response.normalize(request);
+               session.write(response);
+            } else if (type == MSG_CLOSE)
+            {
+               close();
+               
+               NullPacket response = new NullPacket();
+               response.normalize(packet);
+               session.write(response);
             } else {
                log.error("Unsupported packet for session: " + packet);
             }

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.Cancel;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class CancelDeliveriesMessageCodec extends
+      AbstractPacketCodec<CancelDeliveriesMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public CancelDeliveriesMessageCodec()
+   {
+      super(MSG_CANCELDELIVERIES);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session,
+         CancelDeliveriesMessage message, IoBuffer out) throws Exception
+   {
+      List<Cancel> cancels = message.getCancels();
+
+      int numOfCancels = cancels.size();
+      int cancelLength = LONG_LENGTH + INT_LENGTH + 1 + 1; // per cancel: delivery id (long) + delivery count (int) + 2 booleans counted as 1 byte each
+      int bodyLength = INT_LENGTH + cancelLength * numOfCancels; // leading INT_LENGTH accounts for the numOfCancels field
+
+      out.putInt(bodyLength); // length prefix; decodeBody reads it first and compares it against in.remaining()
+      out.putInt(numOfCancels);
+      for (Cancel cancel : cancels)
+      {
+         out.putLong(cancel.getDeliveryId());
+         out.putInt(cancel.getDeliveryCount());
+         putBoolean(out, cancel.isExpired());
+         putBoolean(out, cancel.isReachedMaxDeliveryAttempts());
+      }
+   }
+
+   @Override
+   protected CancelDeliveriesMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null; // fewer bytes remain than the declared body length; NOTE(review): presumably signals an incomplete packet - confirm in AbstractPacketCodec
+      }
+
+      int numOfCancels = in.getInt();
+      List<Cancel> cancels = new ArrayList<Cancel>(numOfCancels); // NOTE(review): capacity comes straight from the wire, unvalidated
+      for (int i = 0; i < numOfCancels; i++)
+      {
+         long deliveryID = in.getLong();
+         int deliveryCount = in.getInt();
+         boolean expired = getBoolean(in);
+         boolean reachedMaxDeliveryAttempts = getBoolean(in);
+
+         cancels.add(new DefaultCancel(deliveryID, deliveryCount, expired,
+               reachedMaxDeliveryAttempts));
+      }
+
+      return new CancelDeliveriesMessage(cancels);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.Cancel;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class CancelDeliveryMessageCodec extends
+      AbstractPacketCodec<CancelDeliveryMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public CancelDeliveryMessageCodec()
+   {
+      super(MSG_CANCELDELIVERY);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session, CancelDeliveryMessage message,
+         IoBuffer out) throws Exception
+   {
+      long deliveryID = message.getCancel().getDeliveryId();
+      int deliveryCount = message.getCancel().getDeliveryCount();
+      boolean expired = message.getCancel().isExpired();
+      boolean reachedMaxDeliveryAttempts = message.getCancel()
+            .isReachedMaxDeliveryAttempts();
+
+      int bodyLength = LONG_LENGTH + INT_LENGTH + 2; // delivery id (long) + delivery count (int) + 2 booleans counted as 1 byte each
+
+      out.putInt(bodyLength); // length prefix; decodeBody reads it first and compares it against in.remaining()
+      out.putLong(deliveryID);
+      out.putInt(deliveryCount);
+      putBoolean(out, expired);
+      putBoolean(out, reachedMaxDeliveryAttempts);
+   }
+
+   @Override
+   protected CancelDeliveryMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null; // fewer bytes remain than the declared body length; NOTE(review): presumably signals an incomplete packet - confirm in AbstractPacketCodec
+      }
+
+      long deliveryID = in.getLong(); // fields are read back in the same order encodeBody writes them
+      int deliveryCount = in.getInt();
+      boolean expired = getBoolean(in);
+      boolean reachedMaxDeliveryAttempts = getBoolean(in);
+
+      Cancel cancel = new DefaultCancel(deliveryID, deliveryCount, expired,
+            reachedMaxDeliveryAttempts);
+      return new CancelDeliveryMessage(cancel);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class GetClientIDRequestCodec extends
+      AbstractPacketCodec<GetClientIDRequest>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public GetClientIDRequestCodec()
+   {
+      super(REQ_GETCLIENTID);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session, GetClientIDRequest request,
+         IoBuffer out) throws Exception
+   {
+      // no body: only a zero body length is written
+      out.putInt(0);
+   }
+
+   @Override
+   protected GetClientIDRequest decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      in.getInt(); // skip body length (encodeBody always writes 0); the value is not checked and nothing else is read
+
+      return new GetClientIDRequest();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDResponseCodec extends
+      AbstractPacketCodec<GetClientIDResponse>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public GetClientIDResponseCodec()
+   {
+      super(RESP_GETCLIENTID);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session, GetClientIDResponse response,
+         IoBuffer out) throws Exception
+   {
+      String clientID = response.getClientID();
+
+      out.putInt(sizeof(clientID)); // body length prefix: size of the client ID string as reported by sizeof()
+      putString(out, clientID); // the client ID is the only body field
+   }
+
+   @Override
+   protected GetClientIDResponse decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null; // fewer bytes remain than the declared body length; NOTE(review): presumably signals an incomplete packet - confirm in AbstractPacketCodec
+      }
+
+      String clientID = getString(in);
+
+      return new GetClientIDResponse(clientID);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -10,6 +10,8 @@
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
@@ -21,11 +23,15 @@
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
 import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 
@@ -62,10 +68,6 @@
       super.addMessageEncoder(CreateConnectionResponse.class,
             ConnectionFactoryCreateConnectionResponseCodec.class);
 
-      // super.addMessageDecoder(ClientDeliveryCodec.class);
-      // super.addMessageEncoder(ClientDelivery.class,
-      // ClientDeliveryCodec.class);
-      //
       super.addMessageDecoder(CreateSessionRequestCodec.class);
       super.addMessageEncoder(CreateSessionRequest.class,
             CreateSessionRequestCodec.class);
@@ -82,6 +84,18 @@
             .addMessageEncoder(IDBlockResponse.class,
                   IDBlockResponseCodec.class);
 
+      super.addMessageDecoder(GetClientIDRequestCodec.class);
+      super.addMessageEncoder(GetClientIDRequest.class,
+            GetClientIDRequestCodec.class);
+
+      super.addMessageDecoder(GetClientIDResponseCodec.class);
+      super.addMessageEncoder(GetClientIDResponse.class,
+            GetClientIDResponseCodec.class);
+
+      super.addMessageDecoder(SetClientIDMessageCodec.class);
+      super.addMessageEncoder(SetClientIDMessage.class,
+            SetClientIDMessageCodec.class);
+
       super.addMessageDecoder(SendMessageCodec.class);
       super.addMessageEncoder(SendMessage.class, SendMessageCodec.class);
 
@@ -116,18 +130,33 @@
       super.addMessageEncoder(AcknowledgeDeliveriesMessage.class,
             AcknowledgeDeliveriesRequestCodec.class);
 
+      super.addMessageDecoder(RecoverDeliveriesMessageCodec.class);
+      super.addMessageEncoder(RecoverDeliveriesMessage.class,
+            RecoverDeliveriesMessageCodec.class);
+
+      super.addMessageDecoder(CancelDeliveryMessageCodec.class);
+      super.addMessageEncoder(CancelDeliveryMessage.class,
+            CancelDeliveryMessageCodec.class);
+
+      super.addMessageDecoder(CancelDeliveriesMessageCodec.class);
+      super.addMessageEncoder(CancelDeliveriesMessage.class,
+            CancelDeliveriesMessageCodec.class);
+
       super.addMessageDecoder(ClosingRequestCodec.class);
       super.addMessageEncoder(ClosingRequest.class, ClosingRequestCodec.class);
 
       super.addMessageDecoder(ClosingResponseCodec.class);
-      super.addMessageEncoder(ClosingResponse.class, ClosingResponseCodec.class);
+      super
+            .addMessageEncoder(ClosingResponse.class,
+                  ClosingResponseCodec.class);
 
       super.addMessageDecoder(CloseMessageCodec.class);
       super.addMessageEncoder(CloseMessage.class, CloseMessageCodec.class);
-      
+
       super.addMessageDecoder(SendTransactionRequestCodec.class);
-      super.addMessageEncoder(SendTransactionMessage.class, SendTransactionRequestCodec.class);
-}
+      super.addMessageEncoder(SendTransactionMessage.class,
+            SendTransactionRequestCodec.class);
+   }
 
    // Public --------------------------------------------------------
 

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.DeliveryRecovery;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class RecoverDeliveriesMessageCodec extends
+      AbstractPacketCodec<RecoverDeliveriesMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RecoverDeliveriesMessageCodec()
+   {
+      super(PacketType.MSG_RECOVERDELIVERIES);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session,
+         RecoverDeliveriesMessage message, IoBuffer out) throws Exception
+   {
+      List<DeliveryRecovery> deliveries = message.getDeliveries();
+      String sessionID = message.getSessionID();
+      byte[] encodedDeliveries = encode(deliveries); // deliveries are pre-serialized so their byte length is known before writing
+
+      int bodyLength = INT_LENGTH + INT_LENGTH + encodedDeliveries.length // delivery count (int) + encoded byte count (int) + encoded bytes
+            + sizeof(sessionID); // + session ID string
+
+      out.putInt(bodyLength); // length prefix; decodeBody reads it first and compares it against in.remaining()
+      out.putInt(deliveries.size());
+      out.putInt(encodedDeliveries.length);
+      out.put(encodedDeliveries);
+      putString(out, sessionID);
+   }
+
+   @Override
+   protected RecoverDeliveriesMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null; // fewer bytes remain than the declared body length; NOTE(review): presumably signals an incomplete packet - confirm in AbstractPacketCodec
+      }
+
+      int deliveriesSize = in.getInt(); // number of DeliveryRecovery entries, not a byte count
+      int encodedDeliveriesSize = in.getInt(); // byte length of the serialized deliveries that follow
+      byte[] encodedDeliveries = new byte[encodedDeliveriesSize]; // NOTE(review): array size comes straight from the wire, unvalidated
+      in.get(encodedDeliveries);
+      List<DeliveryRecovery> deliveries = decode(deliveriesSize,
+            encodedDeliveries);
+      String sessionID = getString(in);
+
+      return new RecoverDeliveriesMessage(deliveries, sessionID);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private static byte[] encode(List<DeliveryRecovery> deliveries)
+         throws Exception
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      for (DeliveryRecovery delivery : deliveries)
+      {
+         delivery.write(dos); // each delivery serializes itself back-to-back into the shared stream
+      }
+      baos.flush(); // NOTE(review): flushes baos rather than the dos wrapper; dos.flush() would be the conventional call
+      return baos.toByteArray();
+   }
+
+   private List<DeliveryRecovery> decode(int size, byte[] encodedDeliveries)
+         throws Exception
+   {
+      ByteArrayInputStream bais = new ByteArrayInputStream(encodedDeliveries);
+      DataInputStream dis = new DataInputStream(bais);
+
+      List<DeliveryRecovery> deliveries = new ArrayList<DeliveryRecovery>();
+      for (int i = 0; i < size; i++) // size is the entry count sent ahead of the encoded bytes
+      {
+         DeliveryRecovery delivery = new DeliveryRecovery();
+         delivery.read(dis); // each entry populates itself from the stream, mirroring delivery.write(dos) in encode
+         deliveries.add(delivery);
+      }
+      return deliveries;
+   }
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SetClientIDMessageCodec extends
+      AbstractPacketCodec<SetClientIDMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SetClientIDMessageCodec()
+   {
+      super(MSG_SETCLIENTID);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session, SetClientIDMessage message,
+         IoBuffer out) throws Exception
+   {
+      String clientID = message.getClientID();
+
+      out.putInt(sizeof(clientID)); // body length prefix: size of the client ID string as reported by sizeof()
+      putString(out, clientID); // the client ID is the only body field
+   }
+
+   @Override
+   protected SetClientIDMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null; // fewer bytes remain than the declared body length; NOTE(review): presumably signals an incomplete packet - confirm in AbstractPacketCodec
+      }
+
+      String clientID = getString(in);
+
+      return new SetClientIDMessage(clientID);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,64 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+
+import java.util.List;
+
+import org.jboss.jms.delegate.Cancel;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class CancelDeliveriesMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final List<Cancel> cancels;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public CancelDeliveriesMessage(List<Cancel> cancels)
+   {
+      super(MSG_CANCELDELIVERIES);
+
+      assert cancels != null; // only enforced when the JVM runs with assertions enabled (-ea)
+      assert cancels.size() > 0; // an empty list is rejected only under -ea as well
+
+      this.cancels = cancels; // stored by reference, no defensive copy
+   }
+
+   // Public --------------------------------------------------------
+
+   public List<Cancel> getCancels()
+   {
+      return cancels; // the same list instance that was passed to the constructor
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", cancels=" + cancels + "]"; // supplies the closing "]"; NOTE(review): presumably getParentString() leaves its bracket open - confirm in AbstractPacket
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+
+import org.jboss.jms.delegate.Cancel;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class CancelDeliveryMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final Cancel cancel;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public CancelDeliveryMessage(Cancel cancel)
+   {
+      super(MSG_CANCELDELIVERY);
+
+      assert cancel != null; // only enforced when the JVM runs with assertions enabled (-ea)
+
+      this.cancel = cancel;
+   }
+
+   // Public --------------------------------------------------------
+
+   public Cancel getCancel()
+   {
+      return cancel;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", cancel=" + cancel + "]"; // supplies the closing "]"; NOTE(review): presumably getParentString() leaves its bracket open - confirm in AbstractPacket
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDRequest extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public GetClientIDRequest()
+   {
+      super(REQ_GETCLIENTID); // no fields of its own: the packet type is the only state set here
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDResponse extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String clientID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public GetClientIDResponse(String clientID)
+   {
+      super(RESP_GETCLIENTID);
+
+      assertValidID(clientID); // NOTE(review): validation rules live in remoting.Assert.assertValidID, not visible here - confirm what counts as a valid ID
+
+      this.clientID = clientID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getClientID()
+   {
+      return clientID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", clientID=" + clientID + "]"; // supplies the closing "]"; NOTE(review): presumably getParentString() leaves its bracket open - confirm in AbstractPacket
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -28,21 +28,27 @@
    MSG_STARTCONNECTION((byte)10),
    MSG_SENDTRANSACTION((byte)11),
    RESP_SENDTRANSACTION((byte)12),
+   REQ_GETCLIENTID((byte)13),
+   RESP_GETCLIENTID((byte)14),
+   MSG_SETCLIENTID((byte)15),
    // Session
-   REQ_CREATECONSUMER((byte)13),
-   RESP_CREATECONSUMER((byte)14),   
-   MSG_SENDMESSAGE((byte)15),
-   MSG_DELIVERMESSAGE((byte)16),
-   REQ_ACKDELIVERY((byte)17),
-   RESP_ACKDELIVERY((byte)18),
-   MSG_ACKDELIVERIES((byte)19),
-   RESP_ACKDELIVERIES((byte)20),
+   REQ_CREATECONSUMER((byte)16),
+   RESP_CREATECONSUMER((byte)17),   
+   MSG_SENDMESSAGE((byte)18),
+   MSG_DELIVERMESSAGE((byte)19),
+   REQ_ACKDELIVERY((byte)20),
+   RESP_ACKDELIVERY((byte)21),
+   MSG_ACKDELIVERIES((byte)22),
+   RESP_ACKDELIVERIES((byte)23),
+   MSG_RECOVERDELIVERIES((byte)24),
+   MSG_CANCELDELIVERY((byte)25),
+   MSG_CANCELDELIVERIES((byte)26),
    // Consumer
-   MSG_CHANGERATE((byte)21),
+   MSG_CHANGERATE((byte)27),
    // Misc
-   REQ_CLOSING((byte)22),
-   RESP_CLOSING((byte)23),
-   MSG_CLOSE((byte)24);
+   REQ_CLOSING((byte)28),
+   RESP_CLOSING((byte)29),
+   MSG_CLOSE((byte)30);
    
    
    private byte type;

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
+
+import java.util.List;
+
+import org.jboss.jms.delegate.DeliveryRecovery;
+
+/** Packet of type MSG_RECOVERDELIVERIES carrying a list of DeliveryRecovery entries and a session ID.
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class RecoverDeliveriesMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final List<DeliveryRecovery> deliveries; // same list instance the caller passed in
+   private final String sessionID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RecoverDeliveriesMessage(List<DeliveryRecovery> deliveries,
+         String sessionID)
+   {
+      super(MSG_RECOVERDELIVERIES); // tags this packet with the MSG_RECOVERDELIVERIES type
+
+      assert deliveries != null; // plain Java assert: only evaluated when assertions are enabled (-ea)
+      assert deliveries.size() > 0; // an empty list is rejected only under -ea as well
+      assertValidID(sessionID); // validity rule is defined in Assert.assertValidID (not shown here)
+
+      this.deliveries = deliveries; // stored by reference, no defensive copy
+      this.sessionID = sessionID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public List<DeliveryRecovery> getDeliveries() // returns the stored list itself, not a copy
+   {
+      return deliveries;
+   }
+
+   public String getSessionID() // returns the session ID given to the constructor
+   {
+      return sessionID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", deliveries=" + deliveries + ", sessionID="
+            + sessionID + "]"; // "]" presumably closes a bracket opened by getParentString() -- confirm in AbstractPacket
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+
+/** Packet of type MSG_SETCLIENTID that carries a single client ID string.
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SetClientIDMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String clientID; // set once in the constructor, never reassigned
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SetClientIDMessage(String clientID)
+   {
+      super(MSG_SETCLIENTID); // tags this packet with the MSG_SETCLIENTID type
+
+      assertValidID(clientID); // validity rule is defined in Assert.assertValidID (not shown here)
+
+      this.clientID = clientID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getClientID() // returns the ID given to the constructor
+   {
+      return clientID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", clientID=" + clientID + "]"; // "]" presumably closes a bracket opened by getParentString() -- confirm in AbstractPacket
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-11-20 15:22:54 UTC (rev 3348)
@@ -6,12 +6,17 @@
  */
 package org.jboss.test.messaging.core.remoting.wireformat;
 
+import static java.lang.System.currentTimeMillis;
 import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
@@ -19,12 +24,14 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_ACKDELIVERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CLOSING;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_IDBLOCK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
 
@@ -38,7 +45,10 @@
 import org.apache.mina.filter.codec.ProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolEncoder;
 import org.jboss.jms.delegate.Ack;
+import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.DefaultAck;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.jms.delegate.DeliveryRecovery;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.message.JBossMessage;
@@ -50,6 +60,8 @@
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
@@ -61,12 +73,16 @@
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
 import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
 import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 
@@ -94,6 +110,41 @@
       }
    }
 
+   private static void assertSameDeliveries(List<DeliveryRecovery> expected,
+         List<DeliveryRecovery> actual)
+   { // asserts equal sizes, then compares entries pairwise at the same index
+      assertEquals(expected.size(), actual.size());
+      for (int i = 0; i < expected.size(); i++)
+      {
+         DeliveryRecovery expectedDelivery = expected.get(i);
+         DeliveryRecovery actualDelivery = actual.get(i);
+         assertEquals(expectedDelivery.getDeliveryID(), actualDelivery
+               .getDeliveryID());
+         assertEquals(expectedDelivery.getMessageID(), actualDelivery
+               .getMessageID());
+         assertEquals(expectedDelivery.getQueueName(), actualDelivery
+               .getQueueName());
+      } // only delivery ID, message ID and queue name are compared
+   }
+
+   private static void assertSameCancels(List<Cancel> expected,
+         List<Cancel> actual)
+   { // asserts equal sizes, then compares entries pairwise at the same index
+      assertEquals(expected.size(), actual.size());
+      for (int i = 0; i < expected.size(); i++)
+      {
+         Cancel expectedCancel = expected.get(i);
+         Cancel actualCancel = actual.get(i);
+         assertEquals(expectedCancel.getDeliveryId(), actualCancel
+               .getDeliveryId());
+         assertEquals(expectedCancel.getDeliveryCount(), actualCancel
+               .getDeliveryCount());
+         assertEquals(expectedCancel.isExpired(), actualCancel.isExpired());
+         assertEquals(expectedCancel.isReachedMaxDeliveryAttempts(),
+               actualCancel.isReachedMaxDeliveryAttempts());
+      } // compared fields: delivery ID, delivery count, expired flag, max-attempts flag
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
@@ -102,7 +153,7 @@
    {
       NullPacket packet = new NullPacket();
       packet.setCallbackID(randomUUID().toString());
-      packet.setCorrelationID(System.currentTimeMillis());
+      packet.setCorrelationID(currentTimeMillis());
       packet.setTargetID(randomUUID().toString());
 
       AbstractPacket decodedPacket = encodeAndDecode(packet);
@@ -217,8 +268,8 @@
 
    public void testIDBlockResponse() throws Exception
    {
-      IDBlockResponse response = new IDBlockResponse(
-            System.currentTimeMillis(), System.currentTimeMillis() * 2);
+      IDBlockResponse response = new IDBlockResponse(currentTimeMillis(),
+            currentTimeMillis() * 2);
 
       AbstractPacket decodedPacket = encodeAndDecode(response);
 
@@ -233,7 +284,7 @@
    public void testSendMessage() throws Exception
    {
       SendMessage packet = new SendMessage(new JBossMessage(System
-            .currentTimeMillis()), true, System.currentTimeMillis());
+            .currentTimeMillis()), true, currentTimeMillis());
 
       AbstractPacket p = encodeAndDecode(packet);
 
@@ -276,7 +327,7 @@
    {
 
       CreateConsumerResponse response = new CreateConsumerResponse(randomUUID()
-            .toString(), 23, 42, System.currentTimeMillis());
+            .toString(), 23, 42, currentTimeMillis());
 
       AbstractPacket decodedPacket = encodeAndDecode(response);
 
@@ -315,9 +366,9 @@
 
    public void testDeliverMessage() throws Exception
    {
-      Message msg = new JBossMessage(System.currentTimeMillis());
+      Message msg = new JBossMessage(currentTimeMillis());
       DeliverMessage message = new DeliverMessage(msg, randomUUID().toString(),
-            System.currentTimeMillis(), 23);
+            currentTimeMillis(), 23);
 
       AbstractPacket decodedPacket = encodeAndDecode(message);
 
@@ -335,7 +386,7 @@
    public void testAcknowledgeDeliveryRequest() throws Exception
    {
       AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(
-            System.currentTimeMillis());
+            currentTimeMillis());
 
       AbstractPacket decodedPacket = encodeAndDecode(request);
 
@@ -379,7 +430,7 @@
 
    public void testClosingRequest() throws Exception
    {
-      ClosingRequest request = new ClosingRequest(System.currentTimeMillis());
+      ClosingRequest request = new ClosingRequest(currentTimeMillis());
 
       AbstractPacket decodedPacket = encodeAndDecode(request);
 
@@ -418,19 +469,113 @@
       TransactionRequest tr = new TransactionRequest(
             TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
 
-      SendTransactionMessage request = new SendTransactionMessage(tr, true);
+      SendTransactionMessage message = new SendTransactionMessage(tr, true);
 
-      AbstractPacket decodedPacket = encodeAndDecode(request);
+      AbstractPacket decodedPacket = encodeAndDecode(message);
 
       assertTrue(decodedPacket instanceof SendTransactionMessage);
-      SendTransactionMessage decodedRequest = (SendTransactionMessage) decodedPacket;
-      assertEquals(MSG_SENDTRANSACTION, decodedRequest.getType());
-      assertEquals(request.getTransactionRequest().getRequestType(),
-            decodedRequest.getTransactionRequest().getRequestType());
-      assertEquals(request.checkForDuplicates(), decodedRequest
+      SendTransactionMessage decodedMessage = (SendTransactionMessage) decodedPacket;
+      assertEquals(MSG_SENDTRANSACTION, decodedMessage.getType());
+      assertEquals(message.getTransactionRequest().getRequestType(),
+            decodedMessage.getTransactionRequest().getRequestType());
+      assertEquals(message.checkForDuplicates(), decodedMessage
             .checkForDuplicates());
    }
 
+   public void testGetClientIDRequest() throws Exception
+   { // passes a GetClientIDRequest through encodeAndDecode and checks class and packet type
+      GetClientIDRequest request = new GetClientIDRequest();
+
+      AbstractPacket decodedPacket = encodeAndDecode(request);
+
+      assertTrue(decodedPacket instanceof GetClientIDRequest);
+      assertEquals(REQ_GETCLIENTID, decodedPacket.getType()); // the request has no payload to compare
+   }
+
+   public void testGetClientIDResponse() throws Exception
+   { // passes a GetClientIDResponse through encodeAndDecode and checks type and client ID
+      GetClientIDResponse response = new GetClientIDResponse(randomUUID()
+            .toString()); // a random UUID string serves as the client ID
+
+      AbstractPacket decodedPacket = encodeAndDecode(response);
+
+      assertTrue(decodedPacket instanceof GetClientIDResponse);
+      GetClientIDResponse decodedResponse = (GetClientIDResponse) decodedPacket;
+      assertEquals(RESP_GETCLIENTID, decodedResponse.getType());
+      assertEquals(response.getClientID(), decodedResponse.getClientID());
+   }
+
+   public void testSetClientIDMessage() throws Exception
+   { // passes a SetClientIDMessage through encodeAndDecode and checks type and client ID
+      SetClientIDMessage message = new SetClientIDMessage(randomUUID()
+            .toString()); // a random UUID string serves as the client ID
+
+      AbstractPacket decodedPacket = encodeAndDecode(message);
+
+      assertTrue(decodedPacket instanceof SetClientIDMessage);
+      SetClientIDMessage decodedMessage = (SetClientIDMessage) decodedPacket;
+      assertEquals(MSG_SETCLIENTID, decodedMessage.getType());
+      assertEquals(message.getClientID(), decodedMessage.getClientID());
+   }
+
+   public void testRecoverDeliveriesMessage() throws Exception
+   { // passes a three-entry RecoverDeliveriesMessage through encodeAndDecode and compares contents
+      List<DeliveryRecovery> deliveries = new ArrayList<DeliveryRecovery>();
+      deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+            currentTimeMillis(), randomUUID().toString()));
+      deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+            currentTimeMillis(), randomUUID().toString()));
+      deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+            currentTimeMillis(), randomUUID().toString()));
+
+      RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(
+            deliveries, randomUUID().toString()); // second argument is the session ID
+
+      AbstractPacket decodedPacket = encodeAndDecode(message);
+
+      assertTrue(decodedPacket instanceof RecoverDeliveriesMessage);
+      RecoverDeliveriesMessage decodedMessage = (RecoverDeliveriesMessage) decodedPacket;
+      assertEquals(MSG_RECOVERDELIVERIES, decodedMessage.getType());
+      assertSameDeliveries(message.getDeliveries(), decodedMessage
+            .getDeliveries()); // field-by-field, order-sensitive comparison
+      assertEquals(message.getSessionID(), decodedMessage.getSessionID());
+   }
+
+   public void testCancelDeliveryMessage() throws Exception
+   { // passes a CancelDeliveryMessage through encodeAndDecode and compares the Cancel's fields
+      Cancel cancel = new DefaultCancel(currentTimeMillis(), 23, true, false);
+      CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
+
+      AbstractPacket decodedPacket = encodeAndDecode(message);
+
+      assertTrue(decodedPacket instanceof CancelDeliveryMessage);
+      CancelDeliveryMessage decodedMessage = (CancelDeliveryMessage) decodedPacket;
+      assertEquals(MSG_CANCELDELIVERY, decodedMessage.getType());
+      Cancel expected = message.getCancel();
+      Cancel actual = decodedMessage.getCancel();
+      assertEquals(expected.getDeliveryId(), actual.getDeliveryId()); // getters are compared one by one, not via Cancel.equals
+      assertEquals(expected.getDeliveryCount(), actual.getDeliveryCount());
+      assertEquals(expected.isExpired(), actual.isExpired());
+      assertEquals(expected.isReachedMaxDeliveryAttempts(), actual
+            .isReachedMaxDeliveryAttempts());
+   }
+
+   public void testCancelDeliveriesMessage() throws Exception
+   { // passes a two-entry CancelDeliveriesMessage through encodeAndDecode and compares contents
+      List<Cancel> cancels = new ArrayList<Cancel>();
+      cancels.add(new DefaultCancel(currentTimeMillis(), 23, true, false));
+      cancels.add(new DefaultCancel(currentTimeMillis(), 33, false, true)); // second entry uses the opposite boolean values
+
+      CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
+
+      AbstractPacket decodedPacket = encodeAndDecode(message);
+
+      assertTrue(decodedPacket instanceof CancelDeliveriesMessage);
+      CancelDeliveriesMessage decodedMessage = (CancelDeliveriesMessage) decodedPacket;
+      assertEquals(MSG_CANCELDELIVERIES, decodedMessage.getType());
+      assertSameCancels(message.getCancels(), decodedMessage.getCancels()); // field-by-field, order-sensitive comparison
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -459,8 +604,11 @@
       ProtocolDecoder decoder = new PacketCodecFactory().getDecoder(session);
       decoder.decode(session, buffer, session.getDecoderOutput());
 
+      session.getDecoderOutput().flush();
+
       Object o = session.getDecoderOutputQueue().poll();
 
+      assertNotNull(o);
       assertTrue(o instanceof AbstractPacket);
 
       return (AbstractPacket) o;




More information about the jboss-cvs-commits mailing list