[jboss-cvs] JBoss Messaging SVN: r3511 - in trunk: src/main/org/jboss/jms/client/remoting and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 17 04:45:46 EST 2007


Author: jmesnil
Date: 2007-12-17 04:45:46 -0500 (Mon, 17 Dec 2007)
New Revision: 3511

Modified:
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/Client.java
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
   trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1201 share remoting NIOConnectors connecting to the same server
* enhanced ConnectorRegistry to keep track of the NIOConnector created for each ServerLocator

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -211,6 +211,9 @@
       try
       {
          client.disconnect();
+         NIOConnector connector = ConnectorRegistry.removeConnector(new ServerLocator(serverLocatorURI));
+         if (connector != null)
+            connector.disconnect();
       } catch (Throwable t)
       {
          throw handleThrowable(t);
@@ -277,7 +280,7 @@
       try
       {
          ServerLocator locator = new ServerLocator(serverLocatorURI);
-         NIOConnector connector = ConnectorRegistry.get(locator);
+         NIOConnector connector = ConnectorRegistry.getConnector(locator);
          client = new Client(connector, locator);
          client.connect();
       }

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -80,7 +80,7 @@
 
       callbackManager = new CallbackManager();
 
-      NIOConnector connector = ConnectorRegistry.get(serverLocator);
+      NIOConnector connector = ConnectorRegistry.getConnector(serverLocator);
       client = new Client(connector, serverLocator);
       client.connect();
 
@@ -97,6 +97,9 @@
       try
       {
          client.disconnect();
+         NIOConnector connector = ConnectorRegistry.removeConnector(serverLocator);
+         if (connector != null)
+            connector.disconnect();
       }
       catch (Throwable ignore)
       {        

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -138,7 +138,7 @@
                                       loadBalancingFactory, strictTck);               
          connectorManager.registerConnector(getName());
       
-         log.info(serverLocator + " has lease disabled");
+         log.info("Server locator is " + serverLocator);
          log.info(this + " started");
       }
       catch (Throwable t)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Client.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Client.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -64,8 +64,12 @@
 
    public boolean disconnect() throws Exception
    {
+      if (session == null)
+      {
+         return false;         
+      }
       session = null;
-      return connector.disconnect();
+      return true;
    }
 
    public String getSessionID()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -9,9 +9,12 @@
 import static org.jboss.messaging.core.remoting.TransportType.INVM;
 import static org.jboss.messaging.core.remoting.TransportType.TCP;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
 import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
 import org.jboss.messaging.core.remoting.impl.mina.MinaService;
@@ -37,48 +40,150 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(ConnectorRegistry.class);
+
    // Attributes ----------------------------------------------------
 
+   private static Set<ServerLocator> locators = new HashSet<ServerLocator>();
+
+   private static Map<ServerLocator, NIOConnectorHolder> connectors = new HashMap<ServerLocator, NIOConnectorHolder>();
+
    // Static --------------------------------------------------------
 
-   private static Set<ServerLocator> locators = new HashSet<ServerLocator>();
-
+   /**
+    * @return <code>true</code> if this locator has not already been registered,
+    *         <code>false</code> else
+    */
    public static boolean register(ServerLocator locator)
    {
       return locators.add(locator);
    }
 
-   public static boolean unregister(ServerLocator locator)
+   /**
+    * @return <code>true</code> if this locator was registered,
+    *         <code>false</code> else
+    */   public static boolean unregister(ServerLocator locator)
    {
       return locators.remove(locator);
    }
 
-   public static NIOConnector get(ServerLocator locator)
+   public static synchronized NIOConnector getConnector(ServerLocator locator)
    {
       assert locator != null;
 
+      if (connectors.containsKey(locator))
+      {
+         NIOConnectorHolder holder = connectors.get(locator);
+         holder.increment();
+         NIOConnector connector = holder.getConnector();
+
+         if (log.isDebugEnabled())
+            log.debug("Reuse " + connector.getServerURI() + " to connect to "
+                  + locator + " [count=" + holder.getCount() + "]");
+
+         return connector;
+      }
+
       // check if the server is in the same vm than the client
       if (locators.contains(locator))
       {
-         return new INVMConnector(locator.getHost(), locator.getPort());
+         NIOConnector connector = new INVMConnector(locator.getHost(), locator
+               .getPort());
+
+         if (log.isDebugEnabled())
+            log.debug("Created " + connector.getServerURI() + " to connect to "
+                  + locator);
+
+         NIOConnectorHolder holder = new NIOConnectorHolder(connector);
+         connectors.put(locator, holder);
+         return connector;
       }
 
+      NIOConnector connector = null;
+
       TransportType transport = locator.getTransport();
 
       if (transport == TCP)
       {
-         return new MinaConnector(locator.getTransport(), locator.getHost(),
-               locator.getPort());
+         connector = new MinaConnector(locator.getTransport(), locator
+               .getHost(), locator.getPort());
       } else if (transport == INVM)
       {
-         return new INVMConnector(locator.getHost(), locator.getPort());
-      } else
+         connector = new INVMConnector(locator.getHost(), locator.getPort());
+      }
+
+      if (connector == null)
       {
          throw new IllegalArgumentException(
                "no connector defined for transport " + transport);
       }
+
+      if (log.isDebugEnabled())
+         log.debug("Created " + connector.getServerURI() + " to connect to "
+               + locator);
+      NIOConnectorHolder holder = new NIOConnectorHolder(connector);
+      connectors.put(locator, holder);
+      return connector;
    }
 
+   /**
+    * Decrement the number of references on the NIOConnector corresponding to
+    * the locator.
+    * 
+    * If there is only one reference, remove it from the connectors Map and
+    * return it. Otherwise return null.
+    * 
+    * @param locator
+    *           a ServerLocator
+    * @return the NIOConnector if there is no longer any references to it or
+    *         <code>null</code>
+    * @throws IllegalStateException
+    *            if no NIOConnector was created for the given locator
+    */
+   public synchronized static NIOConnector removeConnector(ServerLocator locator)
+   {
+      assert locator != null;
+
+      NIOConnectorHolder holder = connectors.get(locator);
+      if (holder == null)
+      {
+         throw new IllegalStateException("No Connector were created for "
+               + locator);
+      }
+
+      if (holder.getCount() == 1)
+      {
+         if (log.isDebugEnabled())
+            log.debug("Removed connector for " + locator);
+         connectors.remove(locator);
+         return holder.getConnector();
+      } else
+      {
+         holder.decrement();
+         if (log.isDebugEnabled())
+            log.debug(holder.getCount() + " remaining reference to "
+                  + holder.getConnector().getServerURI() + " to " + locator);
+         return null;
+      }
+   }
+
+   public static ServerLocator[] getRegisteredLocators()
+   {
+      Set<ServerLocator> registeredLocators = connectors.keySet();
+      return (ServerLocator[]) registeredLocators
+            .toArray(new ServerLocator[registeredLocators.size()]);
+   }
+
+   public static Object getConnectorCount(ServerLocator locator)
+   {
+      NIOConnectorHolder holder = connectors.get(locator);
+      if (holder == null)
+      {
+         return 0;
+      }
+      return holder.getCount();
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
@@ -90,4 +195,42 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+
+   static class NIOConnectorHolder
+   {
+      private final NIOConnector connector;
+      private int count;
+
+      public NIOConnectorHolder(NIOConnector connector)
+      {
+         assert connector != null;
+
+         this.connector = connector;
+         this.count = 1;
+      }
+
+      void increment()
+      {
+         assert count > 0;
+
+         count++;
+      }
+
+      void decrement()
+      {
+         count--;
+
+         assert count > 0;
+      }
+
+      int getCount()
+      {
+         return count;
+      }
+
+      public NIOConnector getConnector()
+      {
+         return connector;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -124,7 +124,7 @@
    @Override
    public String toString()
    {
-      return "RemoteServiceLocator[uri=" + getURI() + "]";
+      return "ServerLocator[uri=" + getURI() + "]";
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -91,11 +91,10 @@
       connector.getSessionConfig().setReuseAddress(true);
    }
 
-
    // NIOConnector implementation -----------------------------------
    
-   public NIOSession connect() throws IOException {
-   
+   public NIOSession connect() throws IOException 
+   {
       InetSocketAddress address = new InetSocketAddress(host, port);
       ConnectFuture future = connector.connect(address);
       connector.setDefaultRemoteAddress(address);

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -50,19 +50,15 @@
       NIOSession session1 = createStrictMock(NIOSession.class);
       NIOSession session2 = createStrictMock(NIOSession.class);
       
-      expect(connector.connect()).andReturn(session1);
-      expect(connector.disconnect()).andReturn(true);
-      
-      expect(connector.connect()).andReturn(session2);
+      expect(connector.connect()).andReturn(session1).andReturn(session2);
+      expect(session1.isConnected()).andReturn(true);
       expect(session2.isConnected()).andReturn(true);
       
-      expect(connector.disconnect()).andReturn(true);
-      expect(connector.disconnect()).andReturn(false);
-
       replay(connector, session1, session2);
 
       Client client = new Client(connector, serverLocator);
       client.connect();
+      assertTrue(client.isConnected());
       assertTrue(client.disconnect());
       assertFalse(client.isConnected());
 
@@ -106,7 +102,6 @@
       expect(connector.connect()).andReturn(session);
       expect(session.isConnected()).andReturn(true);
       expect(session.getID()).andReturn(sessionID);
-      expect(connector.disconnect()).andReturn(true);
       
       replay(connector, session);
       
@@ -133,7 +128,6 @@
       expect(connector.getServerURI()).andReturn(null);
       expect(connector.connect()).andReturn(session);
       expect(connector.getServerURI()).andReturn("tcp://localhost:" + PORT);
-      expect(connector.disconnect()).andReturn(true);
       expect(connector.getServerURI()).andReturn(null);
       // no expectation for the session
       

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java	2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java	2007-12-17 09:45:46 UTC (rev 3511)
@@ -33,6 +33,18 @@
 
    // Public --------------------------------------------------------
    
+   @Override
+   protected void setUp() throws Exception
+   {
+      assertEquals(0, ConnectorRegistry.getRegisteredLocators().length);
+   }
+   
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, ConnectorRegistry.getRegisteredLocators().length);
+   }
+   
    public void testLocatorRegistration() throws Exception
    {
       ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
@@ -54,11 +66,13 @@
       // locator is registered -> client and server are in the same vm
       assertTrue(ConnectorRegistry.register(locator));
       
-      NIOConnector connector = ConnectorRegistry.get(locator);
+      NIOConnector connector = ConnectorRegistry.getConnector(locator);
       
       assertTrue(connector.getServerURI().startsWith(INVM.toString()));
       
       assertTrue(ConnectorRegistry.unregister(locator));
+      
+      assertNotNull(ConnectorRegistry.removeConnector(locator));
    }
    
    
@@ -68,12 +82,62 @@
       
       // locator is not registered -> client and server are not in the same vm
       
-      NIOConnector connector = ConnectorRegistry.get(locator);
+      NIOConnector connector = ConnectorRegistry.getConnector(locator);
       
       assertNotNull(connector);
       assertEquals(locator.getURI(), connector.getServerURI());
+      
+      assertNotNull(ConnectorRegistry.removeConnector(locator));
    }
+   
+   public void testConnectorCount() throws Exception
+   {
+      ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
+      assertEquals(0, ConnectorRegistry.getConnectorCount(locator));
 
+      NIOConnector connector1 = ConnectorRegistry.getConnector(locator);
+      assertEquals(1, ConnectorRegistry.getConnectorCount(locator));
+
+      NIOConnector connector2 = ConnectorRegistry.getConnector(locator);
+      assertEquals(2, ConnectorRegistry.getConnectorCount(locator));
+
+      assertSame(connector1, connector2);
+      
+      assertNull(ConnectorRegistry.removeConnector(locator));
+      assertEquals(1, ConnectorRegistry.getConnectorCount(locator));
+
+      NIOConnector connector3 = ConnectorRegistry.getConnector(locator);
+      assertEquals(2, ConnectorRegistry.getConnectorCount(locator));
+
+      assertSame(connector1, connector3);
+      
+      assertNull(ConnectorRegistry.removeConnector(locator));
+      assertNotNull(ConnectorRegistry.removeConnector(locator));
+      assertEquals(0, ConnectorRegistry.getConnectorCount(locator));
+   }
+   
+   public void testConnectorCount_2() throws Exception
+   {
+      ServerLocator locator1 = new ServerLocator(TCP, "localhost", PORT);
+      ServerLocator locator2 = new ServerLocator(TCP, "127.0.0.1", PORT);
+
+      assertNotSame(locator1, locator2);
+      
+      assertEquals(0, ConnectorRegistry.getConnectorCount(locator1));
+      assertEquals(0, ConnectorRegistry.getConnectorCount(locator2));
+
+      NIOConnector connector1 = ConnectorRegistry.getConnector(locator1);
+      assertEquals(1, ConnectorRegistry.getConnectorCount(locator1));
+
+      NIOConnector connector2 = ConnectorRegistry.getConnector(locator2);
+      assertEquals(1, ConnectorRegistry.getConnectorCount(locator2));
+      
+      assertNotSame(connector1, connector2);
+      
+      assertNotNull(ConnectorRegistry.removeConnector(locator1));
+      assertNotNull(ConnectorRegistry.removeConnector(locator2));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list