[jboss-cvs] JBoss Messaging SVN: r3348 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/server/endpoint and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 20 10:22:55 EST 2007
Author: jmesnil
Date: 2007-11-20 10:22:54 -0500 (Tue, 20 Nov 2007)
New Revision: 3348
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
* added packets & codecs for the new wireformat to pass tests in ConnectionFactoryTest & AcknowledgementTest
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -43,21 +43,21 @@
import org.jboss.jms.tx.ResourceManagerFactory;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
-import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
-import org.jboss.jms.wireformat.ConnectionSetClientIDRequest;
import org.jboss.jms.wireformat.ConnectionStartRequest;
import org.jboss.jms.wireformat.ConnectionStopRequest;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.util.Version;
@@ -89,8 +89,6 @@
private transient Version versionToUse;
- private Client newclient;
-
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
@@ -143,7 +141,7 @@
client = thisState.getRemotingConnection().getRemotingClient();
- newclient = thisState.getRemotingConnection().getNewRemotingClient();
+ newClient = thisState.getRemotingConnection().getNewRemotingClient();
serverID = newDelegate.getServerID();
}
@@ -154,29 +152,36 @@
// client = ((ConnectionState)state).getRemotingConnection().getRemotingClient();
- newclient = ((ConnectionState)state).getRemotingConnection().getNewRemotingClient();
+ newClient = ((ConnectionState)state).getRemotingConnection().getNewRemotingClient();
}
// Closeable implementation ---------------------------------------------------------------------
public void close() throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
CloseMessage message = new CloseMessage();
message.setTargetID(id);
- newclient.sendOneWay(message);
+ try
+ {
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
public long closing(long sequence) throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
try
{
org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
request.setTargetID(id);
- ClosingResponse response = (ClosingResponse) newclient.sendBlocking(request);
+ ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
return response.getID();
} catch (TimeoutException e)
{
@@ -204,13 +209,13 @@
int acknowledgmentMode,
boolean isXA) throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
CreateSessionRequest request = new CreateSessionRequest(transacted, acknowledgmentMode, isXA);
request.setTargetID(id);
try {
- CreateSessionResponse response = (CreateSessionResponse) newclient.sendBlocking(request);
+ CreateSessionResponse response = (CreateSessionResponse) newClient.sendBlocking(request);
ClientSessionDelegate delegate = new ClientSessionDelegate(response.getSessionID(), response.getDupsOKBatchSize(), response.isStrictTCK());
return delegate;
} catch (Exception e) {
@@ -227,9 +232,23 @@
public String getClientID() throws JMSException
{
- RequestSupport req = new ConnectionGetClientIDRequest(id, version);
+ assert newClient != null;
- return (String)doInvoke(client, req);
+ try
+ {
+ GetClientIDRequest request = new GetClientIDRequest();
+ request.setTargetID(id);
+ GetClientIDResponse response = (GetClientIDResponse) newClient.sendBlocking(request);
+ return response.getClientID();
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+
+// RequestSupport req = new ConnectionGetClientIDRequest(id, version);
+//
+// return (String)doInvoke(client, req);
}
/**
@@ -253,11 +272,18 @@
public void sendTransaction(TransactionRequest tr,
boolean checkForDuplicates) throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
- SendTransactionMessage request = new SendTransactionMessage(tr, checkForDuplicates);
- request.setTargetID(id);
- newclient.sendOneWay(request);
+ try
+ {
+ SendTransactionMessage request = new SendTransactionMessage(tr, checkForDuplicates);
+ request.setTargetID(id);
+ newClient.sendBlocking(request);
+ } catch (TimeoutException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
// RequestSupport req =
// new ConnectionSendTransactionRequest(id, version, request, checkForDuplicates);
//
@@ -266,9 +292,20 @@
public void setClientID(String clientID) throws JMSException
{
- RequestSupport req = new ConnectionSetClientIDRequest(id, version, clientID);
+ assert newClient != null;
- doInvoke(client, req);
+ try
+ {
+ SetClientIDMessage message = new SetClientIDMessage(clientID);
+ message.setTargetID(id);
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ }
+// RequestSupport req = new ConnectionSetClientIDRequest(id, version, clientID);
+//
+// doInvoke(client, req);
}
/**
@@ -282,11 +319,11 @@
public void start() throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
StartConnectionMessage message = new StartConnectionMessage();
message.setTargetID(id);
- newclient.sendOneWay(message);
+ newClient.sendOneWay(message);
// RequestSupport req = new ConnectionStartRequest(id, version);
//
// doInvoke(client, req);
@@ -331,13 +368,13 @@
public IDBlock getIdBlock(int size) throws JMSException
{
- assert newclient != null;
+ assert newClient != null;
IDBlockRequest request = new IDBlockRequest(size);
request.setTargetID(id);
try
{
- IDBlockResponse response = (IDBlockResponse) newclient.sendBlocking(request);
+ IDBlockResponse response = (IDBlockResponse) newClient.sendBlocking(request);
return new IDBlock(response.getLow(), response.getHigh());
} catch (TimeoutException e)
{
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -25,6 +25,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -41,6 +42,8 @@
import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.Constants;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
/**
* The client-side Consumer delegate class.
@@ -128,16 +131,42 @@
public void close() throws JMSException
{
- RequestSupport req = new CloseRequest(id, version);
+ assert newClient != null;
+
+ CloseMessage message = new CloseMessage();
+ message.setTargetID(id);
+ try
+ {
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- doInvoke(client, req);
+// RequestSupport req = new CloseRequest(id, version);
+//
+// doInvoke(client, req);
}
public long closing(long sequence) throws JMSException
{
- RequestSupport req = new ClosingRequest(sequence, id, version);
+ assert newClient != null;
- return ((Long)doInvoke(client, req)).longValue();
+ try
+ {
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
+ request.setTargetID(id);
+ ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
+ return response.getID();
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ return -1;
+ }
+// RequestSupport req = new ClosingRequest(sequence, id, version);
+//
+// return ((Long)doInvoke(client, req)).longValue();
}
// ConsumerDelegate implementation --------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -52,26 +52,26 @@
import org.jboss.jms.message.ObjectMessageProxy;
import org.jboss.jms.message.StreamMessageProxy;
import org.jboss.jms.message.TextMessageProxy;
-import org.jboss.jms.wireformat.CloseRequest;
-import org.jboss.jms.wireformat.ClosingRequest;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.jms.wireformat.SessionAddTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionCancelDeliveriesRequest;
-import org.jboss.jms.wireformat.SessionCancelDeliveryRequest;
import org.jboss.jms.wireformat.SessionCreateBrowserDelegateRequest;
import org.jboss.jms.wireformat.SessionCreateQueueRequest;
import org.jboss.jms.wireformat.SessionCreateTopicRequest;
import org.jboss.jms.wireformat.SessionDeleteTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
/**
@@ -162,16 +162,45 @@
public void close() throws JMSException
{
- RequestSupport req = new CloseRequest(id, version);
-
- doInvoke(client, req);
+ assert newClient != null;
+
+ CloseMessage message = new CloseMessage();
+ message.setTargetID(id);
+ try
+ {
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+//
+// RequestSupport req = new CloseRequest(id, version);
+//
+// doInvoke(client, req);
}
public long closing(long sequence) throws JMSException
- {
- RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
+ {
+ assert newClient != null;
- return ((Long)doInvoke(client, req)).longValue();
+ long seq = ((SessionState)state).getNPSendSequence();
+
+ try
+ {
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(seq);
+ request.setTargetID(id);
+ ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
+ return response.getID();
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ return -1;
+ }
+
+// RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
+//
+// return ((Long)doInvoke(client, req)).longValue();
}
// SessionDelegate implementation ---------------------------------------------------------------
@@ -529,7 +558,12 @@
SendMessage message = new SendMessage(m, checkForDuplicates, seq);
message.setTargetID(id);
- newClient.sendOneWay(message);
+ if (seq == -1)
+ {
+ newClient.sendBlocking(message);
+ } else {
+ newClient.sendOneWay(message);
+ }
} catch (Exception e)
{
// TODO Auto-generated catch block
@@ -551,23 +585,62 @@
public void cancelDeliveries(List cancels) throws JMSException
{
- RequestSupport req = new SessionCancelDeliveriesRequest(id, version, cancels);
-
- doInvoke(client, req);
+ assert newClient != null;
+
+ CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
+ message.setTargetID(id);
+
+ try
+ {
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ }
+
+// RequestSupport req = new SessionCancelDeliveriesRequest(id, version, cancels);
+//
+// doInvoke(client, req);
}
public void cancelDelivery(Cancel cancel) throws JMSException
{
- RequestSupport req = new SessionCancelDeliveryRequest(id, version, cancel);
-
- doInvoke(client, req);
+ assert newClient != null;
+
+ CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
+ message.setTargetID(id);
+
+ try
+ {
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ e.printStackTrace();
+ }
+
+// RequestSupport req = new SessionCancelDeliveryRequest(id, version, cancel);
+//
+// doInvoke(client, req);
}
- public void recoverDeliveries(List acks, String sessionID) throws JMSException
+ public void recoverDeliveries(List deliveries, String sessionID) throws JMSException
{
- RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
-
- doInvoke(client, req);
+ assert newClient != null;
+
+ try
+ {
+ RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(deliveries, sessionID);
+ message.setTargetID(id);
+ newClient.sendBlocking(message);
+ } catch (TimeoutException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+// RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
+//
+// doInvoke(client, req);
}
// Streamable overrides -------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -23,9 +23,11 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
import java.util.ArrayList;
@@ -77,10 +79,13 @@
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.Util;
@@ -877,10 +882,29 @@
} else if (type == MSG_CLOSE)
{
close();
+
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
} else if (type == MSG_SENDTRANSACTION)
{
SendTransactionMessage message = (SendTransactionMessage) packet;
sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
+ NullPacket response = new NullPacket();
+ response.normalize(message);
+ session.write(response);
+ } else if (type == REQ_GETCLIENTID)
+ {
+ GetClientIDResponse response = new GetClientIDResponse(getClientID());
+ response.normalize(packet);
+ session.write(response);
+ } else if (type == MSG_SETCLIENTID)
+ {
+ SetClientIDMessage message = (SetClientIDMessage) packet;
+ setClientID(message.getClientID());
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
} else
{
log.error("Unsupported packet for connection: " + packet);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -22,6 +22,8 @@
package org.jboss.jms.server.endpoint;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
@@ -51,7 +53,9 @@
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
@@ -678,6 +682,20 @@
{
ChangeRateMessage message = (ChangeRateMessage) packet;
changeRate(message.getRate());
+ } else if (type == REQ_CLOSING)
+ {
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+ long id = closing(request.getSequence());
+ ClosingResponse response = new ClosingResponse(id);
+ response.normalize(request);
+ session.write(response);
+ } else if (type == MSG_CLOSE)
+ {
+ close();
+
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
} else {
log.error("unsupported packet by server consumer endpoint: " + packet);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -22,8 +22,13 @@
package org.jboss.jms.server.endpoint;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
import java.lang.ref.WeakReference;
@@ -95,11 +100,15 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
@@ -2437,6 +2446,12 @@
{
SendMessage message = (SendMessage) packet;
send(message.getMessage(), message.checkForDuplicates(), message.getSequence());
+ if (message.getSequence() == -1)
+ {
+ NullPacket response = new NullPacket();
+ response.normalize(message);
+ session.write(response);
+ }
} else if (type == REQ_CREATECONSUMER)
{
CreateConsumerRequest request = (CreateConsumerRequest) packet;
@@ -2459,6 +2474,44 @@
NullPacket p = new NullPacket();
p.normalize(message);
session.write(p);
+ } else if (type == MSG_RECOVERDELIVERIES)
+ {
+ RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
+ recoverDeliveries(message.getDeliveries(), message.getSessionID());
+
+ NullPacket p = new NullPacket();
+ p.normalize(message);
+ session.write(p);
+ } else if (type == MSG_CANCELDELIVERY)
+ {
+ CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
+ cancelDelivery(message.getCancel());
+
+ NullPacket p = new NullPacket();
+ p.normalize(message);
+ session.write(p);
+ } else if (type == MSG_CANCELDELIVERIES)
+ {
+ CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
+ cancelDeliveries(message.getCancels());
+
+ NullPacket p = new NullPacket();
+ p.normalize(message);
+ session.write(p);
+ } else if (type == REQ_CLOSING)
+ {
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+ long id = closing(request.getSequence());
+ ClosingResponse response = new ClosingResponse(id);
+ response.normalize(request);
+ session.write(response);
+ } else if (type == MSG_CLOSE)
+ {
+ close();
+
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
} else {
log.error("Unsupported packet for session: " + packet);
}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveriesMessageCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.Cancel;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$F</tt>
+ */
+public class CancelDeliveriesMessageCodec extends
+ AbstractPacketCodec<CancelDeliveriesMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CancelDeliveriesMessageCodec()
+ {
+ super(MSG_CANCELDELIVERIES);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session,
+ CancelDeliveriesMessage message, IoBuffer out) throws Exception
+ {
+ List<Cancel> cancels = message.getCancels();
+
+ int numOfCancels = cancels.size();
+ int cancelLength = LONG_LENGTH + INT_LENGTH + 1 + 1; // 2 booleans
+ int bodyLength = INT_LENGTH + cancelLength * numOfCancels;
+
+ out.putInt(bodyLength);
+ out.putInt(numOfCancels);
+ for (Cancel cancel : cancels)
+ {
+ out.putLong(cancel.getDeliveryId());
+ out.putInt(cancel.getDeliveryCount());
+ putBoolean(out, cancel.isExpired());
+ putBoolean(out, cancel.isReachedMaxDeliveryAttempts());
+ }
+ }
+
+ @Override
+ protected CancelDeliveriesMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ int numOfCancels = in.getInt();
+ List<Cancel> cancels = new ArrayList<Cancel>(numOfCancels);
+ for (int i = 0; i < numOfCancels; i++)
+ {
+ long deliveryID = in.getLong();
+ int deliveryCount = in.getInt();
+ boolean expired = getBoolean(in);
+ boolean reachedMaxDeliveryAttempts = getBoolean(in);
+
+ cancels.add(new DefaultCancel(deliveryID, deliveryCount, expired,
+ reachedMaxDeliveryAttempts));
+ }
+
+ return new CancelDeliveriesMessage(cancels);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CancelDeliveryMessageCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.Cancel;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class CancelDeliveryMessageCodec extends
+ AbstractPacketCodec<CancelDeliveryMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CancelDeliveryMessageCodec()
+ {
+ super(MSG_CANCELDELIVERY);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, CancelDeliveryMessage message,
+ IoBuffer out) throws Exception
+ {
+ long deliveryID = message.getCancel().getDeliveryId();
+ int deliveryCount = message.getCancel().getDeliveryCount();
+ boolean expired = message.getCancel().isExpired();
+ boolean reachedMaxDeliveryAttempts = message.getCancel()
+ .isReachedMaxDeliveryAttempts();
+
+ int bodyLength = LONG_LENGTH + INT_LENGTH + 2;
+
+ out.putInt(bodyLength);
+ out.putLong(deliveryID);
+ out.putInt(deliveryCount);
+ putBoolean(out, expired);
+ putBoolean(out, reachedMaxDeliveryAttempts);
+ }
+
+ @Override
+ protected CancelDeliveryMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ long deliveryID = in.getLong();
+ int deliveryCount = in.getInt();
+ boolean expired = getBoolean(in);
+ boolean reachedMaxDeliveryAttempts = getBoolean(in);
+
+ Cancel cancel = new DefaultCancel(deliveryID, deliveryCount, expired,
+ reachedMaxDeliveryAttempts);
+ return new CancelDeliveryMessage(cancel);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDRequestCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class GetClientIDRequestCodec extends
+ AbstractPacketCodec<GetClientIDRequest>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public GetClientIDRequestCodec()
+ {
+ super(REQ_GETCLIENTID);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, GetClientIDRequest request,
+ IoBuffer out) throws Exception
+ {
+ // no body
+ out.putInt(0);
+ }
+
+ @Override
+ protected GetClientIDRequest decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ in.getInt(); // skip body length
+
+ return new GetClientIDRequest();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetClientIDResponseCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDResponseCodec extends
+ AbstractPacketCodec<GetClientIDResponse>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public GetClientIDResponseCodec()
+ {
+ super(RESP_GETCLIENTID);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, GetClientIDResponse response,
+ IoBuffer out) throws Exception
+ {
+ String clientID = response.getClientID();
+
+ out.putInt(sizeof(clientID));
+ putString(out, clientID);
+ }
+
+ @Override
+ protected GetClientIDResponse decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ String clientID = getString(in);
+
+ return new GetClientIDResponse(clientID);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -10,6 +10,8 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
@@ -21,11 +23,15 @@
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -62,10 +68,6 @@
super.addMessageEncoder(CreateConnectionResponse.class,
ConnectionFactoryCreateConnectionResponseCodec.class);
- // super.addMessageDecoder(ClientDeliveryCodec.class);
- // super.addMessageEncoder(ClientDelivery.class,
- // ClientDeliveryCodec.class);
- //
super.addMessageDecoder(CreateSessionRequestCodec.class);
super.addMessageEncoder(CreateSessionRequest.class,
CreateSessionRequestCodec.class);
@@ -82,6 +84,18 @@
.addMessageEncoder(IDBlockResponse.class,
IDBlockResponseCodec.class);
+ super.addMessageDecoder(GetClientIDRequestCodec.class);
+ super.addMessageEncoder(GetClientIDRequest.class,
+ GetClientIDRequestCodec.class);
+
+ super.addMessageDecoder(GetClientIDResponseCodec.class);
+ super.addMessageEncoder(GetClientIDResponse.class,
+ GetClientIDResponseCodec.class);
+
+ super.addMessageDecoder(SetClientIDMessageCodec.class);
+ super.addMessageEncoder(SetClientIDMessage.class,
+ SetClientIDMessageCodec.class);
+
super.addMessageDecoder(SendMessageCodec.class);
super.addMessageEncoder(SendMessage.class, SendMessageCodec.class);
@@ -116,18 +130,33 @@
super.addMessageEncoder(AcknowledgeDeliveriesMessage.class,
AcknowledgeDeliveriesRequestCodec.class);
+ super.addMessageDecoder(RecoverDeliveriesMessageCodec.class);
+ super.addMessageEncoder(RecoverDeliveriesMessage.class,
+ RecoverDeliveriesMessageCodec.class);
+
+ super.addMessageDecoder(CancelDeliveryMessageCodec.class);
+ super.addMessageEncoder(CancelDeliveryMessage.class,
+ CancelDeliveryMessageCodec.class);
+
+ super.addMessageDecoder(CancelDeliveriesMessageCodec.class);
+ super.addMessageEncoder(CancelDeliveriesMessage.class,
+ CancelDeliveriesMessageCodec.class);
+
super.addMessageDecoder(ClosingRequestCodec.class);
super.addMessageEncoder(ClosingRequest.class, ClosingRequestCodec.class);
super.addMessageDecoder(ClosingResponseCodec.class);
- super.addMessageEncoder(ClosingResponse.class, ClosingResponseCodec.class);
+ super
+ .addMessageEncoder(ClosingResponse.class,
+ ClosingResponseCodec.class);
super.addMessageDecoder(CloseMessageCodec.class);
super.addMessageEncoder(CloseMessage.class, CloseMessageCodec.class);
-
+
super.addMessageDecoder(SendTransactionRequestCodec.class);
- super.addMessageEncoder(SendTransactionMessage.class, SendTransactionRequestCodec.class);
-}
+ super.addMessageEncoder(SendTransactionMessage.class,
+ SendTransactionRequestCodec.class);
+ }
// Public --------------------------------------------------------
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/RecoverDeliveriesMessageCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.delegate.DeliveryRecovery;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class RecoverDeliveriesMessageCodec extends
+ AbstractPacketCodec<RecoverDeliveriesMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RecoverDeliveriesMessageCodec()
+ {
+ super(PacketType.MSG_RECOVERDELIVERIES);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session,
+ RecoverDeliveriesMessage message, IoBuffer out) throws Exception
+ {
+ List<DeliveryRecovery> deliveries = message.getDeliveries();
+ String sessionID = message.getSessionID();
+ byte[] encodedDeliveries = encode(deliveries);
+
+ int bodyLength = INT_LENGTH + INT_LENGTH + encodedDeliveries.length
+ + sizeof(sessionID);
+
+ out.putInt(bodyLength);
+ out.putInt(deliveries.size());
+ out.putInt(encodedDeliveries.length);
+ out.put(encodedDeliveries);
+ putString(out, sessionID);
+ }
+
+ @Override
+ protected RecoverDeliveriesMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ int deliveriesSize = in.getInt();
+ int encodedDeliveriesSize = in.getInt();
+ byte[] encodedDeliveries = new byte[encodedDeliveriesSize];
+ in.get(encodedDeliveries);
+ List<DeliveryRecovery> deliveries = decode(deliveriesSize,
+ encodedDeliveries);
+ String sessionID = getString(in);
+
+ return new RecoverDeliveriesMessage(deliveries, sessionID);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private static byte[] encode(List<DeliveryRecovery> deliveries)
+ throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ for (DeliveryRecovery delivery : deliveries)
+ {
+ delivery.write(dos);
+ }
+ baos.flush();
+ return baos.toByteArray();
+ }
+
+ private List<DeliveryRecovery> decode(int size, byte[] encodedDeliveries)
+ throws Exception
+ {
+ ByteArrayInputStream bais = new ByteArrayInputStream(encodedDeliveries);
+ DataInputStream dis = new DataInputStream(bais);
+
+ List<DeliveryRecovery> deliveries = new ArrayList<DeliveryRecovery>();
+ for (int i = 0; i < size; i++)
+ {
+ DeliveryRecovery delivery = new DeliveryRecovery();
+ delivery.read(dis);
+ deliveries.add(delivery);
+ }
+ return deliveries;
+ }
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SetClientIDMessageCodec.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SetClientIDMessageCodec extends
+ AbstractPacketCodec<SetClientIDMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SetClientIDMessageCodec()
+ {
+ super(MSG_SETCLIENTID);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, SetClientIDMessage message,
+ IoBuffer out) throws Exception
+ {
+ String clientID = message.getClientID();
+
+ out.putInt(sizeof(clientID));
+ putString(out, clientID);
+ }
+
+ @Override
+ protected SetClientIDMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ String clientID = getString(in);
+
+ return new SetClientIDMessage(clientID);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveriesMessage.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,64 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+
+import java.util.List;
+
+import org.jboss.jms.delegate.Cancel;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class CancelDeliveriesMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final List<Cancel> cancels;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CancelDeliveriesMessage(List<Cancel> cancels)
+ {
+ super(MSG_CANCELDELIVERIES);
+
+ assert cancels != null;
+ assert cancels.size() > 0;
+
+ this.cancels = cancels;
+ }
+
+ // Public --------------------------------------------------------
+
+ public List<Cancel> getCancels()
+ {
+ return cancels;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", cancels=" + cancels + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CancelDeliveryMessage.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+
+import org.jboss.jms.delegate.Cancel;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class CancelDeliveryMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final Cancel cancel;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CancelDeliveryMessage(Cancel cancel)
+ {
+ super(MSG_CANCELDELIVERY);
+
+ assert cancel != null;
+
+ this.cancel = cancel;
+ }
+
+ // Public --------------------------------------------------------
+
+ public Cancel getCancel()
+ {
+ return cancel;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", cancel=" + cancel + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDRequest.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDRequest extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public GetClientIDRequest()
+ {
+ super(REQ_GETCLIENTID);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/GetClientIDResponse.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class GetClientIDResponse extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String clientID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public GetClientIDResponse(String clientID)
+ {
+ super(RESP_GETCLIENTID);
+
+ assertValidID(clientID);
+
+ this.clientID = clientID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", clientID=" + clientID + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -28,21 +28,27 @@
MSG_STARTCONNECTION((byte)10),
MSG_SENDTRANSACTION((byte)11),
RESP_SENDTRANSACTION((byte)12),
+ REQ_GETCLIENTID((byte)13),
+ RESP_GETCLIENTID((byte)14),
+ MSG_SETCLIENTID((byte)15),
// Session
- REQ_CREATECONSUMER((byte)13),
- RESP_CREATECONSUMER((byte)14),
- MSG_SENDMESSAGE((byte)15),
- MSG_DELIVERMESSAGE((byte)16),
- REQ_ACKDELIVERY((byte)17),
- RESP_ACKDELIVERY((byte)18),
- MSG_ACKDELIVERIES((byte)19),
- RESP_ACKDELIVERIES((byte)20),
+ REQ_CREATECONSUMER((byte)16),
+ RESP_CREATECONSUMER((byte)17),
+ MSG_SENDMESSAGE((byte)18),
+ MSG_DELIVERMESSAGE((byte)19),
+ REQ_ACKDELIVERY((byte)20),
+ RESP_ACKDELIVERY((byte)21),
+ MSG_ACKDELIVERIES((byte)22),
+ RESP_ACKDELIVERIES((byte)23),
+ MSG_RECOVERDELIVERIES((byte)24),
+ MSG_CANCELDELIVERY((byte)25),
+ MSG_CANCELDELIVERIES((byte)26),
// Consumer
- MSG_CHANGERATE((byte)21),
+ MSG_CHANGERATE((byte)27),
// Misc
- REQ_CLOSING((byte)22),
- RESP_CLOSING((byte)23),
- MSG_CLOSE((byte)24);
+ REQ_CLOSING((byte)28),
+ RESP_CLOSING((byte)29),
+ MSG_CLOSE((byte)30);
private byte type;
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/RecoverDeliveriesMessage.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
+
+import java.util.List;
+
+import org.jboss.jms.delegate.DeliveryRecovery;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class RecoverDeliveriesMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final List<DeliveryRecovery> deliveries;
+ private final String sessionID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RecoverDeliveriesMessage(List<DeliveryRecovery> deliveries,
+ String sessionID)
+ {
+ super(MSG_RECOVERDELIVERIES);
+
+ assert deliveries != null;
+ assert deliveries.size() > 0;
+ assertValidID(sessionID);
+
+ this.deliveries = deliveries;
+ this.sessionID = sessionID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public List<DeliveryRecovery> getDeliveries()
+ {
+ return deliveries;
+ }
+
+ public String getSessionID()
+ {
+ return sessionID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", deliveries=" + deliveries + ", sessionID="
+ + sessionID + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/SetClientIDMessage.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SetClientIDMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String clientID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SetClientIDMessage(String clientID)
+ {
+ super(MSG_SETCLIENTID);
+
+ assertValidID(clientID);
+
+ this.clientID = clientID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", clientID=" + clientID + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-19 22:37:10 UTC (rev 3347)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-20 15:22:54 UTC (rev 3348)
@@ -6,12 +6,17 @@
*/
package org.jboss.test.messaging.core.remoting.wireformat;
+import static java.lang.System.currentTimeMillis;
import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
@@ -19,12 +24,14 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_ACKDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CLOSING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_IDBLOCK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.TEXT;
@@ -38,7 +45,10 @@
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.jboss.jms.delegate.Ack;
+import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.DefaultAck;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.jms.delegate.DeliveryRecovery;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.message.JBossMessage;
@@ -50,6 +60,8 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
@@ -61,12 +73,16 @@
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -94,6 +110,41 @@
}
}
+ private static void assertSameDeliveries(List<DeliveryRecovery> expected,
+ List<DeliveryRecovery> actual)
+ {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++)
+ {
+ DeliveryRecovery expectedDelivery = expected.get(i);
+ DeliveryRecovery actualDelivery = actual.get(i);
+ assertEquals(expectedDelivery.getDeliveryID(), actualDelivery
+ .getDeliveryID());
+ assertEquals(expectedDelivery.getMessageID(), actualDelivery
+ .getMessageID());
+ assertEquals(expectedDelivery.getQueueName(), actualDelivery
+ .getQueueName());
+ }
+ }
+
+ private static void assertSameCancels(List<Cancel> expected,
+ List<Cancel> actual)
+ {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++)
+ {
+ Cancel expectedCancel = expected.get(i);
+ Cancel actualCancel = actual.get(i);
+ assertEquals(expectedCancel.getDeliveryId(), actualCancel
+ .getDeliveryId());
+ assertEquals(expectedCancel.getDeliveryCount(), actualCancel
+ .getDeliveryCount());
+ assertEquals(expectedCancel.isExpired(), actualCancel.isExpired());
+ assertEquals(expectedCancel.isReachedMaxDeliveryAttempts(),
+ actualCancel.isReachedMaxDeliveryAttempts());
+ }
+ }
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@@ -102,7 +153,7 @@
{
NullPacket packet = new NullPacket();
packet.setCallbackID(randomUUID().toString());
- packet.setCorrelationID(System.currentTimeMillis());
+ packet.setCorrelationID(currentTimeMillis());
packet.setTargetID(randomUUID().toString());
AbstractPacket decodedPacket = encodeAndDecode(packet);
@@ -217,8 +268,8 @@
public void testIDBlockResponse() throws Exception
{
- IDBlockResponse response = new IDBlockResponse(
- System.currentTimeMillis(), System.currentTimeMillis() * 2);
+ IDBlockResponse response = new IDBlockResponse(currentTimeMillis(),
+ currentTimeMillis() * 2);
AbstractPacket decodedPacket = encodeAndDecode(response);
@@ -233,7 +284,7 @@
public void testSendMessage() throws Exception
{
SendMessage packet = new SendMessage(new JBossMessage(System
- .currentTimeMillis()), true, System.currentTimeMillis());
+ .currentTimeMillis()), true, currentTimeMillis());
AbstractPacket p = encodeAndDecode(packet);
@@ -276,7 +327,7 @@
{
CreateConsumerResponse response = new CreateConsumerResponse(randomUUID()
- .toString(), 23, 42, System.currentTimeMillis());
+ .toString(), 23, 42, currentTimeMillis());
AbstractPacket decodedPacket = encodeAndDecode(response);
@@ -315,9 +366,9 @@
public void testDeliverMessage() throws Exception
{
- Message msg = new JBossMessage(System.currentTimeMillis());
+ Message msg = new JBossMessage(currentTimeMillis());
DeliverMessage message = new DeliverMessage(msg, randomUUID().toString(),
- System.currentTimeMillis(), 23);
+ currentTimeMillis(), 23);
AbstractPacket decodedPacket = encodeAndDecode(message);
@@ -335,7 +386,7 @@
public void testAcknowledgeDeliveryRequest() throws Exception
{
AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(
- System.currentTimeMillis());
+ currentTimeMillis());
AbstractPacket decodedPacket = encodeAndDecode(request);
@@ -379,7 +430,7 @@
public void testClosingRequest() throws Exception
{
- ClosingRequest request = new ClosingRequest(System.currentTimeMillis());
+ ClosingRequest request = new ClosingRequest(currentTimeMillis());
AbstractPacket decodedPacket = encodeAndDecode(request);
@@ -418,19 +469,113 @@
TransactionRequest tr = new TransactionRequest(
TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
- SendTransactionMessage request = new SendTransactionMessage(tr, true);
+ SendTransactionMessage message = new SendTransactionMessage(tr, true);
- AbstractPacket decodedPacket = encodeAndDecode(request);
+ AbstractPacket decodedPacket = encodeAndDecode(message);
assertTrue(decodedPacket instanceof SendTransactionMessage);
- SendTransactionMessage decodedRequest = (SendTransactionMessage) decodedPacket;
- assertEquals(MSG_SENDTRANSACTION, decodedRequest.getType());
- assertEquals(request.getTransactionRequest().getRequestType(),
- decodedRequest.getTransactionRequest().getRequestType());
- assertEquals(request.checkForDuplicates(), decodedRequest
+ SendTransactionMessage decodedMessage = (SendTransactionMessage) decodedPacket;
+ assertEquals(MSG_SENDTRANSACTION, decodedMessage.getType());
+ assertEquals(message.getTransactionRequest().getRequestType(),
+ decodedMessage.getTransactionRequest().getRequestType());
+ assertEquals(message.checkForDuplicates(), decodedMessage
.checkForDuplicates());
}
+ public void testGetClientIDRequest() throws Exception
+ {
+ GetClientIDRequest request = new GetClientIDRequest();
+
+ AbstractPacket decodedPacket = encodeAndDecode(request);
+
+ assertTrue(decodedPacket instanceof GetClientIDRequest);
+ assertEquals(REQ_GETCLIENTID, decodedPacket.getType());
+ }
+
+ public void testGetClientIDResponse() throws Exception
+ {
+ GetClientIDResponse response = new GetClientIDResponse(randomUUID()
+ .toString());
+
+ AbstractPacket decodedPacket = encodeAndDecode(response);
+
+ assertTrue(decodedPacket instanceof GetClientIDResponse);
+ GetClientIDResponse decodedResponse = (GetClientIDResponse) decodedPacket;
+ assertEquals(RESP_GETCLIENTID, decodedResponse.getType());
+ assertEquals(response.getClientID(), decodedResponse.getClientID());
+ }
+
+ public void testSetClientIDMessage() throws Exception
+ {
+ SetClientIDMessage message = new SetClientIDMessage(randomUUID()
+ .toString());
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof SetClientIDMessage);
+ SetClientIDMessage decodedMessage = (SetClientIDMessage) decodedPacket;
+ assertEquals(MSG_SETCLIENTID, decodedMessage.getType());
+ assertEquals(message.getClientID(), decodedMessage.getClientID());
+ }
+
+ public void testRecoverDeliveriesMessage() throws Exception
+ {
+ List<DeliveryRecovery> deliveries = new ArrayList<DeliveryRecovery>();
+ deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+ currentTimeMillis(), randomUUID().toString()));
+ deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+ currentTimeMillis(), randomUUID().toString()));
+ deliveries.add(new DeliveryRecovery(currentTimeMillis(),
+ currentTimeMillis(), randomUUID().toString()));
+
+ RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(
+ deliveries, randomUUID().toString());
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof RecoverDeliveriesMessage);
+ RecoverDeliveriesMessage decodedMessage = (RecoverDeliveriesMessage) decodedPacket;
+ assertEquals(MSG_RECOVERDELIVERIES, decodedMessage.getType());
+ assertSameDeliveries(message.getDeliveries(), decodedMessage
+ .getDeliveries());
+ assertEquals(message.getSessionID(), decodedMessage.getSessionID());
+ }
+
+ public void testCancelDeliveryMessage() throws Exception
+ {
+ Cancel cancel = new DefaultCancel(currentTimeMillis(), 23, true, false);
+ CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof CancelDeliveryMessage);
+ CancelDeliveryMessage decodedMessage = (CancelDeliveryMessage) decodedPacket;
+ assertEquals(MSG_CANCELDELIVERY, decodedMessage.getType());
+ Cancel expected = message.getCancel();
+ Cancel actual = decodedMessage.getCancel();
+ assertEquals(expected.getDeliveryId(), actual.getDeliveryId());
+ assertEquals(expected.getDeliveryCount(), actual.getDeliveryCount());
+ assertEquals(expected.isExpired(), actual.isExpired());
+ assertEquals(expected.isReachedMaxDeliveryAttempts(), actual
+ .isReachedMaxDeliveryAttempts());
+ }
+
+ public void testCancelDeliveriesMessage() throws Exception
+ {
+ List<Cancel> cancels = new ArrayList<Cancel>();
+ cancels.add(new DefaultCancel(currentTimeMillis(), 23, true, false));
+ cancels.add(new DefaultCancel(currentTimeMillis(), 33, false, true));
+
+ CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof CancelDeliveriesMessage);
+ CancelDeliveriesMessage decodedMessage = (CancelDeliveriesMessage) decodedPacket;
+ assertEquals(MSG_CANCELDELIVERIES, decodedMessage.getType());
+ assertSameCancels(message.getCancels(), decodedMessage.getCancels());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -459,8 +604,11 @@
ProtocolDecoder decoder = new PacketCodecFactory().getDecoder(session);
decoder.decode(session, buffer, session.getDecoderOutput());
+ session.getDecoderOutput().flush();
+
Object o = session.getDecoderOutputQueue().poll();
+ assertNotNull(o);
assertTrue(o instanceof AbstractPacket);
return (AbstractPacket) o;
More information about the jboss-cvs-commits
mailing list