[hornetq-commits] JBoss hornetq SVN: r11600 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 26 06:25:21 EDT 2011
Author: borges
Date: 2011-10-26 06:25:21 -0400 (Wed, 26 Oct 2011)
New Revision: 11600
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fix race condition and deadlock between ServerLocatorImpl.close() and the creation of new SessionFactories.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:25:06 UTC (rev 11599)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26 10:25:21 UTC (rev 11600)
@@ -57,6 +57,8 @@
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
+ private enum STATE{ INITIALIZED, CLOSED, CLOSING};
+
private static final long serialVersionUID = -1615857864410205260L;
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -69,7 +71,8 @@
private transient String identity;
- private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -154,10 +157,8 @@
private int initialMessagePacketSize;
- private volatile boolean closed;
+ private volatile STATE state;
- private volatile boolean closing;
-
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -252,7 +253,7 @@
return globalScheduledThreadPool;
}
- private void setThreadPools()
+ private synchronized void setThreadPools()
{
if (threadPool != null)
{
@@ -329,10 +330,16 @@
});
}
- private synchronized void initialise() throws Exception
+ private synchronized void initialise() throws HornetQException
{
- if (!readOnly)
+ if (readOnly)
{
+ return;
+ }
+
+ try
+ {
+ state = STATE.INITIALIZED;
setThreadPools();
instantiateLoadBalancingPolicy();
@@ -366,6 +373,11 @@
readOnly = true;
}
+ catch (Exception e)
+ {
+ state = null;
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
}
private ServerLocatorImpl(final Topology topology,
@@ -515,14 +527,12 @@
return pair.getA();
}
- else
- {
- // Get from initialconnectors
+
+ // Get from initialconnectors
- int pos = loadBalancingPolicy.select(initialConnectors.length);
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
- return initialConnectors[pos];
- }
+ return initialConnectors[pos];
}
public void start(Executor executor) throws Exception
@@ -541,7 +551,7 @@
}
catch (Exception e)
{
- if (!closing)
+ if (!isClosed())
{
log.warn("did not connect the cluster connection to other nodes", e);
}
@@ -565,19 +575,15 @@
public ClientSessionFactoryInternal connect() throws Exception
{
- ClientSessionFactoryInternal sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
}
// wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
+ return (ClientSessionFactoryInternal)createSessionFactory();
}
/* (non-Javadoc)
@@ -593,27 +599,12 @@
return afterConnectListener;
}
- public boolean isClosed()
+ public synchronized ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
- return closed || closing;
- }
+ assertOpen();
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
+ initialise();
+
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
@@ -627,29 +618,39 @@
scheduledThreadPool,
interceptors);
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addFactory(factory);
+ return factory;
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
+ }
- addFactory(factory);
-
- return factory;
+ private void removeFromConnecting(ClientSessionFactoryInternal factory)
+ {
+ connectingFactories.remove(factory);
}
- public ClientSessionFactory createSessionFactory() throws Exception
+ private void addToConnecting(ClientSessionFactoryInternal factory)
{
- if (closed || closing)
+ synchronized (connectingFactories)
{
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ assertOpen();
+ connectingFactories.add(factory);
}
+ }
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ assertOpen();
+ initialise();
+
if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
@@ -691,7 +692,15 @@
threadPool,
scheduledThreadPool,
interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ try
+ {
+ addToConnecting(factory);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
}
catch (HornetQException e)
{
@@ -723,10 +732,8 @@
if (ha || clusterConnection)
{
- long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
- !receivedTopology &&
- timeout > System.currentTimeMillis())
+ final long timeout = System.currentTimeMillis() + 30000;
+ while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis())
{
// Now wait for the topology
@@ -740,7 +747,7 @@
}
- if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology && !isClosed())
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -1184,7 +1191,7 @@
protected void doClose(final boolean sendClose)
{
- if (closed)
+ if (state == STATE.CLOSED)
{
if (log.isDebugEnabled())
{
@@ -1193,13 +1200,8 @@
return;
}
- if (log.isDebugEnabled())
- {
- log.debug(this + " is calling close", new Exception("trace"));
- }
+ state = STATE.CLOSING;
- closing = true;
-
if (discoveryGroup != null)
{
try
@@ -1215,9 +1217,21 @@
{
staticConnector.disconnect();
}
+
+ synchronized (connectingFactories)
+ {
+ for (ClientSessionFactoryInternal csf : connectingFactories)
+ {
+ csf.causeExit();
+ csf.close();
+ }
+ connectingFactories.clear();
+ }
+
+ synchronized (this)
+ {
+ Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
for (ClientSessionFactory factory : clonedFactory)
{
if (sendClose)
@@ -1268,7 +1282,8 @@
}
readOnly = false;
- closed = true;
+ state = STATE.CLOSED;
+ }
}
/** This is directly called when the connection to the node is gone,
@@ -1378,13 +1393,10 @@
discoveryGroupConfiguration +
"]";
}
- else
- {
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
+ return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
}
private synchronized void updateArraysAndPairs()
@@ -1467,18 +1479,27 @@
public synchronized void addFactory(ClientSessionFactoryInternal factory)
{
- if (factory != null)
+ if (factory == null)
{
- TransportConfiguration backup = null;
+ return;
+ }
- if (topology != null)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
+ if (isClosed())
+ {
+ factory.causeExit();
+ factory.close();
+ return;
+ }
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
+ TransportConfiguration backup = null;
+
+ if (topology != null)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
}
+
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factories.add(factory);
}
class StaticConnector implements Serializable
@@ -1489,19 +1510,9 @@
public ClientSessionFactory connect() throws HornetQException
{
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
+ assertOpen();
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
+ initialise();
ClientSessionFactory csf = null;
@@ -1511,7 +1522,7 @@
{
int retryNumber = 0;
- while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
+ while (csf == null && !isClosed())
{
retryNumber++;
for (Connector conn : connectors)
@@ -1564,7 +1575,7 @@
break;
}
- if (!closed && !closing)
+ if (!isClosed())
{
Thread.sleep(retryInterval);
}
@@ -1577,7 +1588,7 @@
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
}
- if (csf == null && !closed)
+ if (csf == null && !isClosed())
{
log.warn("Failed to connecto to any static connector, throwing exception now");
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
@@ -1636,7 +1647,7 @@
public void finalize() throws Throwable
{
- if (!closed && finalizeCheck)
+ if (!isClosed() && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
System.identityHashCode(this));
@@ -1681,7 +1692,15 @@
ClientSessionFactoryInternal factoryToUse = factory;
if (factoryToUse != null)
{
- factory.connect(1, false);
+ try
+ {
+ addToConnecting(factoryToUse);
+ factoryToUse.connect(1, false);
+ }
+ finally
+ {
+ removeFromConnecting(factoryToUse);
+ }
}
return factoryToUse;
}
@@ -1704,9 +1723,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -1715,4 +1731,17 @@
}
}
+
+ private void assertOpen()
+ {
+ if (state != null && state != STATE.INITIALIZED)
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return state != STATE.INITIALIZED;
+ }
}
More information about the hornetq-commits mailing list