[jboss-cvs] JBoss Messaging SVN: r3559 - in trunk: src/main/org/jboss/jms/server/endpoint and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 11 10:02:00 EST 2008


Author: jmesnil
Date: 2008-01-11 10:01:59 -0500 (Fri, 11 Jan 2008)
New Revision: 3559

Modified:
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
Log:
* refactored server-side PacketDispatcher: removed PacketDispatcher.server singleton and replaced it by using MinaService.getDispatcher() instead

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -216,7 +216,7 @@
 
       // Registering with the dispatcher should always be the last thing otherwise a client could
       // use a partially initialised object
-      PacketDispatcher.server.register(endpoint.newHandler());
+      serverPeer.getMinaService().getDispatcher().register(endpoint.newHandler());
       
       // Replicate the change - we will ignore this locally
 
@@ -265,7 +265,7 @@
          }
       }
 
-      PacketDispatcher.server.unregister(endpoint.getID());
+      serverPeer.getMinaService().getDispatcher().unregister(endpoint.getID());
    }
 
    // MessagingComponent implementation ------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -39,7 +39,6 @@
 import org.jboss.jms.server.selector.Selector;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Channel;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -249,7 +248,7 @@
       
       iterator = null;
       
-      PacketDispatcher.server.unregister(id);
+      session.getConnectionEndpoint().getServerPeer().getMinaService().getDispatcher().unregister(id);
       
       closed = true;
    }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -59,7 +59,6 @@
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import static org.jboss.messaging.core.remoting.Assert.assertValidID;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -251,7 +250,7 @@
 
          serverPeer.addSession(sessionID, ep);
 
-         PacketDispatcher.server.register(ep.newHandler());
+         serverPeer.getMinaService().getDispatcher().register(ep.newHandler());
          
          log.trace("created and registered " + ep);
 
@@ -407,7 +406,7 @@
 
          cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
 
-         PacketDispatcher.server.unregister(id);
+         serverPeer.getMinaService().getDispatcher().unregister(id);
 
          closed = true;
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -21,6 +21,10 @@
   */
 package org.jboss.jms.server.endpoint;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
+
 import java.util.List;
 import java.util.Map;
 
@@ -31,25 +35,18 @@
 import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
 import org.jboss.jms.delegate.CreateConnectionResult;
 import org.jboss.jms.delegate.TopologyResult;
-import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.exception.MessagingJMSException;
+import org.jboss.jms.server.ServerPeer;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.Assert;
-import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
 import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
-import org.jboss.messaging.core.remoting.wireformat.GetClientAOPStackResponse;
-import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTAOPSTACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.Version;
 
@@ -231,7 +228,7 @@
 
       final String connectionID = endpoint.getConnectionID();
 
-      PacketDispatcher.server.register(endpoint.newHandler(connectionID));
+      serverPeer.getMinaService().getDispatcher().register(endpoint.newHandler(connectionID));
 
       log.trace("created and registered " + endpoint);
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -39,15 +39,12 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
-import org.jboss.messaging.newcore.Message;
-import org.jboss.messaging.newcore.MessageReference;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.contract.Receiver;
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.core.impl.SimpleDelivery;
 import org.jboss.messaging.core.impl.tx.Transaction;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -58,6 +55,8 @@
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.newcore.Message;
+import org.jboss.messaging.newcore.MessageReference;
 import org.jboss.messaging.util.ExceptionUtil;
 
 /**
@@ -549,7 +548,7 @@
       	messageQueue.getLocalDistributor().remove(this);
       }
 
-      PacketDispatcher.server.unregister(id);
+      sp.getMinaService().getDispatcher().unregister(id);
       
       // If this is a consumer of a non durable subscription then we want to unbind the
       // subscription and delete all its data.

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -21,6 +21,21 @@
   */
 package org.jboss.jms.server.endpoint;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
+
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,28 +47,26 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.delegate.Ack;
 import org.jboss.jms.delegate.BrowserDelegate;
 import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DefaultAck;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.DeliveryRecovery;
 import org.jboss.jms.delegate.SessionEndpoint;
-import org.jboss.jms.delegate.DefaultAck;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
+import org.jboss.jms.exception.MessagingJMSException;
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.JMSCondition;
 import org.jboss.jms.server.ServerPeer;
@@ -64,7 +77,6 @@
 import org.jboss.jms.server.messagecounter.MessageCounter;
 import org.jboss.jms.server.security.CheckType;
 import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.exception.MessagingJMSException;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.Channel;
@@ -82,52 +94,42 @@
 import org.jboss.messaging.core.impl.tx.TransactionException;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.core.impl.tx.TxCallback;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.wireformat.SendMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
-import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
-import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
-import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
-import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
-import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
-import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.ClosingRequest;
 import org.jboss.messaging.core.remoting.wireformat.ClosingResponse;
-import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
-import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateBrowserResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateConsumerResponse;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationRequest;
+import org.jboss.messaging.core.remoting.wireformat.CreateDestinationResponse;
 import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEDESTINATION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ACKDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_RECOVERDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CANCELDELIVERIES;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CLOSING;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_ADDTEMPORARYDESTINATION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_DELETETEMPORARYDESTINATION;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.RecoverDeliveriesMessage;
+import org.jboss.messaging.core.remoting.wireformat.SendMessage;
+import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
 import org.jboss.messaging.newcore.Message;
 import org.jboss.messaging.newcore.MessageReference;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.MessageQueueNameHelper;
 
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
+
 /**
  * The server side representation of a JMS session.
  * 
@@ -382,7 +384,7 @@
          
          connectionEndpoint.removeSession(id);
          
-         PacketDispatcher.server.unregister(id);
+         sp.getMinaService().getDispatcher().unregister(id);
       }
       catch (Throwable t)
       {
@@ -1843,7 +1845,7 @@
                                     binding.queue.getName(), this, selectorString, false,
                                     dest, null, null, 0, -1, true, false, prefetchSize);
       
-      PacketDispatcher.server.register(ep.newHandler());
+      sp.getMinaService().getDispatcher().register(ep.newHandler());
       
       ClientConsumerDelegate stub =
          new ClientConsumerDelegate(consumerID, prefetchSize, -1, 0);
@@ -2150,7 +2152,7 @@
       	rep.put(queue.getName(), DUR_SUB_STATE_CONSUMERS);
       }
       
-      PacketDispatcher.server.register(ep.newHandler());
+      sp.getMinaService().getDispatcher().register(ep.newHandler());
 
       ClientConsumerDelegate stub =
          new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttemptsToUse, redeliveryDelayToUse);
@@ -2208,7 +2210,7 @@
          browsers.put(browserID, ep);
       }
 
-      PacketDispatcher.server.register(ep.newHandler());
+      sp.getMinaService().getDispatcher().register(ep.newHandler());
       
       ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -35,7 +35,7 @@
     * @return <code>true</code> if this locator has not already been
     *         registered, <code>false</code> else
     */
-   boolean register(ServerLocator locator);
+   boolean register(ServerLocator locator, PacketDispatcher serverDispatcher);
 
    /**
     * @return <code>true</code> if this locator was registered,

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -33,7 +33,6 @@
    // Static --------------------------------------------------------
 
    public static final PacketDispatcher client = new PacketDispatcher();
-   public static final PacketDispatcher server = new PacketDispatcher();
 
    // Constructors --------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectorRegistryImpl.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,13 +10,13 @@
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.ConnectorRegistry;
 import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.TransportType;
 import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
@@ -36,7 +36,7 @@
 
    // Attributes ----------------------------------------------------
 
-   public Set<ServerLocator> locators = new HashSet<ServerLocator>();
+   public Map<ServerLocator, PacketDispatcher> locators = new HashMap<ServerLocator, PacketDispatcher>();
 
    public Map<ServerLocator, NIOConnectorHolder> connectors = new HashMap<ServerLocator, NIOConnectorHolder>();
 
@@ -46,9 +46,13 @@
     * @return <code>true</code> if this locator has not already been registered,
     *         <code>false</code> else
     */
-   public boolean register(ServerLocator locator)
+   public boolean register(ServerLocator locator, PacketDispatcher serverDispatcher)
    {
-      return locators.add(locator);
+      assert locator != null;
+      assert serverDispatcher != null;
+      
+      PacketDispatcher previousDispatcher = locators.put(locator, serverDispatcher);
+      return (previousDispatcher == null);
    }
 
    /**
@@ -57,7 +61,8 @@
     */  
    public boolean unregister(ServerLocator locator)
    {
-      return locators.remove(locator);
+       PacketDispatcher dispatcher = locators.remove(locator);
+       return (dispatcher != null);
    }
 
    public synchronized NIOConnector getConnector(ServerLocator locator)
@@ -78,10 +83,10 @@
       }
 
       // check if the server is in the same vm than the client
-      if (locators.contains(locator))
+      if (locators.containsKey(locator))
       {
          NIOConnector connector = new INVMConnector(locator.getHost(), locator
-               .getPort());
+               .getPort(), locators.get(locator));
 
          if (log.isDebugEnabled())
             log.debug("Created " + connector.getServerURI() + " to connect to "
@@ -102,7 +107,7 @@
                .getHost(), locator.getPort());
       } else if (transport == INVM)
       {
-         connector = new INVMConnector(locator.getHost(), locator.getPort());
+         connector = new INVMConnector(locator.getHost(), locator.getPort(), locators.get(locator));
       }
 
       if (connector == null)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMConnector.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -13,6 +13,7 @@
 import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.messaging.core.remoting.NIOConnector;
 import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -31,6 +32,8 @@
    private int port;
 
    private INVMSession session;
+
+   private PacketDispatcher serverDispatcher;
  
    // Static --------------------------------------------------------
 
@@ -38,12 +41,14 @@
 
    // Public --------------------------------------------------------
 
-   public INVMConnector(String host, int port)
+   public INVMConnector(String host, int port, PacketDispatcher serverDispatcher)
    {
       assert host != null;
+      assert serverDispatcher != null;
       
       this.host = host;
       this.port = port;
+      this.serverDispatcher = serverDispatcher;
    }
 
    // NIOConnector implementation -----------------------------------
@@ -51,7 +56,7 @@
    public NIOSession connect()
          throws IOException
    {
-      this.session = new INVMSession();
+      this.session = new INVMSession(serverDispatcher);
       return session;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -35,16 +35,20 @@
    private String id;
    private ExecutorService executor;
    private long correlationCounter;
+   private PacketDispatcher serverDispatcher;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public INVMSession()
+   public INVMSession(PacketDispatcher serverDispatcher)
    {
+      assert serverDispatcher != null;
+      
       this.id = randomUUID().toString();
       this.executor = Executors.newSingleThreadExecutor();
       this.correlationCounter = 0;
+      this.serverDispatcher = serverDispatcher;
    }
 
    // Public --------------------------------------------------------
@@ -73,7 +77,7 @@
    {
       assert object instanceof AbstractPacket;
 
-      PacketDispatcher.server.dispatch((AbstractPacket) object,
+      serverDispatcher.dispatch((AbstractPacket) object,
             new PacketSender()
             {
 
@@ -116,7 +120,7 @@
          final CountDownLatch latch = new CountDownLatch(1);
          final AbstractPacket[] responses = new AbstractPacket[1];
 
-         PacketDispatcher.server.dispatch((AbstractPacket) packet,
+         serverDispatcher.dispatch((AbstractPacket) packet,
                new PacketSender()
                {
                   public void send(AbstractPacket response)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -47,8 +47,9 @@
    public void exceptionCaught(IoSession session, Throwable cause)
          throws Exception
    {
+      // FIXME ugly way to know we're on the server side
       // close session only on the server side
-      if (dispatcher == PacketDispatcher.server)
+      if (dispatcher != PacketDispatcher.client)
       {
          session.close();
       }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -49,6 +49,8 @@
 
    private int blockingRequestTimeout = 5;
 
+   private PacketDispatcher dispatcher;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -68,6 +70,7 @@
       this.host = host;
       this.port = port;
       this.parameters = new HashMap<String, String>();
+      this.dispatcher = new PacketDispatcher();
    }
 
    // Public --------------------------------------------------------
@@ -84,6 +87,11 @@
       return new ServerLocator(transport, host, port, parameters);
    }
    
+   public PacketDispatcher getDispatcher()
+   {
+      return dispatcher;
+   }
+   
    public void start() throws Exception
    {
       if (acceptor == null)
@@ -103,10 +111,10 @@
          acceptor.getSessionConfig().setKeepAlive(true);
          acceptor.setDisconnectOnUnbind(false);
 
-         acceptor.setHandler(new MinaHandler(PacketDispatcher.server));
+         acceptor.setHandler(new MinaHandler(dispatcher));
          acceptor.bind();
          
-         REGISTRY.register(getLocator());
+         REGISTRY.register(getLocator(), dispatcher);
       } 
    }
 

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -33,14 +33,16 @@
  */
 public abstract class ClientTestBase extends TestCase
 {
-   private ReversePacketHandler serverPacketHandler;
-
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
    private Client client;
-   
+ 
+   private ReversePacketHandler serverPacketHandler;
+
+   private PacketDispatcher serverDispatcher;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -212,7 +214,7 @@
    @Override
    protected void setUp() throws Exception
    {
-      startServer();
+      serverDispatcher = startServer();
       
       ServerLocator serverLocator = createServerLocator();
       NIOConnector connector = createNIOConnector();
@@ -220,25 +222,26 @@
       client.connect();
       
       serverPacketHandler = new ReversePacketHandler();
-      PacketDispatcher.server.register(serverPacketHandler);
+      serverDispatcher.register(serverPacketHandler);
    }
 
    @Override
    protected void tearDown() throws Exception
    {
-      PacketDispatcher.server.unregister(serverPacketHandler.getID());
+      serverDispatcher.unregister(serverPacketHandler.getID());
 
       client.disconnect();
       stopServer();
       
       client = null;
+      serverDispatcher = null;
    }
    
    protected abstract ServerLocator createServerLocator();
    
    protected abstract NIOConnector createNIOConnector();
 
-   protected abstract void startServer() throws Exception;
+   protected abstract PacketDispatcher startServer() throws Exception;
    
    protected abstract void stopServer();
 }

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/invm/test/unit/INVMClientTest.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,6 +10,7 @@
 import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
 
 import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.impl.ClientTestBase;
 import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
@@ -26,6 +27,7 @@
 
    // Attributes ----------------------------------------------------
 
+   PacketDispatcher dispatcher = new PacketDispatcher();
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -37,7 +39,7 @@
    @Override
    protected NIOConnector createNIOConnector()
    {
-      return new INVMConnector("localhost", PORT);
+      return new INVMConnector("localhost", PORT, dispatcher);
    }
    
    @Override
@@ -47,15 +49,15 @@
    }
    
    @Override
-   protected void startServer() throws Exception
+   protected PacketDispatcher startServer() throws Exception
    {
-      // no op
+      return dispatcher;
    }
    
    @Override
    protected void stopServer()
    {
-      // no op
+      dispatcher = null;
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaClientTest.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -10,6 +10,7 @@
 import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
 
 import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.impl.ClientTestBase;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
@@ -51,10 +52,11 @@
    }
 
    @Override
-   protected void startServer() throws Exception
+   protected PacketDispatcher startServer() throws Exception
    {
       service = new MinaService(TCP, "localhost", PORT);
       service.start();
+      return service.getDispatcher();
    }
 
    @Override

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java	2008-01-11 14:36:03 UTC (rev 3558)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java	2008-01-11 15:01:59 UTC (rev 3559)
@@ -13,6 +13,7 @@
 
 import org.jboss.messaging.core.remoting.ConnectorRegistry;
 import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.ServerLocator;
 import org.jboss.messaging.core.remoting.impl.ConnectorRegistryImpl;
 
@@ -35,11 +36,13 @@
    // Public --------------------------------------------------------
    
    private ConnectorRegistry registry;
+   private PacketDispatcher dispatcher;
 
    @Override
    protected void setUp() throws Exception
    {
       registry = new ConnectorRegistryImpl();
+      dispatcher = new PacketDispatcher();
       assertEquals(0, registry.getRegisteredLocators().length);
    }
    
@@ -54,13 +57,13 @@
    {
       ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
       
-      assertTrue(registry.register(locator));
-      assertFalse(registry.register(locator));
+      assertTrue(registry.register(locator, dispatcher));
+      assertFalse(registry.register(locator, dispatcher));
       
       assertTrue(registry.unregister(locator));
       assertFalse(registry.unregister(locator));
 
-      assertTrue(registry.register(locator));
+      assertTrue(registry.register(locator, dispatcher));
       assertTrue(registry.unregister(locator));
    }
    
@@ -69,7 +72,7 @@
       ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
       
       // locator is registered -> client and server are in the same vm
-      assertTrue(registry.register(locator));
+      assertTrue(registry.register(locator, dispatcher));
       
       NIOConnector connector = registry.getConnector(locator);
       




More information about the jboss-cvs-commits mailing list