[jboss-cvs] JBoss Messaging SVN: r3559 - in trunk: src/main/org/jboss/jms/server/endpoint and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 11 10:02:00 EST 2008
Author: jmesnil
Date: 2008-01-11 10:01:59 -0500 (Fri, 11 Jan 2008)
New Revision: 3559
Modified:
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
Log:
* refactored server-side PacketDispatcher: removed PacketDispatcher.server singleton and replaced it by using MinaService.getDispatcher() instead
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -216,7 +216,7 @@
// Registering with the dispatcher should always be the last thing otherwise a client could
// use a partially initialised object
- PacketDispatcher.server.register(endpoint.newHandler());
+ serverPeer.getMinaService().getDispatcher().register(endpoint.newHandler());
// Replicate the change - we will ignore this locally
@@ -265,7 +265,7 @@
}
}
- PacketDispatcher.server.unregister(endpoint.getID());
+ serverPeer.getMinaService().getDispatcher().unregister(endpoint.getID());
}
// MessagingComponent implementation ------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -39,7 +39,6 @@
import org.jboss.jms.server.selector.Selector;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Channel;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -249,7 +248,7 @@
iterator = null;
- PacketDispatcher.server.unregister(id);
+ session.getConnectionEndpoint().getServerPeer().getMinaService().getDispatcher().unregister(id);
closed = true;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -59,7 +59,6 @@
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import static org.jboss.messaging.core.remoting.Assert.assertValidID;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -251,7 +250,7 @@
serverPeer.addSession(sessionID, ep);
- PacketDispatcher.server.register(ep.newHandler());
+ serverPeer.getMinaService().getDispatcher().register(ep.newHandler());
log.trace("created and registered " + ep);
@@ -407,7 +406,7 @@
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
- PacketDispatcher.server.unregister(id);
+ serverPeer.getMinaService().getDispatcher().unregister(id);
closed = true;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -21,6 +21,10 @@
*/
package org.jboss.jms.server.endpoint;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
+
import java.util.List;
import java.util.Map;
@@ -31,25 +35,18 @@
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
-import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.exception.MessagingJMSException;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.Assert;
-import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
-import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackResponse;
-import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTAOPSTACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.Version;
@@ -231,7 +228,7 @@
final String connectionID = endpoint.getConnectionID();
- PacketDispatcher.server.register(endpoint.newHandler(connectionID));
+ serverPeer.getMinaService().getDispatcher().register(endpoint.newHandler(connectionID));
log.trace("created and registered " + endpoint);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -39,15 +39,12 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
-import org.jboss.messaging.newcore.Message;
-import org.jboss.messaging.newcore.MessageReference;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.contract.Receiver;
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -58,6 +55,8 @@
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.newcore.Message;
+import org.jboss.messaging.newcore.MessageReference;
import org.jboss.messaging.util.ExceptionUtil;
/**
@@ -549,7 +548,7 @@
messageQueue.getLocalDistributor().remove(this);
}
- PacketDispatcher.server.unregister(id);
+ sp.getMinaService().getDispatcher().unregister(id);
// If this is a consumer of a non durable subscription then we want to unbind the
// subscription and delete all its data.
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -21,6 +21,21 @@
*/
package org.jboss.jms.server.endpoint;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
+
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,28 +47,26 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DefaultAck;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.DeliveryRecovery;
import org.jboss.jms.delegate.SessionEndpoint;
-import org.jboss.jms.delegate.DefaultAck;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
+import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
@@ -64,7 +77,6 @@
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.server.security.CheckType;
import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Channel;
@@ -82,52 +94,42 @@
import org.jboss.messaging.core.impl.tx.TransactionException;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import org.jboss.messaging.core.impl.tx.TxCallback;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.wireformat.SendMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
-import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
-import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
-import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
-import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
-import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
-import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
-import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
import org.jboss.messaging.newcore.Message;
import org.jboss.messaging.newcore.MessageReference;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
+
/**
* The server side representation of a JMS session.
*
@@ -382,7 +384,7 @@
connectionEndpoint.removeSession(id);
- PacketDispatcher.server.unregister(id);
+ sp.getMinaService().getDispatcher().unregister(id);
}
catch (Throwable t)
{
@@ -1843,7 +1845,7 @@
binding.queue.getName(), this, selectorString, false,
dest, null, null, 0, -1, true, false, prefetchSize);
- PacketDispatcher.server.register(ep.newHandler());
+ sp.getMinaService().getDispatcher().register(ep.newHandler());
ClientConsumerDelegate stub =
new ClientConsumerDelegate(consumerID, prefetchSize, -1, 0);
@@ -2150,7 +2152,7 @@
rep.put(queue.getName(), DUR_SUB_STATE_CONSUMERS);
}
- PacketDispatcher.server.register(ep.newHandler());
+ sp.getMinaService().getDispatcher().register(ep.newHandler());
ClientConsumerDelegate stub =
new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttemptsToUse, redeliveryDelayToUse);
@@ -2208,7 +2210,7 @@
browsers.put(browserID, ep);
}
- PacketDispatcher.server.register(ep.newHandler());
+ sp.getMinaService().getDispatcher().register(ep.newHandler());
ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -35,7 +35,7 @@
* @return <code>true</code> if this locator has not already been
* registered, <code>false</code> else
*/
- boolean register(ServerLocator locator);
+ boolean register(ServerLocator locator, PacketDispatcher serverDispatcher);
/**
* @return <code>true</code> if this locator was registered,
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -33,7 +33,6 @@
// Static --------------------------------------------------------
public static final PacketDispatcher client = new PacketDispatcher();
- public static final PacketDispatcher server = new PacketDispatcher();
// Constructors --------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,13 +10,13 @@
import static org.jboss.messaging.core.remoting.TransportType.TCP;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.ConnectorRegistry;
import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
@@ -36,7 +36,7 @@
// Attributes ----------------------------------------------------
- public Set<ServerLocator> locators = new HashSet<ServerLocator>();
+ public Map<ServerLocator, PacketDispatcher> locators = new HashMap<ServerLocator, PacketDispatcher>();
public Map<ServerLocator, NIOConnectorHolder> connectors = new HashMap<ServerLocator, NIOConnectorHolder>();
@@ -46,9 +46,13 @@
* @return <code>true</code> if this locator has not already been registered,
* <code>false</code> else
*/
- public boolean register(ServerLocator locator)
+ public boolean register(ServerLocator locator, PacketDispatcher serverDispatcher)
{
- return locators.add(locator);
+ assert locator != null;
+ assert serverDispatcher != null;
+
+ PacketDispatcher previousDispatcher = locators.put(locator, serverDispatcher);
+ return (previousDispatcher == null);
}
/**
@@ -57,7 +61,8 @@
*/
public boolean unregister(ServerLocator locator)
{
- return locators.remove(locator);
+ PacketDispatcher dispatcher = locators.remove(locator);
+ return (dispatcher != null);
}
public synchronized NIOConnector getConnector(ServerLocator locator)
@@ -78,10 +83,10 @@
}
// check if the server is in the same vm than the client
- if (locators.contains(locator))
+ if (locators.containsKey(locator))
{
NIOConnector connector = new INVMConnector(locator.getHost(), locator
- .getPort());
+ .getPort(), locators.get(locator));
if (log.isDebugEnabled())
log.debug("Created " + connector.getServerURI() + " to connect to "
@@ -102,7 +107,7 @@
.getHost(), locator.getPort());
} else if (transport == INVM)
{
- connector = new INVMConnector(locator.getHost(), locator.getPort());
+ connector = new INVMConnector(locator.getHost(), locator.getPort(), locators.get(locator));
}
if (connector == null)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -13,6 +13,7 @@
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.messaging.core.remoting.NIOConnector;
import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -31,6 +32,8 @@
private int port;
private INVMSession session;
+
+ private PacketDispatcher serverDispatcher;
// Static --------------------------------------------------------
@@ -38,12 +41,14 @@
// Public --------------------------------------------------------
- public INVMConnector(String host, int port)
+ public INVMConnector(String host, int port, PacketDispatcher serverDispatcher)
{
assert host != null;
+ assert serverDispatcher != null;
this.host = host;
this.port = port;
+ this.serverDispatcher = serverDispatcher;
}
// NIOConnector implementation -----------------------------------
@@ -51,7 +56,7 @@
public NIOSession connect()
throws IOException
{
- this.session = new INVMSession();
+ this.session = new INVMSession(serverDispatcher);
return session;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -35,16 +35,20 @@
private String id;
private ExecutorService executor;
private long correlationCounter;
+ private PacketDispatcher serverDispatcher;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public INVMSession()
+ public INVMSession(PacketDispatcher serverDispatcher)
{
+ assert serverDispatcher != null;
+
this.id = randomUUID().toString();
this.executor = Executors.newSingleThreadExecutor();
this.correlationCounter = 0;
+ this.serverDispatcher = serverDispatcher;
}
// Public --------------------------------------------------------
@@ -73,7 +77,7 @@
{
assert object instanceof AbstractPacket;
- PacketDispatcher.server.dispatch((AbstractPacket) object,
+ serverDispatcher.dispatch((AbstractPacket) object,
new PacketSender()
{
@@ -116,7 +120,7 @@
final CountDownLatch latch = new CountDownLatch(1);
final AbstractPacket[] responses = new AbstractPacket[1];
- PacketDispatcher.server.dispatch((AbstractPacket) packet,
+ serverDispatcher.dispatch((AbstractPacket) packet,
new PacketSender()
{
public void send(AbstractPacket response)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -47,8 +47,9 @@
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception
{
+ // FIXME ugly way to know we're on the server side
// close session only on the server side
- if (dispatcher == PacketDispatcher.server)
+ if (dispatcher != PacketDispatcher.client)
{
session.close();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -49,6 +49,8 @@
private int blockingRequestTimeout = 5;
+ private PacketDispatcher dispatcher;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -68,6 +70,7 @@
this.host = host;
this.port = port;
this.parameters = new HashMap<String, String>();
+ this.dispatcher = new PacketDispatcher();
}
// Public --------------------------------------------------------
@@ -84,6 +87,11 @@
return new ServerLocator(transport, host, port, parameters);
}
+ public PacketDispatcher getDispatcher()
+ {
+ return dispatcher;
+ }
+
public void start() throws Exception
{
if (acceptor == null)
@@ -103,10 +111,10 @@
acceptor.getSessionConfig().setKeepAlive(true);
acceptor.setDisconnectOnUnbind(false);
- acceptor.setHandler(new MinaHandler(PacketDispatcher.server));
+ acceptor.setHandler(new MinaHandler(dispatcher));
acceptor.bind();
- REGISTRY.register(getLocator());
+ REGISTRY.register(getLocator(), dispatcher);
}
}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -33,14 +33,16 @@
*/
public abstract class ClientTestBase extends TestCase
{
- private ReversePacketHandler serverPacketHandler;
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private Client client;
-
+
+ private ReversePacketHandler serverPacketHandler;
+
+ private PacketDispatcher serverDispatcher;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -212,7 +214,7 @@
@Override
protected void setUp() throws Exception
{
- startServer();
+ serverDispatcher = startServer();
ServerLocator serverLocator = createServerLocator();
NIOConnector connector = createNIOConnector();
@@ -220,25 +222,26 @@
client.connect();
serverPacketHandler = new ReversePacketHandler();
- PacketDispatcher.server.register(serverPacketHandler);
+ serverDispatcher.register(serverPacketHandler);
}
@Override
protected void tearDown() throws Exception
{
- PacketDispatcher.server.unregister(serverPacketHandler.getID());
+ serverDispatcher.unregister(serverPacketHandler.getID());
client.disconnect();
stopServer();
client = null;
+ serverDispatcher = null;
}
protected abstract ServerLocator createServerLocator();
protected abstract NIOConnector createNIOConnector();
- protected abstract void startServer() throws Exception;
+ protected abstract PacketDispatcher startServer() throws Exception;
protected abstract void stopServer();
}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,6 +10,7 @@
import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.impl.ClientTestBase;
import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
@@ -26,6 +27,7 @@
// Attributes ----------------------------------------------------
+ PacketDispatcher dispatcher = new PacketDispatcher();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -37,7 +39,7 @@
@Override
protected NIOConnector createNIOConnector()
{
- return new INVMConnector("localhost", PORT);
+ return new INVMConnector("localhost", PORT, dispatcher);
}
@Override
@@ -47,15 +49,15 @@
}
@Override
- protected void startServer() throws Exception
+ protected PacketDispatcher startServer() throws Exception
{
- // no op
+ return dispatcher;
}
@Override
protected void stopServer()
{
- // no op
+ dispatcher = null;
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,6 +10,7 @@
import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.impl.ClientTestBase;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
@@ -51,10 +52,11 @@
}
@Override
- protected void startServer() throws Exception
+ protected PacketDispatcher startServer() throws Exception
{
service = new MinaService(TCP, "localhost", PORT);
service.start();
+ return service.getDispatcher();
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java 2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java 2008-01-11 15:01:59 UTC (rev 3559)
@@ -13,6 +13,7 @@
import org.jboss.messaging.core.remoting.ConnectorRegistry;
import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.ServerLocator;
import org.jboss.messaging.core.remoting.impl.ConnectorRegistryImpl;
@@ -35,11 +36,13 @@
// Public --------------------------------------------------------
private ConnectorRegistry registry;
+ private PacketDispatcher dispatcher;
@Override
protected void setUp() throws Exception
{
registry = new ConnectorRegistryImpl();
+ dispatcher = new PacketDispatcher();
assertEquals(0, registry.getRegisteredLocators().length);
}
@@ -54,13 +57,13 @@
{
ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
- assertTrue(registry.register(locator));
- assertFalse(registry.register(locator));
+ assertTrue(registry.register(locator, dispatcher));
+ assertFalse(registry.register(locator, dispatcher));
assertTrue(registry.unregister(locator));
assertFalse(registry.unregister(locator));
- assertTrue(registry.register(locator));
+ assertTrue(registry.register(locator, dispatcher));
assertTrue(registry.unregister(locator));
}
@@ -69,7 +72,7 @@
ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
// locator is registered -> client and server are in the same vm
- assertTrue(registry.register(locator));
+ assertTrue(registry.register(locator, dispatcher));
NIOConnector connector = registry.getConnector(locator);
More information about the jboss-cvs-commits
mailing list