[jboss-cvs] JBoss Messaging SVN: r3376 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/delegate and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 28 08:24:40 EST 2007
Author: jmesnil
Date: 2007-11-28 08:24:40 -0500 (Wed, 28 Nov 2007)
New Revision: 3376
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO-based transport
* replaced MINA's IoSession with our own PacketReplier interface in the new remoting API
* tweaked MINA port to be able to run remote-tests
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -23,7 +23,6 @@
import javax.jms.MessageListener;
-import org.apache.mina.common.IoSession;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.DelegateSupport;
@@ -40,6 +39,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -130,7 +130,7 @@
return consumerID;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
try
{
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -21,6 +21,7 @@
*/
package org.jboss.jms.client.delegate;
+import static org.jboss.messaging.core.remoting.Constants.PORT;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
import java.io.DataInputStream;
@@ -39,7 +40,6 @@
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.wireformat.JMSWireFormat;
-import org.jboss.messaging.core.remoting.Constants;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackRequest;
@@ -365,9 +365,10 @@
try
{
+ System.err.println("### " + serverLocatorURI);
InvokerLocator locator = new InvokerLocator(serverLocatorURI);
client = new org.jboss.messaging.core.remoting.Client();
- client.connect(locator.getHost(), Constants.PORT, TCP);
+ client.connect(locator.getHost(), locator.getPort() + 1000, TCP);
}
catch (Exception e)
{
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -34,7 +34,6 @@
import javax.jms.JMSException;
import javax.jms.Message;
-import org.apache.mina.common.IoSession;
import org.jboss.jms.delegate.BrowserEndpoint;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.selector.Selector;
@@ -44,6 +43,7 @@
import org.jboss.messaging.core.contract.Filter;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
@@ -273,7 +273,7 @@
return id;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
try
{
@@ -282,20 +282,20 @@
{
BrowserHasNextMessageResponse response = new BrowserHasNextMessageResponse(hasNextMessage());
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_BROWSER_NEXTMESSAGE)
{
JBossMessage message = nextMessage();
BrowserNextMessageResponse response = new BrowserNextMessageResponse(message);
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_BROWSER_RESET)
{
reset();
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_CLOSING)
{
ClosingRequest request = (ClosingRequest) packet;
@@ -303,14 +303,14 @@
ClosingResponse response = new ClosingResponse(id);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_CLOSE)
{
close();
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else {
log.error("Unsupported packet for browser: " + packet);
}
@@ -318,7 +318,7 @@
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
message.normalize(packet);
- session.write(message);
+ replier.reply(message);
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -29,7 +29,6 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-import org.apache.mina.common.IoSession;
import org.jboss.jms.delegate.ConsumerEndpoint;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
@@ -51,6 +50,7 @@
import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
@@ -645,22 +645,21 @@
sessionEndpoint.promptDelivery(messageQueue);
}
-
- private IoSession session;
+ private PacketReplier replier;
- private void setSession(IoSession session)
+ private void setReplier(PacketReplier replier)
{
- this.session = session;
+ this.replier = replier;
}
public void deliver(DeliverMessage message)
{
- if (session != null)
+ if (replier != null)
{
message.setTargetID(id);
- session.write(message);
+ replier.reply(message);
} else {
- log.error("No session to deliver message");
+ log.error("No replier to deliver message to consumer");
}
}
@@ -673,14 +672,15 @@
return id;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
- setSession(session);
try
{
PacketType type = packet.getType();
if (type == MSG_CHANGERATE)
{
+ setReplier(replier);
+
ChangeRateMessage message = (ChangeRateMessage) packet;
changeRate(message.getRate());
} else if (type == REQ_CLOSING)
@@ -689,14 +689,16 @@
long id = closing(request.getSequence());
ClosingResponse response = new ClosingResponse(id);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_CLOSE)
{
close();
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
+
+ setReplier(null);
} else {
log.error("unsupported packet by server consumer endpoint: " + packet);
}
@@ -704,7 +706,7 @@
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
message.normalize(packet);
- session.write(message);
+ replier.reply(message);
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -35,15 +35,14 @@
import javax.jms.JMSException;
-import org.apache.mina.common.IoSession;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.delegate.IDBlock;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.tx.MessagingXid;
import org.jboss.jms.tx.TransactionRequest;
-import org.jboss.messaging.core.remoting.Assert;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
import org.jboss.messaging.core.remoting.wireformat.CreateSessionRequest;
@@ -177,7 +176,7 @@
return id;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
try
{
@@ -188,14 +187,14 @@
ClientSessionDelegate sessionDelegate = (ClientSessionDelegate) createSessionDelegate(request.isTransacted(), request.getAcknowledgementMode(), request.isXA());
CreateSessionResponse response = new CreateSessionResponse(sessionDelegate.getID(), sessionDelegate.getDupsOKBatchSize(), sessionDelegate.isStrictTck());
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_IDBLOCK)
{
IDBlockRequest request = (IDBlockRequest) packet;
IDBlock idBlock = getIdBlock(request.getSize());
IDBlockResponse response = new IDBlockResponse(idBlock.getLow(), idBlock.getHigh());
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_STARTCONNECTION)
{
start();
@@ -205,46 +204,46 @@
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_CLOSING)
{
org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
long id = closing(request.getSequence());
ClosingResponse response = new ClosingResponse(id);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_CLOSE)
{
close();
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_SENDTRANSACTION)
{
SendTransactionMessage message = (SendTransactionMessage) packet;
sendTransaction(message.getTransactionRequest(), message.checkForDuplicates());
NullPacket response = new NullPacket();
response.normalize(message);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_GETPREPAREDTRANSACTIONS)
{
MessagingXid[] xids = getPreparedTransactions();
GetPreparedTransactionsResponse response = new GetPreparedTransactionsResponse(xids);
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_GETCLIENTID)
{
GetClientIDResponse response = new GetClientIDResponse(getClientID());
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_SETCLIENTID)
{
SetClientIDMessage message = (SetClientIDMessage) packet;
setClientID(message.getClientID());
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else
{
System.err.println("Unsupported packet for connection: " + packet);
@@ -253,7 +252,7 @@
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
message.normalize(packet);
- session.write(message);
+ replier.reply(message);
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -27,7 +27,6 @@
import javax.jms.JMSException;
-import org.apache.mina.common.IoSession;
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
@@ -35,6 +34,7 @@
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
import org.jboss.messaging.core.remoting.Assert;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
@@ -165,7 +165,7 @@
return id;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
try
{
@@ -178,21 +178,21 @@
CreateConnectionResponse response = new CreateConnectionResponse(del.getDelegate().getID(), del.getDelegate().getServerID());
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_GETCLIENTAOPSTACK)
{
byte[] stack = getClientAOPStack();
GetClientAOPStackResponse response = new GetClientAOPStackResponse(stack);
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_GETTOPOLOGY)
{
TopologyResult topology = getTopology();
GetTopologyResponse response = new GetTopologyResponse(topology);
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else
{
System.err.println("unhandled packet:" + packet);
@@ -201,7 +201,7 @@
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
message.normalize(packet);
- session.write(message);
+ replier.reply(message);
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -40,7 +40,6 @@
import javax.jms.JMSException;
-import org.apache.mina.common.IoSession;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.Ack;
@@ -56,6 +55,7 @@
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.endpoint.SessionInternalEndpoint;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
@@ -227,7 +227,7 @@
return id;
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
try
{
@@ -239,14 +239,14 @@
NullPacket response = new NullPacket();
response.normalize(message);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_CREATECONSUMER)
{
CreateConsumerRequest request = (CreateConsumerRequest) packet;
ClientConsumerDelegate consumer = (ClientConsumerDelegate) createConsumerDelegate(request.getDestination(), request.getSelector(), request.isNoLocal(), request.getSubscriptionName(), request.isConnectionConsumer(), request.isAutoFlowControl());
CreateConsumerResponse response = new CreateConsumerResponse(consumer.getID(), consumer.getBufferSize(), consumer.getMaxDeliveries(), consumer.getRedeliveryDelay());
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_CREATEDESTINATION)
{
CreateDestinationRequest request = (CreateDestinationRequest) packet;
@@ -260,21 +260,21 @@
}
CreateDestinationResponse response = new CreateDestinationResponse(destination);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_CREATEBROWSER)
{
CreateBrowserRequest request = (CreateBrowserRequest) packet;
ClientBrowserDelegate browser = (ClientBrowserDelegate) createBrowserDelegate(request.getDestination(), request.getSelector());
CreateBrowserResponse response = new CreateBrowserResponse(browser.getID());
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == REQ_ACKDELIVERY)
{
AcknowledgeDeliveryRequest request = (AcknowledgeDeliveryRequest) packet;
boolean acknowledged = acknowledgeDelivery(new DefaultAck(request.getDeliveryID()));
AcknowledgeDeliveryResponse response = new AcknowledgeDeliveryResponse(acknowledged);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_ACKDELIVERIES)
{
AcknowledgeDeliveriesMessage message = (AcknowledgeDeliveriesMessage) packet;
@@ -282,7 +282,7 @@
NullPacket p = new NullPacket();
p.normalize(message);
- session.write(p);
+ replier.reply(p);
} else if (type == MSG_RECOVERDELIVERIES)
{
RecoverDeliveriesMessage message = (RecoverDeliveriesMessage) packet;
@@ -290,7 +290,7 @@
NullPacket p = new NullPacket();
p.normalize(message);
- session.write(p);
+ replier.reply(p);
} else if (type == MSG_CANCELDELIVERY)
{
CancelDeliveryMessage message = (CancelDeliveryMessage) packet;
@@ -298,7 +298,7 @@
NullPacket p = new NullPacket();
p.normalize(message);
- session.write(p);
+ replier.reply(p);
} else if (type == MSG_CANCELDELIVERIES)
{
CancelDeliveriesMessage message = (CancelDeliveriesMessage) packet;
@@ -306,21 +306,21 @@
NullPacket p = new NullPacket();
p.normalize(message);
- session.write(p);
+ replier.reply(p);
} else if (type == REQ_CLOSING)
{
org.jboss.messaging.core.remoting.wireformat.ClosingRequest request = (org.jboss.messaging.core.remoting.wireformat.ClosingRequest) packet;
long id = closing(request.getSequence());
ClosingResponse response = new ClosingResponse(id);
response.normalize(request);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_CLOSE)
{
close();
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_UNSUBSCRIBE)
{
UnsubscribeMessage message = (UnsubscribeMessage) packet;
@@ -328,7 +328,7 @@
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_ADDTEMPORARYDESTINATION)
{
AddTemporaryDestinationMessage message = (AddTemporaryDestinationMessage) packet;
@@ -336,7 +336,7 @@
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else if (type == MSG_DELETETEMPORARYDESTINATION)
{
DeleteTemporaryDestinationMessage message = (DeleteTemporaryDestinationMessage) packet;
@@ -344,7 +344,7 @@
NullPacket response = new NullPacket();
response.normalize(packet);
- session.write(response);
+ replier.reply(response);
} else {
//log.error("Unsupported packet for session: " + packet);
}
@@ -352,7 +352,7 @@
{
JMSExceptionMessage message = new JMSExceptionMessage(e);
message.normalize(packet);
- session.write(message);
+ replier.reply(message);
}
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/AbstractPacketHandler.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -10,7 +10,6 @@
import java.util.UUID;
-import org.apache.mina.common.IoSession;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -55,7 +54,7 @@
return id;
}
- public abstract void handle(AbstractPacket packet, IoSession session);
+ public abstract void handle(AbstractPacket packet, PacketReplier replier);
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -74,6 +74,7 @@
assert port > 0;
assert transport != null;
+ System.err.println("### connect to " + host + ":" + port + " ###");
connector = new NioSocketConnector();
MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -6,7 +6,6 @@
*/
package org.jboss.messaging.core.remoting;
-import org.apache.mina.common.IoSession;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -36,5 +35,5 @@
*/
String getID();
- void handle(AbstractPacket packet, IoSession session);
+ void handle(AbstractPacket packet, PacketReplier replier);
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/internal/ClientHandler.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -11,6 +11,7 @@
import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -33,7 +34,7 @@
// IoHandlerAdapter overrides ------------------------------------
@Override
- public void messageReceived(IoSession session, Object message)
+ public void messageReceived(final IoSession session, Object message)
throws Exception
{
if (message instanceof AbstractPacket)
@@ -45,7 +46,13 @@
if (handler != null)
{
System.err.println("ClientHandler.messageReceived() handler: " + handler);
- handler.handle(packet, session);
+ handler.handle(packet, new PacketReplier() {
+ public void reply(AbstractPacket p)
+ {
+ session.write(p);
+ }
+
+ });
} else
{
System.err
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/server/ServerHandler.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -11,6 +11,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
@@ -56,7 +57,12 @@
PacketHandler handler = PacketDispatcher.server.getHandler(targetID);
if (handler != null)
{
- handler.handle(packet, session);
+ handler.handle(packet, new PacketReplier() {
+ public void reply(AbstractPacket p)
+ {
+ session.write(p);
+ }
+ });
return;
} else
{
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/RemoteDispatcherTest.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -14,9 +14,9 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
-import org.apache.mina.common.IoSession;
import org.jboss.messaging.core.remoting.AbstractPacketHandler;
import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -55,6 +55,7 @@
public void testSendOneWay() throws Exception
{
TextPacket packet = new TextPacket("testSendOneWay");
+ packet.setVersion((byte)1);
client.sendOneWay(packet);
Thread.sleep(300);
@@ -71,6 +72,7 @@
for (int i = 0; i < MANY_MESSAGES; i++)
{
packets[i] = new TextPacket("testSendManyOneWay " + i);
+ packets[i].setVersion((byte)1);
client.sendOneWay(packets[i]);
}
@@ -96,6 +98,7 @@
TestClientHandler callbackHandler = new TestClientHandler();
TextPacket packet = new TextPacket("testSendOneWayWithCallbackHandler");
+ packet.setVersion((byte)1);
client.sendOneWay(packet, callbackHandler);
assertEquals(0, callbackHandler.getPackets().size());
@@ -110,6 +113,7 @@
public void testSendBlocking() throws Exception
{
TextPacket request = new TextPacket("testSendBlocking");
+ request.setVersion((byte)1);
AbstractPacket receivedPacket = client.sendBlocking(request);
@@ -125,6 +129,7 @@
serverHandler.setSleepTime(7, SECONDS);
AbstractPacket packet = new TextPacket("testSendBlockingWithTimeout");
+ packet.setVersion((byte)1);
try
{
@@ -168,7 +173,7 @@
packets = new ArrayList<TextPacket>();
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
packets.add((TextPacket) packet);
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/ReverseServerHandler.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -91,6 +91,7 @@
Thread.sleep(sleepTime);
TextPacket p = new TextPacket(reverse(incomingPacket.getText()));
p.setCorrelationID(incomingPacket.getCorrelationID());
+ p.setVersion(incomingPacket.getVersion());
if (!NO_ID_SET.equals(incomingPacket.getCallbackID()))
{
p.setTargetID(incomingPacket.getCallbackID());
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/TargetHandlerTest.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -14,6 +14,7 @@
import org.apache.mina.common.IoSession;
import org.jboss.messaging.core.remoting.AbstractPacketHandler;
import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketReplier;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
@@ -40,9 +41,11 @@
PacketDispatcher.client.register(targetHandler);
+ TextPacket packet = new TextPacket(
+ "testClientHandlePacketSentByServer from client");
+ packet.setVersion((byte)1);
// send a packet to create the IoSession on the server
- client.sendOneWay(new TextPacket(
- "testClientHandlePacketSentByServer from client"));
+ client.sendOneWay(packet);
Thread.sleep(300);
@@ -50,6 +53,7 @@
IoSession serverSession = serverHandler.getSessions().get(0);
TextPacket packetFromServer = new TextPacket(
"testClientHandlePacketSentByServer from server");
+ packetFromServer.setVersion((byte)1);
packetFromServer.setTargetID(targetHandler.getID());
serverSession.write(packetFromServer);
@@ -92,7 +96,7 @@
packets = new ArrayList<TextPacket>();
}
- public void handle(AbstractPacket packet, IoSession session)
+ public void handle(AbstractPacket packet, PacketReplier replier)
{
packets.add((TextPacket) packet);
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-28 12:07:34 UTC (rev 3375)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-28 13:24:40 UTC (rev 3376)
@@ -21,9 +21,6 @@
*/
package org.jboss.test.messaging.tools.container;
-import static org.jboss.messaging.core.remoting.Constants.PORT;
-import static org.jboss.remoting.transport.PortUtil.findFreePort;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
@@ -99,8 +96,6 @@
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
-import sun.management.StringFlag;
-
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
/**
@@ -1418,14 +1413,19 @@
log.debug("started " + objectName);
- startMINAServer();
+ if (!transport.equals("http"))
+ {
+ startMINAServer(locator.getPort() + 1000);
+ }
}
- private void startMINAServer() throws Exception
+ private void startMINAServer(int port) throws Exception
{
if (acceptor == null)
{
+ info("Starting MINA on port " + port);
+
acceptor = new NioSocketAcceptor();
// Prepare the configuration
@@ -1436,7 +1436,7 @@
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// Bind
- acceptor.setLocalAddress(new InetSocketAddress(PORT));
+ acceptor.setLocalAddress(new InetSocketAddress("127.0.0.1", port));
acceptor.setReuseAddress(true);
acceptor.getSessionConfig().setReuseAddress(true);
acceptor.getSessionConfig().setKeepAlive(true);
@@ -1445,7 +1445,7 @@
acceptor.setHandler(new ServerHandler());
acceptor.bind();
- info("Started MINA on port " + PORT);
+ info("MINA started");
}
}
@@ -1455,6 +1455,7 @@
{
acceptor.unbind();
acceptor.dispose();
+ acceptor = null;
info("Stopped MINA ");
}
More information about the jboss-cvs-commits
mailing list