[jboss-cvs] JBoss Messaging SVN: r3353 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/server/endpoint and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 21 05:42:02 EST 2007
Author: jmesnil
Date: 2007-11-21 05:42:02 -0500 (Wed, 21 Nov 2007)
New Revision: 3353
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationRequest.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationResponse.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
* added packets & codecs to create destinations (both queues & topics)
* factored the code that sends a packet and blocks until the server responds into DelegateSupport.sendBlocking(AbstractPacket):AbstractPacket
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -23,7 +23,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.concurrent.TimeoutException;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
@@ -42,7 +41,6 @@
import org.jboss.jms.tx.MessagingXid;
import org.jboss.jms.tx.ResourceManagerFactory;
import org.jboss.jms.tx.TransactionRequest;
-import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
import org.jboss.jms.wireformat.ConnectionStartRequest;
import org.jboss.jms.wireformat.RequestSupport;
@@ -159,35 +157,14 @@
public void close() throws JMSException
{
- assert newClient != null;
-
- CloseMessage message = new CloseMessage();
- message.setTargetID(id);
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sendBlocking(new CloseMessage());
}
public long closing(long sequence) throws JMSException
{
- assert newClient != null;
-
- try
- {
- org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
- request.setTargetID(id);
- ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
- return response.getID();
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return -1;
- }
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
+ ClosingResponse response = (ClosingResponse) sendBlocking(request);
+ return response.getID();
}
// ConnectionDelegate implementation ------------------------------------------------------------
@@ -209,42 +186,23 @@
int acknowledgmentMode,
boolean isXA) throws JMSException
{
- assert newClient != null;
-
CreateSessionRequest request = new CreateSessionRequest(transacted, acknowledgmentMode, isXA);
- request.setTargetID(id);
-
- try {
- CreateSessionResponse response = (CreateSessionResponse) newClient.sendBlocking(request);
- ClientSessionDelegate delegate = new ClientSessionDelegate(response.getSessionID(), response.getDupsOKBatchSize(), response.isStrictTCK());
- return delegate;
- } catch (Exception e) {
- e.printStackTrace();
- }
+ CreateSessionResponse response = (CreateSessionResponse) sendBlocking(request);
+ ClientSessionDelegate delegate = new ClientSessionDelegate(response.getSessionID(), response.getDupsOKBatchSize(), response.isStrictTCK());
+ return delegate;
- RequestSupport req =
- new ConnectionCreateSessionDelegateRequest(id, version, transacted,
- acknowledgmentMode, isXA);
-
- return (SessionDelegate)doInvoke(client, req);
+// RequestSupport req =
+// new ConnectionCreateSessionDelegateRequest(id, version, transacted,
+// acknowledgmentMode, isXA);
+//
+// return (SessionDelegate)doInvoke(client, req);
}
public String getClientID() throws JMSException
{
- assert newClient != null;
-
- try
- {
- GetClientIDRequest request = new GetClientIDRequest();
- request.setTargetID(id);
- GetClientIDResponse response = (GetClientIDResponse) newClient.sendBlocking(request);
- return response.getClientID();
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return null;
- }
+ GetClientIDResponse response = (GetClientIDResponse) sendBlocking(new GetClientIDRequest());
+ return response.getClientID();
// RequestSupport req = new ConnectionGetClientIDRequest(id, version);
//
@@ -272,18 +230,8 @@
public void sendTransaction(TransactionRequest tr,
boolean checkForDuplicates) throws JMSException
{
- assert newClient != null;
-
- try
- {
- SendTransactionMessage request = new SendTransactionMessage(tr, checkForDuplicates);
- request.setTargetID(id);
- newClient.sendBlocking(request);
- } catch (TimeoutException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sendBlocking(new SendTransactionMessage(tr, checkForDuplicates));
+
// RequestSupport req =
// new ConnectionSendTransactionRequest(id, version, request, checkForDuplicates);
//
@@ -292,17 +240,8 @@
public void setClientID(String clientID) throws JMSException
{
- assert newClient != null;
-
- try
- {
- SetClientIDMessage message = new SetClientIDMessage(clientID);
- message.setTargetID(id);
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- }
+ sendBlocking(new SetClientIDMessage(clientID));
+
// RequestSupport req = new ConnectionSetClientIDRequest(id, version, clientID);
//
// doInvoke(client, req);
@@ -338,18 +277,8 @@
public void stop() throws JMSException
{
- assert newClient != null;
+ sendBlocking(new StopConnectionMessage());
- StopConnectionMessage message = new StopConnectionMessage();
- message.setTargetID(id);
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- }
-
// RequestSupport req = new ConnectionStopRequest(id, version);
//
// doInvoke(client, req);
@@ -380,19 +309,9 @@
public IDBlock getIdBlock(int size) throws JMSException
{
- assert newClient != null;
-
IDBlockRequest request = new IDBlockRequest(size);
- request.setTargetID(id);
- try
- {
- IDBlockResponse response = (IDBlockResponse) newClient.sendBlocking(request);
- return new IDBlock(response.getLow(), response.getHigh());
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return null;
- }
+ IDBlockResponse response = (IDBlockResponse) sendBlocking(request);
+ return new IDBlock(response.getLow(), response.getHigh());
}
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -21,9 +21,6 @@
*/
package org.jboss.jms.client.delegate;
-import static org.jboss.messaging.core.remoting.Constants.PORT;
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.Serializable;
@@ -156,16 +153,25 @@
remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
remotingConnection.start();
- org.jboss.messaging.core.remoting.Client client = remotingConnection.getNewRemotingClient();
+ newClient = remotingConnection.getNewRemotingClient();
+
CreateConnectionRequest request = new CreateConnectionRequest(v, JMSClientVMIdentifier.instance, failedNodeID, username, password);
- request.setTargetID(id);
- CreateConnectionResponse response = (CreateConnectionResponse) client.sendBlocking(request);
+ CreateConnectionResponse response = (CreateConnectionResponse) sendBlocking(request);
ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(response.getConnectionID(), response.getServerID());
res = new CreateConnectionResult(connectionDelegate);
- } catch (Throwable e)
+ } catch (Throwable t)
{
- e.printStackTrace();
- throw handleThrowable(e);
+ if (remotingConnection != null)
+ {
+ try
+ {
+ remotingConnection.stop();
+ }
+ catch (Throwable ignore)
+ {
+ }
+ }
+ throw handleThrowable(t);
}
// try
// {
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -21,11 +21,8 @@
*/
package org.jboss.jms.client.delegate;
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,12 +32,7 @@
import org.jboss.jms.client.state.HierarchicalState;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.wireformat.CloseRequest;
-import org.jboss.jms.wireformat.ClosingRequest;
-import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.Client;
-import org.jboss.messaging.core.remoting.Constants;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
@@ -131,18 +123,7 @@
public void close() throws JMSException
{
- assert newClient != null;
-
- CloseMessage message = new CloseMessage();
- message.setTargetID(id);
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sendBlocking(new CloseMessage());
// RequestSupport req = new CloseRequest(id, version);
//
@@ -151,19 +132,10 @@
public long closing(long sequence) throws JMSException
{
- assert newClient != null;
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
+ ClosingResponse response = (ClosingResponse) sendBlocking(request);
+ return response.getID();
- try
- {
- org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(sequence);
- request.setTargetID(id);
- ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
- return response.getID();
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return -1;
- }
// RequestSupport req = new ClosingRequest(sequence, id, version);
//
// return ((Long)doInvoke(client, req)).longValue();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -25,7 +25,6 @@
import java.io.DataOutputStream;
import java.io.Serializable;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
@@ -54,12 +53,9 @@
import org.jboss.jms.message.TextMessageProxy;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.jms.wireformat.SessionAddTemporaryDestinationRequest;
-import org.jboss.jms.wireformat.SessionCreateQueueRequest;
-import org.jboss.jms.wireformat.SessionCreateTopicRequest;
import org.jboss.jms.wireformat.SessionDeleteTemporaryDestinationRequest;
import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
@@ -71,7 +67,8 @@
import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.SendMessage;
@@ -132,7 +129,7 @@
super.synchronizeWith(nd);
- ClientSessionDelegate newDelegate = (ClientSessionDelegate)nd;
+ DelegateSupport newDelegate = (DelegateSupport)nd;
// synchronize server endpoint state
@@ -144,6 +141,8 @@
client = conn.getRemotingClient();
+ newClient = conn.getNewRemotingClient();
+
strictTck = conn.isStrictTck();
}
@@ -163,18 +162,7 @@
public void close() throws JMSException
{
- assert newClient != null;
-
- CloseMessage message = new CloseMessage();
- message.setTargetID(id);
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sendBlocking(new CloseMessage());
//
// RequestSupport req = new CloseRequest(id, version);
//
@@ -183,22 +171,11 @@
public long closing(long sequence) throws JMSException
{
- assert newClient != null;
-
long seq = ((SessionState)state).getNPSendSequence();
+ org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(seq);
+ ClosingResponse response = (ClosingResponse) sendBlocking(request);
+ return response.getID();
- try
- {
- org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = new org.jboss.messaging.core.remoting.wireformat.ClosingRequest(seq);
- request.setTargetID(id);
- ClosingResponse response = (ClosingResponse) newClient.sendBlocking(request);
- return response.getID();
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return -1;
- }
-
// RequestSupport req = new ClosingRequest(((SessionState)state).getNPSendSequence(), id, version);
//
// return ((Long)doInvoke(client, req)).longValue();
@@ -208,20 +185,10 @@
public boolean acknowledgeDelivery(Ack ack) throws JMSException
{
- assert newClient != null;
-
- try
- {
- AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(ack.getDeliveryID());
- request.setTargetID(id);
-
- AcknowledgeDeliveryResponse response = (AcknowledgeDeliveryResponse) newClient.sendBlocking(request);
+ AcknowledgeDeliveryRequest request = new AcknowledgeDeliveryRequest(ack.getDeliveryID());
+ AcknowledgeDeliveryResponse response = (AcknowledgeDeliveryResponse) sendBlocking(request);
return response.isAcknowledged();
- } catch (Exception e)
- {
- e.printStackTrace();
- return false;
- }
+
//
// //FIXME
// RequestSupport req = new SessionAcknowledgeDeliveryRequest(id, version, ack);
@@ -231,19 +198,7 @@
public void acknowledgeDeliveries(List acks) throws JMSException
{
- assert newClient != null;
-
- try
- {
- AcknowledgeDeliveriesMessage message = new AcknowledgeDeliveriesMessage(acks);
- message.setTargetID(id);
- AbstractPacket response = newClient.sendBlocking(message);
-
- assert response instanceof NullPacket;
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- }
+ sendBlocking(new AcknowledgeDeliveriesMessage(acks));
// RequestSupport req = new SessionAcknowledgeDeliveriesRequest(id, version, acks);
//
@@ -287,20 +242,9 @@
public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException
{
- assert newClient != null;
-
CreateBrowserRequest request = new CreateBrowserRequest(queue, messageSelector);
- request.setTargetID(id);
- try
- {
- CreateBrowserResponse response = (CreateBrowserResponse) newClient.sendBlocking(request);
- return new ClientBrowserDelegate(response.getBrowserID());
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- return null;
- }
-
+ CreateBrowserResponse response = (CreateBrowserResponse) sendBlocking(request);
+ return new ClientBrowserDelegate(response.getBrowserID());
// RequestSupport req = new SessionCreateBrowserDelegateRequest(id, version, queue,
// messageSelector);
@@ -324,21 +268,11 @@
boolean noLocal, String subscriptionName,
boolean connectionConsumer, boolean started) throws JMSException
{
- assert newClient != null;
+ CreateConsumerRequest request = new CreateConsumerRequest(destination, selector, noLocal, subscriptionName, connectionConsumer, started);
+ CreateConsumerResponse response = (CreateConsumerResponse) sendBlocking(request);
- try
- {
- CreateConsumerRequest request = new CreateConsumerRequest(destination, selector, noLocal, subscriptionName, connectionConsumer, started);
- request.setTargetID(id);
- CreateConsumerResponse response = (CreateConsumerResponse) newClient.sendBlocking(request);
- ClientConsumerDelegate delegate = new ClientConsumerDelegate(response.getConsumerID(), response.getBufferSize(), response.getMaxDeliveries(), response.getRedeliveryDelay());
-
- return delegate;
- } catch (Exception e)
- {
- e.printStackTrace();
- return null;
- }
+ ClientConsumerDelegate delegate = new ClientConsumerDelegate(response.getConsumerID(), response.getBufferSize(), response.getMaxDeliveries(), response.getRedeliveryDelay());
+ return delegate;
}
/**
@@ -388,11 +322,15 @@
public JBossQueue createQueue(String queueName) throws JMSException
{
- RequestSupport req = new SessionCreateQueueRequest(id, version, queueName);
+ CreateDestinationRequest request = new CreateDestinationRequest(queueName, true);
+ CreateDestinationResponse response = (CreateDestinationResponse) sendBlocking(request);
+ return (JBossQueue) response.getDestination();
- return (JBossQueue)doInvoke(client, req);
+ // RequestSupport req = new SessionCreateQueueRequest(id, version, queueName);
+//
+// return (JBossQueue)doInvoke(client, req);
}
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
@@ -422,9 +360,13 @@
public JBossTopic createTopic(String topicName) throws JMSException
{
- RequestSupport req = new SessionCreateTopicRequest(id, version, topicName);
+ CreateDestinationRequest request = new CreateDestinationRequest(topicName, false);
+ CreateDestinationResponse response = (CreateDestinationResponse) sendBlocking(request);
+ return (JBossTopic) response.getDestination();
- return (JBossTopic)doInvoke(client, req);
+// RequestSupport req = new SessionCreateTopicRequest(id, version, topicName);
+//
+// return (JBossTopic)doInvoke(client, req);
}
public void deleteTemporaryDestination(JBossDestination destination) throws JMSException
@@ -601,19 +543,8 @@
public void cancelDeliveries(List cancels) throws JMSException
{
- assert newClient != null;
+ sendBlocking(new CancelDeliveriesMessage(cancels));
- CancelDeliveriesMessage message = new CancelDeliveriesMessage(cancels);
- message.setTargetID(id);
-
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- }
-
// RequestSupport req = new SessionCancelDeliveriesRequest(id, version, cancels);
//
// doInvoke(client, req);
@@ -621,19 +552,8 @@
public void cancelDelivery(Cancel cancel) throws JMSException
{
- assert newClient != null;
+ sendBlocking(new CancelDeliveryMessage(cancel));
- CancelDeliveryMessage message = new CancelDeliveryMessage(cancel);
- message.setTargetID(id);
-
- try
- {
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- e.printStackTrace();
- }
-
// RequestSupport req = new SessionCancelDeliveryRequest(id, version, cancel);
//
// doInvoke(client, req);
@@ -641,19 +561,8 @@
public void recoverDeliveries(List deliveries, String sessionID) throws JMSException
{
- assert newClient != null;
+ sendBlocking(new RecoverDeliveriesMessage(deliveries, sessionID));
- try
- {
- RecoverDeliveriesMessage message = new RecoverDeliveriesMessage(deliveries, sessionID);
- message.setTargetID(id);
- newClient.sendBlocking(message);
- } catch (TimeoutException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
// RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
//
// doInvoke(client, req);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -35,6 +35,7 @@
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.jms.wireformat.ResponseSupport;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.util.Streamable;
import org.jboss.remoting.CannotConnectException;
import org.jboss.remoting.Client;
@@ -166,6 +167,22 @@
return doInvoke(client, req, true);
}
+ /**
+ * Sends a packet through <code>newClient</code> and blocks until a response is returned.
+ * The request's target ID is set to this delegate's <code>id</code> before sending, so
+ * callers do not have to set it themselves.
+ *
+ * @param request the packet to send; expected to be non-null (checked by assertion only)
+ * @return the object returned by <code>newClient.sendBlocking(request)</code>, cast to AbstractPacket
+ * @throws JMSException the exception produced by <code>handleThrowable</code> for any
+ * Throwable raised while sending or waiting
+ */
+ protected AbstractPacket sendBlocking(AbstractPacket request) throws JMSException
+ {
+ // Assertions are only evaluated when the JVM runs with -ea.
+ assert newClient != null;
+ assert request != null;
+
+ request.setTargetID(id);
+ try
+ {
+ AbstractPacket response = (AbstractPacket) newClient.sendBlocking(request);
+ return response;
+ } catch (Throwable t)
+ {
+ // Catches Throwable, so the TimeoutException the callers used to handle themselves
+ // and any other failure both go through handleThrowable.
+ throw handleThrowable(t);
+ }
+ }
+
// Private --------------------------------------------------------------------------------------
private Object doInvoke(Client client, RequestSupport req, boolean oneWay) throws JMSException
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -31,6 +31,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
@@ -108,6 +109,8 @@
import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -2462,6 +2465,20 @@
CreateConsumerResponse response = new CreateConsumerResponse(consumer.getID(), consumer.getBufferSize(), consumer.getMaxDeliveries(), consumer.getRedeliveryDelay());
response.normalize(request);
session.write(response);
+ } else if (type == REQ_CREATEDESTINATION)
+ {
+ CreateDestinationRequest request = (CreateDestinationRequest) packet;
+ JBossDestination destination;
+ if (request.isQueue())
+ {
+ destination = createQueue(request.getName());
+ } else
+ {
+ destination = createTopic(request.getName());
+ }
+ CreateDestinationResponse response = new CreateDestinationResponse(destination);
+ response.normalize(request);
+ session.write(response);
} else if (type == REQ_CREATEBROWSER)
{
CreateBrowserRequest request = (CreateBrowserRequest) packet;
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationRequestCodec.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+
+/**
+ * Codec for {@link CreateDestinationRequest} packets. The encoded body is an int body
+ * length followed by the destination name and the "is queue" flag.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class CreateDestinationRequestCodec extends
+ AbstractPacketCodec<CreateDestinationRequest>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Registers this codec for the REQ_CREATEDESTINATION packet type.
+ */
+ public CreateDestinationRequestCodec()
+ {
+ super(REQ_CREATEDESTINATION);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ /**
+ * Writes the request body to <code>out</code>: the body length, then the name, then the
+ * queue flag. The <code>session</code> argument is not used here.
+ */
+ @Override
+ protected void encodeBody(IoSession session,
+ CreateDestinationRequest request, IoBuffer out) throws Exception
+ {
+ String name = request.getName();
+ boolean isQueue = request.isQueue();
+
+ // The "+ 1" presumably accounts for the boolean flag written below
+ // (assumes putBoolean writes a single byte -- TODO confirm in AbstractPacketCodec).
+ int bodyLength = sizeof(name) + 1;
+
+ out.putInt(bodyLength);
+ // Field order here must match the read order in decodeBody: name, then flag.
+ putString(out, name);
+ putBoolean(out, isQueue);
+ }
+
+ /**
+ * Reads a request body from <code>in</code>: the body length, then the name, then the
+ * queue flag. The <code>session</code> argument is not used here.
+ *
+ * @return the decoded request, or null when fewer than the announced number of body
+ * bytes remain in the buffer
+ */
+ @Override
+ protected CreateDestinationRequest decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ // NOTE(review): the length int has already been consumed when null is returned;
+ // whether the caller restores the buffer position is not visible here -- confirm.
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ String name = getString(in);
+ boolean isQueue = getBoolean(in);
+
+ return new CreateDestinationRequest(name, isQueue);
+ }
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationResponseCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/CreateDestinationResponseCodec.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATEDESTINATION;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
+
+/**
+ * Codec for {@link CreateDestinationResponse} packets. The encoded body is an int body
+ * length, an int giving the size of the encoded destination, and the encoded destination
+ * bytes.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class CreateDestinationResponseCodec extends
+ AbstractPacketCodec<CreateDestinationResponse>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Registers this codec for the RESP_CREATEDESTINATION packet type.
+ */
+ public CreateDestinationResponseCodec()
+ {
+ super(RESP_CREATEDESTINATION);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ /**
+ * Writes the response body to <code>out</code>: the body length, the size of the encoded
+ * destination, then the encoded destination itself. The <code>session</code> argument is
+ * not used here.
+ */
+ @Override
+ protected void encodeBody(IoSession session, CreateDestinationResponse response,
+ IoBuffer out) throws Exception
+ {
+ // The destination is turned into a byte array by the inherited encode(...) helper
+ // (its format is defined outside this class).
+ byte[] destination = encode(response.getDestination());
+
+ // Body = the int holding the array size + the array itself.
+ int bodyLength = INT_LENGTH + destination.length;
+
+ out.putInt(bodyLength);
+ out.putInt(destination.length);
+ out.put(destination);
+ }
+
+ /**
+ * Reads a response body from <code>in</code>: the body length, the size of the encoded
+ * destination, then that many bytes, which are passed to decode(...). The
+ * <code>session</code> argument is not used here.
+ *
+ * @return the decoded response, or null when fewer than the announced number of body
+ * bytes remain in the buffer
+ */
+ @Override
+ protected CreateDestinationResponse decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ // NOTE(review): the length int has already been consumed when null is returned;
+ // whether the caller restores the buffer position is not visible here -- confirm.
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ int destinationLength = in.getInt();
+ // NOTE(review): destinationLength comes straight from the wire and is not checked
+ // against bodyLength or in.remaining() before the array is allocated.
+ byte[] b = new byte[destinationLength];
+ in.get(b);
+ JBossDestination destination = decode(b);
+
+ return new CreateDestinationResponse(destination);
+ }
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -22,6 +22,8 @@
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -106,6 +108,14 @@
super.addMessageEncoder(CreateConsumerRequest.class,
CreateConsumerRequestCodec.class);
+ super.addMessageDecoder(CreateDestinationRequestCodec.class);
+ super.addMessageEncoder(CreateDestinationRequest.class,
+ CreateDestinationRequestCodec.class);
+
+ super.addMessageDecoder(CreateDestinationResponseCodec.class);
+ super.addMessageEncoder(CreateDestinationResponse.class,
+ CreateDestinationResponseCodec.class);
+
super.addMessageDecoder(CreateConsumerResponseCodec.class);
super.addMessageEncoder(CreateConsumerResponse.class,
CreateConsumerResponseCodec.class);
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationRequest.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationRequest.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
+
+/**
+ * Packet of type REQ_CREATEDESTINATION carrying a destination name and an isQueue flag.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class CreateDestinationRequest extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ // both fields are set once by the constructor and only exposed through getters
+ private final String name;
+ private final boolean isQueue;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ /** Builds the request from the destination name and the isQueue flag. */
+ public CreateDestinationRequest(String name, boolean isQueue)
+ {
+ super(REQ_CREATEDESTINATION);
+ // name must be non-null and non-empty; this is only enforced when assertions are enabled (-ea)
+ assert name != null;
+ assert name.length() > 0;
+
+ this.name = name;
+ this.isQueue = isQueue;
+ }
+
+ // Public --------------------------------------------------------
+ /** Returns the destination name given to the constructor. */
+ public String getName()
+ {
+ return name;
+ }
+ /** Returns the isQueue flag given to the constructor. */
+ public boolean isQueue()
+ {
+ return isQueue;
+ }
+ /** Returns getParentString() followed by the name and isQueue values and a closing "]". */
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", name=" + name + ", isQueue=" + isQueue + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationResponse.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationResponse.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateDestinationResponse.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_CREATEDESTINATION;
+
+import org.jboss.jms.destination.JBossDestination;
+
+/**
+ * Packet of type RESP_CREATEDESTINATION carrying a JBossDestination.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class CreateDestinationResponse extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ // set once by the constructor and only exposed through getDestination()
+ private final JBossDestination destination;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ /** Builds the response around the given destination. */
+ public CreateDestinationResponse(JBossDestination destination)
+ {
+ super(RESP_CREATEDESTINATION);
+ // destination must be non-null; this is only enforced when assertions are enabled (-ea)
+ assert destination != null;
+
+ this.destination = destination;
+ }
+
+ // Public --------------------------------------------------------
+ /** Returns the destination given to the constructor. */
+ public JBossDestination getDestination()
+ {
+ return destination;
+ }
+ /** Returns getParentString() followed by the destination value and a closing "]". */
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", destination=" + destination + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -35,23 +35,25 @@
// Session
REQ_CREATECONSUMER ((byte)17),
RESP_CREATECONSUMER ((byte)18),
- REQ_CREATEBROWSER ((byte)19),
- RESP_CREATEBROWSER ((byte)20),
- MSG_SENDMESSAGE ((byte)21),
- MSG_DELIVERMESSAGE ((byte)22),
- REQ_ACKDELIVERY ((byte)23),
- RESP_ACKDELIVERY ((byte)24),
- MSG_ACKDELIVERIES ((byte)25),
- RESP_ACKDELIVERIES ((byte)26),
- MSG_RECOVERDELIVERIES ((byte)27),
- MSG_CANCELDELIVERY ((byte)28),
- MSG_CANCELDELIVERIES ((byte)29),
+ REQ_CREATEDESTINATION ((byte)21),
+ RESP_CREATEDESTINATION ((byte)22),
+ REQ_CREATEBROWSER ((byte)23),
+ RESP_CREATEBROWSER ((byte)24),
+ MSG_SENDMESSAGE ((byte)25),
+ MSG_DELIVERMESSAGE ((byte)26),
+ REQ_ACKDELIVERY ((byte)27),
+ RESP_ACKDELIVERY ((byte)28),
+ MSG_ACKDELIVERIES ((byte)29),
+ RESP_ACKDELIVERIES ((byte)30),
+ MSG_RECOVERDELIVERIES ((byte)31),
+ MSG_CANCELDELIVERY ((byte)32),
+ MSG_CANCELDELIVERIES ((byte)33),
// Consumer
- MSG_CHANGERATE ((byte)30),
+ MSG_CHANGERATE ((byte)34),
// Misc
- REQ_CLOSING ((byte)32),
- RESP_CLOSING ((byte)33),
- MSG_CLOSE ((byte)34);
+ REQ_CLOSING ((byte)35),
+ RESP_CLOSING ((byte)36),
+ MSG_CLOSE ((byte)37);
private byte type;
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-21 00:12:06 UTC (rev 3352)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-21 10:42:02 UTC (rev 3353)
@@ -25,6 +25,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATESESSION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTID;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_IDBLOCK;
@@ -54,6 +55,7 @@
import org.jboss.jms.delegate.DeliveryRecovery;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.tx.ClientTransaction;
import org.jboss.jms.tx.TransactionRequest;
@@ -75,6 +77,8 @@
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -329,6 +333,51 @@
.isAutoFlowControl());
}
+ public void testCreateDestinationRequest() throws Exception
+ {
+ CreateDestinationRequest request = new CreateDestinationRequest(
+ "testCreateDestinationRequest", false);
+ // round-trip the request through encodeAndDecode, then compare type, name and isQueue
+ AbstractPacket decodedPacket = encodeAndDecode(request);
+
+ assertTrue(decodedPacket instanceof CreateDestinationRequest);
+ // only isQueue=false is exercised by this test
+ CreateDestinationRequest decodedRequest = (CreateDestinationRequest) decodedPacket;
+ assertEquals(REQ_CREATEDESTINATION, decodedRequest.getType());
+ assertEquals(request.getName(), decodedRequest.getName());
+ assertEquals(request.isQueue(), decodedRequest.isQueue());
+ }
+
+ public void testCreateDestinationResponseForQueue() throws Exception
+ {
+ JBossDestination destination = new JBossQueue("testCreateQueueResponse", true);
+ CreateDestinationResponse response = new CreateDestinationResponse(destination);
+ // round-trip a response wrapping a JBossQueue; the decoded destination must still be a JBossQueue
+ AbstractPacket decodedPacket = encodeAndDecode(response);
+
+ assertTrue(decodedPacket instanceof CreateDestinationResponse);
+ // NOTE(review): the final assertEquals depends on JBossDestination.equals() - confirm it compares by value
+ CreateDestinationResponse decodedResponse = (CreateDestinationResponse) decodedPacket;
+ assertEquals(PacketType.RESP_CREATEDESTINATION, decodedResponse.getType());
+ assertTrue(decodedResponse.getDestination() instanceof JBossQueue);
+ assertEquals(response.getDestination(), decodedResponse.getDestination());
+ }
+
+ public void testCreateDestinationResponseForTopic() throws Exception
+ {
+ JBossDestination destination = new JBossTopic("testCreateDestinationResponseForTopic");
+ CreateDestinationResponse response = new CreateDestinationResponse(destination);
+ // round-trip a response wrapping a JBossTopic; the decoded destination must still be a JBossTopic
+ AbstractPacket decodedPacket = encodeAndDecode(response);
+
+ assertTrue(decodedPacket instanceof CreateDestinationResponse);
+ // NOTE(review): the final assertEquals depends on JBossDestination.equals() - confirm it compares by value
+ CreateDestinationResponse decodedResponse = (CreateDestinationResponse) decodedPacket;
+ assertEquals(PacketType.RESP_CREATEDESTINATION, decodedResponse.getType());
+ assertTrue(decodedResponse.getDestination() instanceof JBossTopic);
+ assertEquals(response.getDestination(), decodedResponse.getDestination());
+ }
+
public void testCreateConsumerResponse() throws Exception
{
@@ -357,7 +406,7 @@
assertTrue(decodedPacket instanceof StartConnectionMessage);
assertEquals(MSG_STARTCONNECTION, decodedPacket.getType());
}
-
+
public void testStopConnectionMessage() throws Exception
{
StopConnectionMessage packet = new StopConnectionMessage();
@@ -591,11 +640,11 @@
assertEquals(MSG_CANCELDELIVERIES, decodedMessage.getType());
assertSameCancels(message.getCancels(), decodedMessage.getCancels());
}
-
+
public void testCreateBrowserRequest() throws Exception
{
- JBossDestination destination = new JBossQueue(
- "testCreateBrowserRequest", true);
+ JBossDestination destination = new JBossQueue("testCreateBrowserRequest",
+ true);
CreateBrowserRequest request = new CreateBrowserRequest(destination,
"color = 'red'");
@@ -608,11 +657,12 @@
assertEquals(request.getDestination(), decodedRequest.getDestination());
assertEquals(request.getSelector(), decodedRequest.getSelector());
}
-
+
public void testCreateBrowserResponse() throws Exception
{
- CreateBrowserResponse response = new CreateBrowserResponse(randomUUID().toString());
-
+ CreateBrowserResponse response = new CreateBrowserResponse(randomUUID()
+ .toString());
+
AbstractPacket decodedPacket = encodeAndDecode(response);
assertTrue(decodedPacket instanceof CreateBrowserResponse);
More information about the jboss-cvs-commits
mailing list