[jboss-cvs] JBoss Messaging SVN: r3359 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/delegate and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 22 11:49:46 EST 2007
Author: jmesnil
Date: 2007-11-22 11:49:46 -0500 (Thu, 22 Nov 2007)
New Revision: 3359
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddTemporaryDestinationMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeleteTemporaryDestinationMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AddTemporaryDestinationMessage.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/DeleteTemporaryDestinationMessage.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UnsubscribeMessage.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -31,6 +31,7 @@
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.BrowserState;
import org.jboss.jms.client.state.ConnectionState;
@@ -98,11 +99,11 @@
// connection closing by ConnectionAspect
// FIXME get rif of JBR code
-// ConsolidatedRemotingConnectionListener listener =
-// new ConsolidatedRemotingConnectionListener();
-//
-// remotingConnection.addConnectionListener(listener);
+ ConsolidatedRemotingConnectionListener listener =
+ new ConsolidatedRemotingConnectionListener();
+ remotingConnection.addConnectionListener(listener);
+
if (versionToUse == null)
{
throw new IllegalStateException("Connection version is null");
@@ -116,7 +117,7 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse, idGenerator);
- // listener.setConnectionState(connectionState);
+ listener.setConnectionState(connectionState);
connectionDelegate.setState(connectionState);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -51,14 +51,11 @@
import org.jboss.jms.message.ObjectMessageProxy;
import org.jboss.jms.message.StreamMessageProxy;
import org.jboss.jms.message.TextMessageProxy;
-import org.jboss.jms.wireformat.RequestSupport;
-import org.jboss.jms.wireformat.SessionAddTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionDeleteTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
@@ -69,8 +66,10 @@
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
/**
* The client-side Session delegate class.
@@ -216,9 +215,11 @@
public void addTemporaryDestination(JBossDestination destination) throws JMSException
{
- RequestSupport req = new SessionAddTemporaryDestinationRequest(id, version, destination);
-
- doInvoke(client, req);
+ sendBlocking(new AddTemporaryDestinationMessage(destination));
+
+// RequestSupport req = new SessionAddTemporaryDestinationRequest(id, version, destination);
+//
+// doInvoke(client, req);
}
/**
@@ -371,9 +372,11 @@
public void deleteTemporaryDestination(JBossDestination destination) throws JMSException
{
- RequestSupport req = new SessionDeleteTemporaryDestinationRequest(id, version, destination);
-
- doInvoke(client, req);
+ sendBlocking(new DeleteTemporaryDestinationMessage(destination));
+
+// RequestSupport req = new SessionDeleteTemporaryDestinationRequest(id, version, destination);
+//
+// doInvoke(client, req);
}
/**
@@ -450,9 +453,11 @@
public void unsubscribe(String subscriptionName) throws JMSException
{
- RequestSupport req = new SessionUnsubscribeRequest(id, version, subscriptionName);
-
- doInvoke(client, req);
+ sendBlocking(new UnsubscribeMessage(subscriptionName));
+
+// RequestSupport req = new SessionUnsubscribeRequest(id, version, subscriptionName);
+//
+// doInvoke(client, req);
}
/**
@@ -511,22 +516,8 @@
sstate.incNpSendSequence();
}
- try
- {
- SendMessage message = new SendMessage(m, checkForDuplicates, seq);
- message.setTargetID(id);
-
- if (seq == -1)
- {
- newClient.sendBlocking(message);
- } else {
- newClient.sendOneWay(message);
- }
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ SendMessage message = new SendMessage(m, checkForDuplicates, seq);
+ sendBlocking(message);
// RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, seq);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -21,16 +21,6 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -45,7 +35,6 @@
import javax.jms.JMSException;
import javax.jms.Session;
-import org.apache.mina.common.IoSession;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.CallbackManager;
@@ -75,18 +64,6 @@
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
-import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
-import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
-import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
-import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
-import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.Util;
@@ -291,7 +268,7 @@
Dispatcher.instance.registerTarget(sessionID, sessionAdvised);
- PacketDispatcher.server.register(ep.new ServerSessionEndpointPacketHandler());
+ PacketDispatcher.server.register(advised.new SessionAdvisedPacketHandler(sessionID));
log.trace("created and registered " + ep);
@@ -843,84 +820,4 @@
// Inner classes --------------------------------------------------------------------------------
- class ServerConnectionEndpointPacketHandler implements PacketHandler
- {
-
- public String getID()
- {
- return id;
- }
-
- public void handle(AbstractPacket packet, IoSession session)
- {
- try
- {
- PacketType type = packet.getType();
- if (type == REQ_CREATESESSION)
- {
- CreateSessionRequest request = (CreateSessionRequest) packet;
- ClientSessionDelegate sessionDelegate = (ClientSessionDelegate) createSessionDelegate(request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
- CreateSessionResponse response = new CreateSessionResponse(sessionDelegate.getID(), sessionDelegate.getDupsOKBatchSize(), sessionDelegate.isStrictTck());
- response.normalize(request);
- session.write(response);
- } else if (type == REQ_IDBLOCK)
- {
- IDBlockRequest request = (IDBlockRequest) packet;
- IDBlock idBlock = getIdBlock(request.getSize());
- IDBlockResponse response = new IDBlockResponse(idBlock.getLow(), idBlock.getHigh());
- response.normalize(request);
- session.write(response);
- } else if (type == MSG_STARTCONNECTION)
- {
- start();
- } else if (type == MSG_STOPCONNECTION)
- {
- stop();
-
- NullPacket response = new NullPacket();
- response.normalize(packet);
- session.write(response);
- } else if (type == REQ_CLOSING)
- {
- org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
- long id = closing(request.getSequence());
- ClosingResponse response = new ClosingResponse(id);
- response.normalize(request);
- session.write(response);
- } else if (type == MSG_CLOSE)
- {
- close();
-
- NullPacket response = new NullPacket();
- response.normalize(packet);
- session.write(response);
- } else if (type == MSG_SENDTRANSACTION)
- {
- SendTransactionMessage message = (SendTransactionMessage) packet;
- sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
- NullPacket response = new NullPacket();
- response.normalize(message);
- session.write(response);
- } else if (type == REQ_GETCLIENTID)
- {
- GetClientIDResponse response = new GetClientIDResponse(getClientID());
- response.normalize(packet);
- session.write(response);
- } else if (type == MSG_SETCLIENTID)
- {
- SetClientIDMessage message = (SetClientIDMessage) packet;
- setClientID(message.getClientID());
- NullPacket response = new NullPacket();
- response.normalize(packet);
- session.write(response);
- } else
- {
- log.error("Unsupported packet for connection: " + packet);
- }
- } catch (JMSException e)
- {
- e.printStackTrace();
- }
- }
- }
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -21,13 +21,10 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
-
import java.util.Map;
import javax.jms.JMSException;
-import org.apache.mina.common.IoSession;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
@@ -41,11 +38,6 @@
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -270,7 +262,7 @@
Dispatcher.instance.registerTarget(connectionID, connAdvised);
- PacketDispatcher.server.register(endpoint.new ServerConnectionEndpointPacketHandler());
+ PacketDispatcher.server.register(connAdvised.new ConnectionAdvisedPacketHandler(connectionID));
log.trace("created and registered " + endpoint);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -55,6 +55,7 @@
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
@@ -701,8 +702,9 @@
}
} catch (JMSException e)
{
- // TODO Auto-generated catch block
- e.printStackTrace();
+ JMSExceptionMessage message = new JMSExceptionMessage(e);
+ message.normalize(packet);
+ session.write(message);
}
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -22,11 +22,14 @@
package org.jboss.jms.server.endpoint;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
@@ -102,6 +105,7 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
@@ -111,12 +115,14 @@
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
@@ -2438,119 +2444,4 @@
delList.add(deliveryId);
}
}
-
- public class ServerSessionEndpointPacketHandler implements PacketHandler {
-
- public String getID()
- {
- return id;
- }
-
- public void handle(AbstractPacket packet, IoSession session)
- {
- try
- {
- PacketType type = packet.getType();
- if (type == MSG_SENDMESSAGE)
- {
- SendMessage message = (SendMessage) packet;
- send(message.getMessage(), message.checkForDuplicates(), message.getSequence());
- if (message.getSequence() == -1)
- {
- NullPacket response = new NullPacket();
- response.normalize(message);
- session.write(response);
- }
- } else if (type == REQ_CREATECONSUMER)
- {
- CreateConsumerRequest request = (CreateConsumerRequest) packet;
- ClientConsumerDelegate consumer = (ClientConsumerDelegate) createConsumerDelegate(request.getDestination(), request.getSelector(), request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer(), request.isAutoFlowControl());
- CreateConsumerResponse response = new CreateConsumerResponse(consumer.getID(), consumer.getBufferSize(), consumer.getMaxDeliveries(), consumer.getRedeliveryDelay());
- response.normalize(request);
- session.write(response);
- } else if (type == REQ_CREATEDESTINATION)
- {
- CreateDestinationRequest request = (CreateDestinationRequest) packet;
- JBossDestination destination;
- if (request.isQueue())
- {
- destination = createQueue(request.getName());
- } else
- {
- destination = createTopic(request.getName());
- }
- CreateDestinationResponse response = new CreateDestinationResponse(destination);
- response.normalize(request);
- session.write(response);
- } else if (type == REQ_CREATEBROWSER)
- {
- CreateBrowserRequest request = (CreateBrowserRequest) packet;
- ClientBrowserDelegate browser = (ClientBrowserDelegate) createBrowserDelegate(request.getDestination(), request.getSelector());
- CreateBrowserResponse response = new CreateBrowserResponse(browser.getID());
- response.normalize(request);
- session.write(response);
- } else if (type == REQ_ACKDELIVERY)
- {
- AcknowledgeDeliveryRequest request = (AcknowledgeDeliveryRequest) packet;
- boolean acknowledged = acknowledgeDelivery(new DefaultAck(request.getDeliveryID()));
- AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(acknowledged);
- response.normalize(request);
- session.write(response);
- } else if (type == MSG_ACKDELIVERIES)
- {
- AcknowledgeDeliveriesMessage message = (AcknowledgeDeliveriesMessage) packet;
- acknowledgeDeliveries(message.getAcks());
-
- NullPacket p = new NullPacket();
- p.normalize(message);
- session.write(p);
- } else if (type == MSG_RECOVERDELIVERIES)
- {
- RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
- recoverDeliveries(message.getDeliveries(), message.getSessionID());
-
- NullPacket p = new NullPacket();
- p.normalize(message);
- session.write(p);
- } else if (type == MSG_CANCELDELIVERY)
- {
- CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
- cancelDelivery(message.getCancel());
-
- NullPacket p = new NullPacket();
- p.normalize(message);
- session.write(p);
- } else if (type == MSG_CANCELDELIVERIES)
- {
- CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
- cancelDeliveries(message.getCancels());
-
- NullPacket p = new NullPacket();
- p.normalize(message);
- session.write(p);
- } else if (type == REQ_CLOSING)
- {
- org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
- long id = closing(request.getSequence());
- ClosingResponse response = new ClosingResponse(id);
- response.normalize(request);
- session.write(response);
- } else if (type == MSG_CLOSE)
- {
- close();
-
- NullPacket response = new NullPacket();
- response.normalize(packet);
- session.write(response);
- } else {
- log.error("Unsupported packet for session: " + packet);
- }
- } catch (JMSException e)
- {
- JMSExceptionMessage message = new JMSExceptionMessage(e);
- message.normalize(packet);
- session.write(message);
- }
- }
- }
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -21,13 +21,38 @@
*/
package org.jboss.jms.server.endpoint.advised;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
+
import javax.jms.JMSException;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.delegate.IDBlock;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.tx.MessagingXid;
import org.jboss.jms.tx.TransactionRequest;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockRequest;
+import org.jboss.messaging.core.remoting.wireformat.IDBlockResponse;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
/**
* The server-side advised instance corresponding to a Connection. It is bound to the AOP
@@ -130,4 +155,94 @@
// Private -------------------------------------------------------
// Inner Classes -------------------------------------------------
+
+ public class ConnectionAdvisedPacketHandler implements PacketHandler
+ {
+
+ private String id;
+
+ public ConnectionAdvisedPacketHandler(String id)
+ {
+ this.id = id;
+ }
+
+ public String getID()
+ {
+ return id;
+ }
+
+ public void handle(AbstractPacket packet, IoSession session)
+ {
+ try
+ {
+ PacketType type = packet.getType();
+ if (type == REQ_CREATESESSION)
+ {
+ CreateSessionRequest request = (CreateSessionRequest) packet;
+ ClientSessionDelegate sessionDelegate = (ClientSessionDelegate) createSessionDelegate(request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
+ CreateSessionResponse response = new CreateSessionResponse(sessionDelegate.getID(), sessionDelegate.getDupsOKBatchSize(), sessionDelegate.isStrictTck());
+ response.normalize(request);
+ session.write(response);
+ } else if (type == REQ_IDBLOCK)
+ {
+ IDBlockRequest request = (IDBlockRequest) packet;
+ IDBlock idBlock = getIdBlock(request.getSize());
+ IDBlockResponse response = new IDBlockResponse(idBlock.getLow(), idBlock.getHigh());
+ response.normalize(request);
+ session.write(response);
+ } else if (type == MSG_STARTCONNECTION)
+ {
+ start();
+ } else if (type == MSG_STOPCONNECTION)
+ {
+ stop();
+
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
+ } else if (type == REQ_CLOSING)
+ {
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+ long id = closing(request.getSequence());
+ ClosingResponse response = new ClosingResponse(id);
+ response.normalize(request);
+ session.write(response);
+ } else if (type == MSG_CLOSE)
+ {
+ close();
+
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
+ } else if (type == MSG_SENDTRANSACTION)
+ {
+ SendTransactionMessage message = (SendTransactionMessage) packet;
+ sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
+ NullPacket response = new NullPacket();
+ response.normalize(message);
+ session.write(response);
+ } else if (type == REQ_GETCLIENTID)
+ {
+ GetClientIDResponse response = new GetClientIDResponse(getClientID());
+ response.normalize(packet);
+ session.write(response);
+ } else if (type == MSG_SETCLIENTID)
+ {
+ SetClientIDMessage message = (SetClientIDMessage) packet;
+ setClientID(message.getClientID());
+ NullPacket response = new NullPacket();
+ response.normalize(packet);
+ session.write(response);
+ } else
+ {
+ System.err.println("Unsupported packet for connection: " + packet);
+ }
+ } catch (JMSException e)
+ {
+ JMSExceptionMessage message = new JMSExceptionMessage(e);
+ message.normalize(packet);
+ session.write(message);
+ }
+ }
+ }
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -162,27 +163,28 @@
public void handle(AbstractPacket packet, IoSession session)
{
- PacketType type = packet.getType();
- if (type == REQ_CREATECONNECTION)
+ try
{
- CreateConnectionRequest request = (CreateConnectionRequest) packet;
- try
+ PacketType type = packet.getType();
+ if (type == REQ_CREATECONNECTION)
{
+ CreateConnectionRequest request = (CreateConnectionRequest) packet;
CreateConnectionResult del =
createConnectionDelegate(request.getUsername(), request.getPassword(), request.getFailedNodeID(), null, request.getClientVMID(), request.getVersion(), null);
CreateConnectionResponse response = new CreateConnectionResponse(del.getDelegate().getID(), del.getDelegate().getServerID());
response.normalize(request);
session.write(response);
- } catch (Exception e)
+
+ } else
{
- // TODO Auto-generated catch block
- e.printStackTrace();
+ System.err.println("unhandled packet:" + packet);
}
-
- } else
+ } catch (JMSException e)
{
- System.err.println("unhandled packet:" + packet);
+ JMSExceptionMessage message = new JMSExceptionMessage(e);
+ message.normalize(packet);
+ session.write(message);
}
}
+ }
}
-}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -21,14 +21,33 @@
*/
package org.jboss.jms.server.endpoint.advised;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
+
import java.util.List;
import javax.jms.JMSException;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.client.delegate.ClientBrowserDelegate;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DefaultAck;
import org.jboss.jms.delegate.SessionEndpoint;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
@@ -36,6 +55,28 @@
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.endpoint.SessionInternalEndpoint;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
+import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
/**
* The server-side advised instance corresponding to a Session. It is bound to the AOP
@@ -172,4 +213,147 @@
// Inner Classes -------------------------------------------------
+ /**
+  * Packet handler for this session. Each supported packet type is mapped to
+  * the corresponding method of the enclosing instance, and a reply normalized
+  * against the incoming packet is written back on the same IoSession.
+  */
+ public class SessionAdvisedPacketHandler implements PacketHandler {
+
+    // assigned once in the constructor and never changed afterwards
+    private final String id;
+
+    public SessionAdvisedPacketHandler(String id)
+    {
+       this.id = id;
+    }
+
+    public String getID()
+    {
+       return id;
+    }
+
+    /**
+     * Dispatches the packet according to its type.
+     * <p>
+     * Request packets (REQ_*) are answered with their dedicated response
+     * packet; message packets (MSG_*) are answered with a NullPacket. A
+     * JMSException thrown while processing is sent back as a
+     * JMSExceptionMessage. A packet of any other type is reported on
+     * System.err and receives no reply.
+     *
+     * @param packet the incoming packet
+     * @param session the session on which the reply is written
+     */
+    public void handle(AbstractPacket packet, IoSession session)
+    {
+       try
+       {
+          PacketType type = packet.getType();
+          if (type == MSG_SENDMESSAGE)
+          {
+             SendMessage message = (SendMessage) packet;
+             send(message.getMessage(), message.checkForDuplicates(), message.getSequence());
+
+             writeNullResponse(packet, session);
+          } else if (type == REQ_CREATECONSUMER)
+          {
+             CreateConsumerRequest request = (CreateConsumerRequest) packet;
+             ClientConsumerDelegate consumer = (ClientConsumerDelegate) createConsumerDelegate(request.getDestination(), request.getSelector(), request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer(), request.isAutoFlowControl());
+             CreateConsumerResponse response = new CreateConsumerResponse(consumer.getID(), consumer.getBufferSize(), consumer.getMaxDeliveries(), consumer.getRedeliveryDelay());
+             response.normalize(request);
+             session.write(response);
+          } else if (type == REQ_CREATEDESTINATION)
+          {
+             CreateDestinationRequest request = (CreateDestinationRequest) packet;
+             JBossDestination destination;
+             if (request.isQueue())
+             {
+                destination = createQueue(request.getName());
+             } else
+             {
+                destination = createTopic(request.getName());
+             }
+             CreateDestinationResponse response = new CreateDestinationResponse(destination);
+             response.normalize(request);
+             session.write(response);
+          } else if (type == REQ_CREATEBROWSER)
+          {
+             CreateBrowserRequest request = (CreateBrowserRequest) packet;
+             ClientBrowserDelegate browser = (ClientBrowserDelegate) createBrowserDelegate(request.getDestination(), request.getSelector());
+             CreateBrowserResponse response = new CreateBrowserResponse(browser.getID());
+             response.normalize(request);
+             session.write(response);
+          } else if (type == REQ_ACKDELIVERY)
+          {
+             AcknowledgeDeliveryRequest request = (AcknowledgeDeliveryRequest) packet;
+             boolean acknowledged = acknowledgeDelivery(new DefaultAck(request.getDeliveryID()));
+             AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(acknowledged);
+             response.normalize(request);
+             session.write(response);
+          } else if (type == MSG_ACKDELIVERIES)
+          {
+             AcknowledgeDeliveriesMessage message = (AcknowledgeDeliveriesMessage) packet;
+             acknowledgeDeliveries(message.getAcks());
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_RECOVERDELIVERIES)
+          {
+             RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
+             recoverDeliveries(message.getDeliveries(), message.getSessionID());
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_CANCELDELIVERY)
+          {
+             CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
+             cancelDelivery(message.getCancel());
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_CANCELDELIVERIES)
+          {
+             CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
+             cancelDeliveries(message.getCancels());
+
+             writeNullResponse(packet, session);
+          } else if (type == REQ_CLOSING)
+          {
+             org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
+             // named differently from the handler's own id field to avoid shadowing it
+             long closingID = closing(request.getSequence());
+             ClosingResponse response = new ClosingResponse(closingID);
+             response.normalize(request);
+             session.write(response);
+          } else if (type == MSG_CLOSE)
+          {
+             close();
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_UNSUBSCRIBE)
+          {
+             UnsubscribeMessage message = (UnsubscribeMessage) packet;
+             unsubscribe(message.getSubscriptionName());
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_ADDTEMPORARYDESTINATION)
+          {
+             AddTemporaryDestinationMessage message = (AddTemporaryDestinationMessage) packet;
+             addTemporaryDestination(message.getDestination());
+
+             writeNullResponse(packet, session);
+          } else if (type == MSG_DELETETEMPORARYDESTINATION)
+          {
+             DeleteTemporaryDestinationMessage message = (DeleteTemporaryDestinationMessage) packet;
+             deleteTemporaryDestination(message.getDestination());
+
+             writeNullResponse(packet, session);
+          } else
+          {
+             // Do not drop unknown packets silently: without a trace, a
+             // client waiting for a reply is very hard to diagnose.
+             System.err.println("Unsupported packet for session: " + packet);
+          }
+       } catch (JMSException e)
+       {
+          JMSExceptionMessage message = new JMSExceptionMessage(e);
+          message.normalize(packet);
+          session.write(message);
+       }
+    }
+
+    /**
+     * Writes a NullPacket, normalized against the given packet, to the session.
+     */
+    private void writeNullResponse(AbstractPacket packet, IoSession session)
+    {
+       NullPacket response = new NullPacket();
+       response.normalize(packet);
+       session.write(response);
+    }
+ }
}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddTemporaryDestinationMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddTemporaryDestinationMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddTemporaryDestinationMessageCodec.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+
+/**
+ * Codec for {@link AddTemporaryDestinationMessage} packets.
+ * <p>
+ * The body written by this codec is an int body length, an int holding the
+ * size of the encoded destination, and then the encoded destination bytes.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class AddTemporaryDestinationMessageCodec extends
+      AbstractPacketCodec<AddTemporaryDestinationMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public AddTemporaryDestinationMessageCodec()
+   {
+      super(MSG_ADDTEMPORARYDESTINATION);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the body length, the size of the encoded destination and the
+    * encoded destination itself to the output buffer.
+    */
+   @Override
+   protected void encodeBody(IoSession session, AddTemporaryDestinationMessage message,
+         IoBuffer out) throws Exception
+   {
+      byte[] encodedDestination = encode(message.getDestination());
+
+      // body = one int (the size prefix) + the encoded destination
+      out.putInt(INT_LENGTH + encodedDestination.length);
+      out.putInt(encodedDestination.length);
+      out.put(encodedDestination);
+   }
+
+   /**
+    * Reads the body length and, when at least that many bytes remain in the
+    * buffer, rebuilds the message from the encoded destination.
+    *
+    * @return the decoded message, or <code>null</code> when fewer bytes than
+    *         the announced body length remain in the buffer
+    */
+   @Override
+   protected AddTemporaryDestinationMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int announcedLength = in.getInt();
+      if (in.remaining() >= announcedLength)
+      {
+         byte[] encodedDestination = new byte[in.getInt()];
+         in.get(encodedDestination);
+         JBossDestination destination = decode(encodedDestination);
+
+         return new AddTemporaryDestinationMessage(destination);
+      }
+
+      return null;
+   }
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeleteTemporaryDestinationMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeleteTemporaryDestinationMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/DeleteTemporaryDestinationMessageCodec.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+
+/**
+ * Codec for {@link DeleteTemporaryDestinationMessage} packets.
+ * <p>
+ * The body written by this codec is an int body length, an int holding the
+ * size of the encoded destination, and then the encoded destination bytes.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class DeleteTemporaryDestinationMessageCodec extends
+      AbstractPacketCodec<DeleteTemporaryDestinationMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DeleteTemporaryDestinationMessageCodec()
+   {
+      super(MSG_DELETETEMPORARYDESTINATION);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the body length, the size of the encoded destination and the
+    * encoded destination itself to the output buffer.
+    */
+   @Override
+   protected void encodeBody(IoSession session, DeleteTemporaryDestinationMessage message,
+         IoBuffer out) throws Exception
+   {
+      byte[] encodedDestination = encode(message.getDestination());
+
+      // body = one int (the size prefix) + the encoded destination
+      out.putInt(INT_LENGTH + encodedDestination.length);
+      out.putInt(encodedDestination.length);
+      out.put(encodedDestination);
+   }
+
+   /**
+    * Reads the body length and, when at least that many bytes remain in the
+    * buffer, rebuilds the message from the encoded destination.
+    *
+    * @return the decoded message, or <code>null</code> when fewer bytes than
+    *         the announced body length remain in the buffer
+    */
+   @Override
+   protected DeleteTemporaryDestinationMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int announcedLength = in.getInt();
+      if (in.remaining() >= announcedLength)
+      {
+         byte[] encodedDestination = new byte[in.getInt()];
+         in.get(encodedDestination);
+         JBossDestination destination = decode(encodedDestination);
+
+         return new DeleteTemporaryDestinationMessage(destination);
+      }
+
+      return null;
+   }
+
+   // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -22,6 +22,7 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
@@ -43,6 +44,7 @@
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
@@ -58,6 +60,7 @@
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -164,8 +167,16 @@
addCodec(BrowserNextMessageResponse.class,
BrowserNextMessageResponseCodec.class);
- }
+ addCodec(UnsubscribeMessage.class, UnsubscribeMessageCodec.class);
+
+ addCodec(AddTemporaryDestinationMessage.class,
+ AddTemporaryDestinationMessageCodec.class);
+
+ addCodec(DeleteTemporaryDestinationMessage.class,
+ DeleteTemporaryDestinationMessageCodec.class);
+}
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
@@ -185,8 +196,8 @@
private void addCodecForEmptyPacket(PacketType type,
Class<? extends AbstractPacket> clazz)
{
- AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(type,
- clazz);
+ AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(
+ type, clazz);
super.addMessageDecoder(codec);
super.addMessageEncoder(clazz, codec);
}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/UnsubscribeMessageCodec.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,64 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+
+/**
+ * Codec for {@link UnsubscribeMessage} packets.
+ * <p>
+ * The body written by this codec is an int body length followed by the
+ * subscription name.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class UnsubscribeMessageCodec extends
+      AbstractPacketCodec<UnsubscribeMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public UnsubscribeMessageCodec()
+   {
+      super(MSG_UNSUBSCRIBE);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the body length (the size reported by <code>sizeof</code> for the
+    * subscription name) and then the subscription name itself.
+    */
+   @Override
+   protected void encodeBody(IoSession session, UnsubscribeMessage message,
+         IoBuffer out) throws Exception
+   {
+      String name = message.getSubscriptionName();
+      int bodyLength = sizeof(name);
+
+      out.putInt(bodyLength);
+      putString(out, name);
+   }
+
+   /**
+    * Reads the body length and, when at least that many bytes remain in the
+    * buffer, rebuilds the message from the subscription name.
+    *
+    * @return the decoded message, or <code>null</code> when fewer bytes than
+    *         the announced body length remain in the buffer
+    */
+   @Override
+   protected UnsubscribeMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int announcedLength = in.getInt();
+      if (in.remaining() >= announcedLength)
+      {
+         return new UnsubscribeMessage(getString(in));
+      }
+
+      return null;
+   }
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AddTemporaryDestinationMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AddTemporaryDestinationMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/AddTemporaryDestinationMessage.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import org.jboss.jms.destination.JBossDestination;
+
+/**
+ * Packet of type MSG_ADDTEMPORARYDESTINATION carrying the destination to add.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class AddTemporaryDestinationMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final JBossDestination destination;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param destination the destination carried by this packet; checked
+    *        with an <code>assert</code> to be non-null
+    */
+   public AddTemporaryDestinationMessage(JBossDestination destination)
+   {
+      super(PacketType.MSG_ADDTEMPORARYDESTINATION);
+
+      assert destination != null;
+
+      this.destination = destination;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the destination given at construction time
+    */
+   public JBossDestination getDestination()
+   {
+      return destination;
+   }
+
+   /**
+    * Appends the destination to the parent's string and closes the bracket.
+    */
+   @Override
+   public String toString()
+   {
+      String parent = getParentString();
+      return parent + ", destination=" + destination + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/DeleteTemporaryDestinationMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/DeleteTemporaryDestinationMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/DeleteTemporaryDestinationMessage.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+
+import org.jboss.jms.destination.JBossDestination;
+
+/**
+ * Packet of type MSG_DELETETEMPORARYDESTINATION carrying the destination to
+ * delete.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class DeleteTemporaryDestinationMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final JBossDestination destination;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param destination the destination carried by this packet; checked
+    *        with an <code>assert</code> to be non-null
+    */
+   public DeleteTemporaryDestinationMessage(JBossDestination destination)
+   {
+      super(MSG_DELETETEMPORARYDESTINATION);
+
+      assert destination != null;
+
+      this.destination = destination;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the destination given at construction time
+    */
+   public JBossDestination getDestination()
+   {
+      return destination;
+   }
+
+   /**
+    * Appends the destination to the parent's string and closes the bracket.
+    */
+   @Override
+   public String toString()
+   {
+      String parent = getParentString();
+      return parent + ", destination=" + destination + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -19,50 +19,53 @@
MSG_JMSEXCEPTION ((byte) 2),
TEXT ((byte) 3),
// Connection factory
- REQ_CREATECONNECTION ((byte) 4),
- RESP_CREATECONNECTION ((byte) 5),
+ REQ_CREATECONNECTION ((byte)10),
+ RESP_CREATECONNECTION ((byte)11),
// Connection
- REQ_IDBLOCK ((byte) 6),
- RESP_IDBLOCK ((byte) 7),
- REQ_CREATESESSION ((byte) 8),
- RESP_CREATESESSION ((byte) 9),
- MSG_STARTCONNECTION ((byte)10),
- MSG_STOPCONNECTION ((byte)11),
- MSG_SENDTRANSACTION ((byte)12),
- RESP_SENDTRANSACTION ((byte)13),
- REQ_GETCLIENTID ((byte)14),
- RESP_GETCLIENTID ((byte)15),
- MSG_SETCLIENTID ((byte)16),
+ REQ_IDBLOCK ((byte)20),
+ RESP_IDBLOCK ((byte)21),
+ REQ_CREATESESSION ((byte)22),
+ RESP_CREATESESSION ((byte)23),
+ MSG_STARTCONNECTION ((byte)24),
+ MSG_STOPCONNECTION ((byte)25),
+ MSG_SENDTRANSACTION ((byte)26),
+ RESP_SENDTRANSACTION ((byte)27),
+ REQ_GETCLIENTID ((byte)28),
+ RESP_GETCLIENTID ((byte)29),
+ MSG_SETCLIENTID ((byte)30),
// Session
- REQ_CREATECONSUMER ((byte)17),
- RESP_CREATECONSUMER ((byte)18),
- REQ_CREATEDESTINATION ((byte)21),
- RESP_CREATEDESTINATION ((byte)22),
- REQ_CREATEBROWSER ((byte)23),
- RESP_CREATEBROWSER ((byte)24),
- MSG_SENDMESSAGE ((byte)25),
- MSG_DELIVERMESSAGE ((byte)26),
- REQ_ACKDELIVERY ((byte)27),
- RESP_ACKDELIVERY ((byte)28),
- MSG_ACKDELIVERIES ((byte)29),
- RESP_ACKDELIVERIES ((byte)30),
- MSG_RECOVERDELIVERIES ((byte)31),
- MSG_CANCELDELIVERY ((byte)32),
- MSG_CANCELDELIVERIES ((byte)33),
+ REQ_CREATECONSUMER ((byte)40),
+ RESP_CREATECONSUMER ((byte)41),
+ REQ_CREATEDESTINATION ((byte)42),
+ RESP_CREATEDESTINATION ((byte)43),
+ MSG_ADDTEMPORARYDESTINATION ((byte)44),
+ MSG_DELETETEMPORARYDESTINATION((byte)45),
+ REQ_CREATEBROWSER ((byte)46),
+ RESP_CREATEBROWSER ((byte)47),
+ MSG_SENDMESSAGE ((byte)48),
+ MSG_DELIVERMESSAGE ((byte)49),
+ REQ_ACKDELIVERY ((byte)50),
+ RESP_ACKDELIVERY ((byte)51),
+ MSG_ACKDELIVERIES ((byte)52),
+ RESP_ACKDELIVERIES ((byte)53),
+ MSG_RECOVERDELIVERIES ((byte)54),
+ MSG_CANCELDELIVERY ((byte)55),
+ MSG_CANCELDELIVERIES ((byte)56),
+ MSG_UNSUBSCRIBE ((byte)57),
// Consumer
- MSG_CHANGERATE ((byte)34),
- // Browser
- MSG_BROWSER_RESET ((byte)35),
- REQ_BROWSER_HASNEXTMESSAGE ((byte)36),
- RESP_BROWSER_HASNEXTMESSAGE ((byte)37),
- REQ_BROWSER_NEXTMESSAGEBLOCK ((byte)38),
- RESP_BROWSER_NEXTMESSAGEBLOCK ((byte)39),
- REQ_BROWSER_NEXTMESSAGE ((byte)40),
- RESP_BROWSER_NEXTMESSAGE ((byte)41),
+ MSG_CHANGERATE ((byte)70),
+ // Browser
+ MSG_BROWSER_RESET ((byte)80),
+ REQ_BROWSER_HASNEXTMESSAGE ((byte)81),
+ RESP_BROWSER_HASNEXTMESSAGE ((byte)82),
+ REQ_BROWSER_NEXTMESSAGEBLOCK ((byte)83),
+ RESP_BROWSER_NEXTMESSAGEBLOCK ((byte)84),
+ REQ_BROWSER_NEXTMESSAGE ((byte)85),
+ RESP_BROWSER_NEXTMESSAGE ((byte)86),
// Misc
- REQ_CLOSING ((byte)42),
- RESP_CLOSING ((byte)43),
- MSG_CLOSE ((byte)44);
+ REQ_CLOSING ((byte)90),
+ RESP_CLOSING ((byte)91),
+ MSG_CLOSE ((byte)92);
public final byte type;
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UnsubscribeMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UnsubscribeMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UnsubscribeMessage.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+
+/**
+ * Packet of type MSG_UNSUBSCRIBE carrying the name of a subscription.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class UnsubscribeMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String subscriptionName;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param subscriptionName the subscription name carried by this packet;
+    *        checked with an <code>assert</code> to be non-null
+    */
+   public UnsubscribeMessage(String subscriptionName)
+   {
+      super(MSG_UNSUBSCRIBE);
+
+      assert subscriptionName != null;
+
+      this.subscriptionName = subscriptionName;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the subscription name given at construction time
+    */
+   public String getSubscriptionName()
+   {
+      return subscriptionName;
+   }
+
+   /**
+    * Appends the subscription name to the parent's string and closes the
+    * bracket.
+    */
+   @Override
+   public String toString()
+   {
+      String parent = getParentString();
+      return parent + ", subscriptionName=" + subscriptionName + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -8,11 +8,13 @@
import static java.lang.System.currentTimeMillis;
import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_BROWSER_RESET;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CHANGERATE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELIVERMESSAGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_JMSEXCEPTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
@@ -21,6 +23,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_HASNEXTMESSAGE;
@@ -74,6 +77,7 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
@@ -95,6 +99,7 @@
import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
+import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.GetClientIDRequest;
import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
@@ -110,6 +115,7 @@
import org.jboss.messaging.core.remoting.wireformat.StartConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -716,7 +722,7 @@
assertTrue(decodedPacket instanceof BrowserResetMessage);
assertEquals(MSG_BROWSER_RESET, decodedPacket.getType());
}
-
+
public void testBrowserHasNextMessageRequest() throws Exception
{
BrowserHasNextMessageRequest request = new BrowserHasNextMessageRequest();
@@ -729,7 +735,8 @@
public void testBrowserHasNextMessageResponse() throws Exception
{
- BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(false);
+ BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(
+ false);
AbstractPacket decodedPacket = encodeAndDecode(response);
@@ -764,6 +771,52 @@
.getMessage().getMessageID());
}
+ public void testUnsubscribeMessage() throws Exception
+ {
+ UnsubscribeMessage message = new UnsubscribeMessage(
+ "testUnsubscribeMessage");
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof UnsubscribeMessage);
+ UnsubscribeMessage decodedMessage = (UnsubscribeMessage) decodedPacket;
+ assertEquals(MSG_UNSUBSCRIBE, decodedMessage.getType());
+ assertEquals(decodedMessage.getSubscriptionName(), message
+ .getSubscriptionName());
+ }
+
+ public void testAddTemporaryDestinationMessage() throws Exception
+ {
+ JBossDestination destination = new JBossQueue("testAddTemporaryDestinationMessage",
+ true);
+ AddTemporaryDestinationMessage message = new AddTemporaryDestinationMessage(
+ destination);
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof AddTemporaryDestinationMessage);
+
+ AddTemporaryDestinationMessage decodedMessage = (AddTemporaryDestinationMessage) decodedPacket;
+ assertEquals(MSG_ADDTEMPORARYDESTINATION, decodedMessage.getType());
+ assertEquals(message.getDestination(), decodedMessage.getDestination());
+ }
+
+ public void testDeleteTemporaryDestinationMessage() throws Exception
+ {
+ JBossDestination destination = new JBossQueue("testDeleteTemporaryDestinationMessage",
+ true);
+ DeleteTemporaryDestinationMessage message = new DeleteTemporaryDestinationMessage(
+ destination);
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof DeleteTemporaryDestinationMessage);
+
+ DeleteTemporaryDestinationMessage decodedMessage = (DeleteTemporaryDestinationMessage) decodedPacket;
+ assertEquals(MSG_DELETETEMPORARYDESTINATION, decodedMessage.getType());
+ assertEquals(message.getDestination(), decodedMessage.getDestination());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-22 10:41:17 UTC (rev 3358)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-22 16:49:46 UTC (rev 3359)
@@ -1445,7 +1445,7 @@
{
if (acceptor != null)
{
- acceptor.setDisconnectOnUnbind(true);
+ acceptor.setDisconnectOnUnbind(false);
acceptor.unbind();
}
}
More information about the jboss-cvs-commits
mailing list