[jboss-cvs] JBoss Messaging SVN: r3511 - in trunk: src/main/org/jboss/jms/client/remoting and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 17 04:45:46 EST 2007
Author: jmesnil
Date: 2007-12-17 04:45:46 -0500 (Mon, 17 Dec 2007)
New Revision: 3511
Modified:
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/Client.java
trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1201 share remoting NIOConnectors that connect to the same server
* enhanced ConnectorRegistry to keep track of the NIOConnector created for each ServerLocator
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -211,6 +211,9 @@
try
{
client.disconnect();
+ NIOConnector connector = ConnectorRegistry.removeConnector(new ServerLocator(serverLocatorURI));
+ if (connector != null)
+ connector.disconnect();
} catch (Throwable t)
{
throw handleThrowable(t);
@@ -277,7 +280,7 @@
try
{
ServerLocator locator = new ServerLocator(serverLocatorURI);
- NIOConnector connector = ConnectorRegistry.get(locator);
+ NIOConnector connector = ConnectorRegistry.getConnector(locator);
client = new Client(connector, locator);
client.connect();
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -80,7 +80,7 @@
callbackManager = new CallbackManager();
- NIOConnector connector = ConnectorRegistry.get(serverLocator);
+ NIOConnector connector = ConnectorRegistry.getConnector(serverLocator);
client = new Client(connector, serverLocator);
client.connect();
@@ -97,6 +97,9 @@
try
{
client.disconnect();
+ NIOConnector connector = ConnectorRegistry.removeConnector(serverLocator);
+ if (connector != null)
+ connector.disconnect();
}
catch (Throwable ignore)
{
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -138,7 +138,7 @@
loadBalancingFactory, strictTck);
connectorManager.registerConnector(getName());
- log.info(serverLocator + " has lease disabled");
+ log.info("Server locator is " + serverLocator);
log.info(this + " started");
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Client.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -64,8 +64,12 @@
public boolean disconnect() throws Exception
{
+ if (session == null)
+ {
+ return false;
+ }
session = null;
- return connector.disconnect();
+ return true;
}
public String getSessionID()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectorRegistry.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -9,9 +9,12 @@
import static org.jboss.messaging.core.remoting.TransportType.INVM;
import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.INVMConnector;
import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
@@ -37,48 +40,150 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ConnectorRegistry.class);
+
// Attributes ----------------------------------------------------
+ private static Set<ServerLocator> locators = new HashSet<ServerLocator>();
+
+ private static Map<ServerLocator, NIOConnectorHolder> connectors = new HashMap<ServerLocator, NIOConnectorHolder>();
+
// Static --------------------------------------------------------
- private static Set<ServerLocator> locators = new HashSet<ServerLocator>();
-
+ /**
+ * @return <code>true</code> if this locator has not already been registered,
+ * <code>false</code> otherwise
+ */
public static boolean register(ServerLocator locator)
{
return locators.add(locator);
}
- public static boolean unregister(ServerLocator locator)
+ /**
+ * @return <code>true</code> if this locator was registered,
+ * <code>false</code> otherwise
+ */ public static boolean unregister(ServerLocator locator)
{
return locators.remove(locator);
}
- public static NIOConnector get(ServerLocator locator)
+ public static synchronized NIOConnector getConnector(ServerLocator locator)
{
assert locator != null;
+ if (connectors.containsKey(locator))
+ {
+ NIOConnectorHolder holder = connectors.get(locator);
+ holder.increment();
+ NIOConnector connector = holder.getConnector();
+
+ if (log.isDebugEnabled())
+ log.debug("Reuse " + connector.getServerURI() + " to connect to "
+ + locator + " [count=" + holder.getCount() + "]");
+
+ return connector;
+ }
+
// check if the server is in the same vm than the client
if (locators.contains(locator))
{
- return new INVMConnector(locator.getHost(), locator.getPort());
+ NIOConnector connector = new INVMConnector(locator.getHost(), locator
+ .getPort());
+
+ if (log.isDebugEnabled())
+ log.debug("Created " + connector.getServerURI() + " to connect to "
+ + locator);
+
+ NIOConnectorHolder holder = new NIOConnectorHolder(connector);
+ connectors.put(locator, holder);
+ return connector;
}
+ NIOConnector connector = null;
+
TransportType transport = locator.getTransport();
if (transport == TCP)
{
- return new MinaConnector(locator.getTransport(), locator.getHost(),
- locator.getPort());
+ connector = new MinaConnector(locator.getTransport(), locator
+ .getHost(), locator.getPort());
} else if (transport == INVM)
{
- return new INVMConnector(locator.getHost(), locator.getPort());
- } else
+ connector = new INVMConnector(locator.getHost(), locator.getPort());
+ }
+
+ if (connector == null)
{
throw new IllegalArgumentException(
"no connector defined for transport " + transport);
}
+
+ if (log.isDebugEnabled())
+ log.debug("Created " + connector.getServerURI() + " to connect to "
+ + locator);
+ NIOConnectorHolder holder = new NIOConnectorHolder(connector);
+ connectors.put(locator, holder);
+ return connector;
}
+ /**
+ * Decrement the number of references on the NIOConnector corresponding to
+ * the locator.
+ *
+ * If there is only one reference left, removes the connector from the connectors Map and
+ * returns it. Otherwise returns null.
+ *
+ * @param locator
+ * a ServerLocator
+ * @return the NIOConnector if there are no longer any references to it, or
+ * <code>null</code> otherwise
+ * @throws IllegalStateException
+ * if no NIOConnector was created for the given locator
+ */
+ public synchronized static NIOConnector removeConnector(ServerLocator locator)
+ {
+ assert locator != null;
+
+ NIOConnectorHolder holder = connectors.get(locator);
+ if (holder == null)
+ {
+ throw new IllegalStateException("No Connector were created for "
+ + locator);
+ }
+
+ if (holder.getCount() == 1)
+ {
+ if (log.isDebugEnabled())
+ log.debug("Removed connector for " + locator);
+ connectors.remove(locator);
+ return holder.getConnector();
+ } else
+ {
+ holder.decrement();
+ if (log.isDebugEnabled())
+ log.debug(holder.getCount() + " remaining reference to "
+ + holder.getConnector().getServerURI() + " to " + locator);
+ return null;
+ }
+ }
+
+ public static ServerLocator[] getRegisteredLocators()
+ {
+ Set<ServerLocator> registeredLocators = connectors.keySet();
+ return (ServerLocator[]) registeredLocators
+ .toArray(new ServerLocator[registeredLocators.size()]);
+ }
+
+ public static Object getConnectorCount(ServerLocator locator)
+ {
+ NIOConnectorHolder holder = connectors.get(locator);
+ if (holder == null)
+ {
+ return 0;
+ }
+ return holder.getCount();
+ }
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@@ -90,4 +195,42 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+ static class NIOConnectorHolder
+ {
+ private final NIOConnector connector;
+ private int count;
+
+ public NIOConnectorHolder(NIOConnector connector)
+ {
+ assert connector != null;
+
+ this.connector = connector;
+ this.count = 1;
+ }
+
+ void increment()
+ {
+ assert count > 0;
+
+ count++;
+ }
+
+ void decrement()
+ {
+ count--;
+
+ assert count > 0;
+ }
+
+ int getCount()
+ {
+ return count;
+ }
+
+ public NIOConnector getConnector()
+ {
+ return connector;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ServerLocator.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -124,7 +124,7 @@
@Override
public String toString()
{
- return "RemoteServiceLocator[uri=" + getURI() + "]";
+ return "ServerLocator[uri=" + getURI() + "]";
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -91,11 +91,10 @@
connector.getSessionConfig().setReuseAddress(true);
}
-
// NIOConnector implementation -----------------------------------
- public NIOSession connect() throws IOException {
-
+ public NIOSession connect() throws IOException
+ {
InetSocketAddress address = new InetSocketAddress(host, port);
ConnectFuture future = connector.connect(address);
connector.setDefaultRemoteAddress(address);
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ClientTest.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -50,19 +50,15 @@
NIOSession session1 = createStrictMock(NIOSession.class);
NIOSession session2 = createStrictMock(NIOSession.class);
- expect(connector.connect()).andReturn(session1);
- expect(connector.disconnect()).andReturn(true);
-
- expect(connector.connect()).andReturn(session2);
+ expect(connector.connect()).andReturn(session1).andReturn(session2);
+ expect(session1.isConnected()).andReturn(true);
expect(session2.isConnected()).andReturn(true);
- expect(connector.disconnect()).andReturn(true);
- expect(connector.disconnect()).andReturn(false);
-
replay(connector, session1, session2);
Client client = new Client(connector, serverLocator);
client.connect();
+ assertTrue(client.isConnected());
assertTrue(client.disconnect());
assertFalse(client.isConnected());
@@ -106,7 +102,6 @@
expect(connector.connect()).andReturn(session);
expect(session.isConnected()).andReturn(true);
expect(session.getID()).andReturn(sessionID);
- expect(connector.disconnect()).andReturn(true);
replay(connector, session);
@@ -133,7 +128,6 @@
expect(connector.getServerURI()).andReturn(null);
expect(connector.connect()).andReturn(session);
expect(connector.getServerURI()).andReturn("tcp://localhost:" + PORT);
- expect(connector.disconnect()).andReturn(true);
expect(connector.getServerURI()).andReturn(null);
// no expectation for the session
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java 2007-12-17 09:35:34 UTC (rev 3510)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/ConnectorRegistryTest.java 2007-12-17 09:45:46 UTC (rev 3511)
@@ -33,6 +33,18 @@
// Public --------------------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ assertEquals(0, ConnectorRegistry.getRegisteredLocators().length);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, ConnectorRegistry.getRegisteredLocators().length);
+ }
+
public void testLocatorRegistration() throws Exception
{
ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
@@ -54,11 +66,13 @@
// locator is registered -> client and server are in the same vm
assertTrue(ConnectorRegistry.register(locator));
- NIOConnector connector = ConnectorRegistry.get(locator);
+ NIOConnector connector = ConnectorRegistry.getConnector(locator);
assertTrue(connector.getServerURI().startsWith(INVM.toString()));
assertTrue(ConnectorRegistry.unregister(locator));
+
+ assertNotNull(ConnectorRegistry.removeConnector(locator));
}
@@ -68,12 +82,62 @@
// locator is not registered -> client and server are not in the same vm
- NIOConnector connector = ConnectorRegistry.get(locator);
+ NIOConnector connector = ConnectorRegistry.getConnector(locator);
assertNotNull(connector);
assertEquals(locator.getURI(), connector.getServerURI());
+
+ assertNotNull(ConnectorRegistry.removeConnector(locator));
}
+
+ public void testConnectorCount() throws Exception
+ {
+ ServerLocator locator = new ServerLocator(TCP, "localhost", PORT);
+ assertEquals(0, ConnectorRegistry.getConnectorCount(locator));
+ NIOConnector connector1 = ConnectorRegistry.getConnector(locator);
+ assertEquals(1, ConnectorRegistry.getConnectorCount(locator));
+
+ NIOConnector connector2 = ConnectorRegistry.getConnector(locator);
+ assertEquals(2, ConnectorRegistry.getConnectorCount(locator));
+
+ assertSame(connector1, connector2);
+
+ assertNull(ConnectorRegistry.removeConnector(locator));
+ assertEquals(1, ConnectorRegistry.getConnectorCount(locator));
+
+ NIOConnector connector3 = ConnectorRegistry.getConnector(locator);
+ assertEquals(2, ConnectorRegistry.getConnectorCount(locator));
+
+ assertSame(connector1, connector3);
+
+ assertNull(ConnectorRegistry.removeConnector(locator));
+ assertNotNull(ConnectorRegistry.removeConnector(locator));
+ assertEquals(0, ConnectorRegistry.getConnectorCount(locator));
+ }
+
+ public void testConnectorCount_2() throws Exception
+ {
+ ServerLocator locator1 = new ServerLocator(TCP, "localhost", PORT);
+ ServerLocator locator2 = new ServerLocator(TCP, "127.0.0.1", PORT);
+
+ assertNotSame(locator1, locator2);
+
+ assertEquals(0, ConnectorRegistry.getConnectorCount(locator1));
+ assertEquals(0, ConnectorRegistry.getConnectorCount(locator2));
+
+ NIOConnector connector1 = ConnectorRegistry.getConnector(locator1);
+ assertEquals(1, ConnectorRegistry.getConnectorCount(locator1));
+
+ NIOConnector connector2 = ConnectorRegistry.getConnector(locator2);
+ assertEquals(1, ConnectorRegistry.getConnectorCount(locator2));
+
+ assertNotSame(connector1, connector2);
+
+ assertNotNull(ConnectorRegistry.removeConnector(locator1));
+ assertNotNull(ConnectorRegistry.removeConnector(locator2));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list