Author: borges
Date: 2011-10-26 08:14:51 -0400 (Wed, 26 Oct 2011)
New Revision: 11604
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
(i) Avoid locking the locator to close it, (ii) track factories at static connector with
connectingFactories,
(iii) add 'state' field and remove all other fields used to track state.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26
12:05:13 UTC (rev 11603)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-26
12:14:51 UTC (rev 11604)
@@ -59,6 +59,11 @@
{
private static final long serialVersionUID = -1615857864410205260L;
+ private enum STATE
+ {
+ INITIALIZED, CLOSING, CLOSED,
+ };
+
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
private final boolean ha;
@@ -97,8 +102,6 @@
private ConnectionLoadBalancingPolicy loadBalancingPolicy;
- private boolean readOnly;
-
// Settable attributes:
private boolean cacheLargeMessagesClient;
@@ -155,10 +158,8 @@
private int initialMessagePacketSize;
- private volatile boolean closed;
+ private volatile STATE state;
- private volatile boolean closing;
-
private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
@@ -332,10 +333,14 @@
private synchronized void initialise() throws HornetQException
{
- if (readOnly)
+ if (state == STATE.INITIALIZED)
{
return;
}
+ if (state == STATE.CLOSING)
+ {
+ throw new IllegalStateException("Cannot initialize 'closing'
locator");
+ }
try
{
setThreadPools();
@@ -369,7 +374,7 @@
discoveryGroup.start();
}
- readOnly = true;
+ state = STATE.INITIALIZED;
}
catch (Exception e)
{
@@ -550,7 +555,7 @@
}
catch (Exception e)
{
- if (!closing)
+ if (!isClosed())
{
log.warn("did not connect the cluster connection to other
nodes", e);
}
@@ -600,21 +605,20 @@
public boolean isClosed()
{
- return closed || closing;
+ return state == STATE.CLOSED || state == STATE.CLOSING;
}
public ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception
{
assertOpen();
- initialise();
+ initialise();
synchronized (this)
{
assertOpen();
ClientSessionFactoryInternal factory =
- new ClientSessionFactoryImpl(this,
-
transportConfiguration,
+ new ClientSessionFactoryImpl(this, transportConfiguration,
callTimeout,
clientFailureCheckPeriod,
connectionTTL,
@@ -625,17 +629,36 @@
threadPool,
scheduledThreadPool,
interceptors);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addFactory(factory);
+ return factory;
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
+ }
+ }
+
+ /**
+ * Stops tracking a factory once its connection attempt has finished (successfully or not).
+ * Takes the same monitor as addToConnecting() and the close-time iteration in doClose(),
+ * so the set is never modified while doClose() is iterating over it.
+ * @param factory the factory whose connect attempt has ended
+ */
+ private void removeFromConnecting(ClientSessionFactoryInternal factory)
+ {
+ synchronized (connectingFactories)
+ {
+ connectingFactories.remove(factory);
+ }
+ }
+
+ private void addToConnecting(ClientSessionFactoryInternal factory)
+ {
+ synchronized (connectingFactories)
+ {
+ assertOpen();
connectingFactories.add(factory);
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
- connectingFactories.remove(factory);
- addFactory(factory);
- return factory;
}
}
-
private void assertOpen()
{
- if (closed || closing)
+ if (state != null && state != STATE.INITIALIZED)
{
throw new IllegalStateException("Cannot create session factory, server
locator is closed (maybe it has been garbage collected)");
}
@@ -677,27 +700,24 @@
try
{
- factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- connectingFactories.add(factory);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- connectingFactories.remove(factory);
+ factory =
+ new ClientSessionFactoryImpl(this, tc, callTimeout,
clientFailureCheckPeriod, connectionTTL,
+ retryInterval,
retryIntervalMultiplier, maxRetryInterval,
+ reconnectAttempts, threadPool,
scheduledThreadPool, interceptors);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
}
catch (HornetQException e)
{
if (factory != null)
{
- connectingFactories.remove(factory);
factory.close();
}
factory = null;
@@ -728,9 +748,7 @@
if (ha || clusterConnection)
{
long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing &&
- !receivedTopology &&
- timeout > System.currentTimeMillis())
+ while (!isClosed() && !receivedTopology && timeout >
System.currentTimeMillis())
{
// Now wait for the topology
@@ -744,7 +762,7 @@
}
- if (System.currentTimeMillis() > timeout && !receivedTopology
&& !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster
topology. Group:" + discoveryGroup);
@@ -755,7 +773,6 @@
return factory;
}
-
}
public boolean isHA()
@@ -1108,7 +1125,7 @@
private void checkWrite()
{
- if (readOnly)
+ if (state == STATE.INITIALIZED)
{
throw new IllegalStateException("Cannot set attribute on SessionFactory
after it has been used");
}
@@ -1187,7 +1204,7 @@
protected void doClose(final boolean sendClose)
{
- if (closed)
+ if (state == STATE.CLOSED)
{
if (log.isDebugEnabled())
{
@@ -1201,7 +1218,7 @@
log.debug(this + " is calling close", new
Exception("trace"));
}
- closing = true;
+ state = STATE.CLOSING;
if (discoveryGroup != null)
{
@@ -1219,28 +1236,33 @@
staticConnector.disconnect();
}
- for (ClientSessionFactoryInternal factory : connectingFactories)
+ synchronized (connectingFactories)
{
- factory.causeExit();
- factory.close();
+ for (ClientSessionFactoryInternal factory : connectingFactories)
+ {
+ factory.causeExit();
+ factory.close();
+ }
+ connectingFactories.clear();
}
- synchronized (this)
+
+ synchronized (factories)
{
- Set<ClientSessionFactoryInternal> clonedFactory = new
HashSet<ClientSessionFactoryInternal>(factories);
+ Set<ClientSessionFactoryInternal> clonedFactory = new
HashSet<ClientSessionFactoryInternal>(factories);
- for (ClientSessionFactory factory : clonedFactory)
- {
- if (sendClose)
+ for (ClientSessionFactory factory : clonedFactory)
{
- factory.close();
+ if (sendClose)
+ {
+ factory.close();
+ }
+ else
+ {
+ factory.cleanup();
+ }
}
- else
- {
- factory.cleanup();
- }
- }
- factories.clear();
+ factories.clear();
}
if (shutdownPool)
@@ -1277,9 +1299,7 @@
}
}
}
- readOnly = false;
-
- closed = true;
+ state = STATE.CLOSED;
}
/** This is directly called when the connection to the node is gone,
@@ -1370,9 +1390,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -1385,21 +1402,19 @@
discoveryGroupConfiguration +
"]";
}
- else
- {
- return "ServerLocatorImpl [initialConnectors=" +
Arrays.toString(initialConnectors) +
+ return "ServerLocatorImpl [initialConnectors=" +
Arrays.toString(initialConnectors) +
", discoveryGroupConfiguration=" +
discoveryGroupConfiguration +
"]";
- }
}
private synchronized void updateArraysAndPairs()
{
+ assertOpen();
Collection<TopologyMember> membersCopy = topology.getMembers();
- topologyArray = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class,
-
membersCopy.size());
+ topologyArray =
+ (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class, membersCopy.size());
int count = 0;
for (TopologyMember pair : membersCopy)
@@ -1472,20 +1487,22 @@
topology.removeClusterTopologyListener(listener);
}
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ private synchronized void addFactory(ClientSessionFactoryInternal factory)
{
if (factory == null)
{
return;
}
- if (closed || closing)
+ synchronized (factories)
{
- factory.close();
- return;
- }
+ if (isClosed())
+ {
+ factory.close();
+ return;
+ }
- TransportConfiguration backup = null;
+ TransportConfiguration backup = null;
if (topology != null)
{
@@ -1494,7 +1511,7 @@
factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
factories.add(factory);
-
+ }
}
final class StaticConnector implements Serializable
@@ -1505,7 +1522,7 @@
public ClientSessionFactory connect() throws HornetQException
{
- if (closed || closing)
+ if (isClosed())
{
throw new IllegalStateException("Cannot create session factory, server
locator is closed (maybe it has been garbage collected)");
}
@@ -1520,7 +1537,7 @@
{
int retryNumber = 0;
- while (csf == null && !ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing)
+ while (csf == null && !isClosed()) // keep retrying only while the locator is still open
{
retryNumber++;
for (Connector conn : connectors)
@@ -1573,7 +1590,7 @@
break;
}
- if (!closed && !closing)
+ if (!isClosed())
{
Thread.sleep(retryInterval);
}
@@ -1586,7 +1603,7 @@
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to
connect to any static connectors", e);
}
- if (csf == null && !closed)
+ if (csf == null && !isClosed())
{
log.warn("Failed to connecto to any static connector, throwing exception
now");
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to
connect to any static connectors");
@@ -1647,7 +1664,7 @@
@Override
public void finalize() throws Throwable
{
- if (!closed && finalizeCheck)
+ if (state != STATE.CLOSED && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please
make sure you close all ServerLocators explicitly " + "before letting them go
out of scope! " +
System.identityHashCode(this));
@@ -1692,7 +1709,16 @@
ClientSessionFactoryInternal factoryToUse = factory;
if (factoryToUse != null)
{
- factory.connect(1, false);
+ addToConnecting(factoryToUse);
+
+ try
+ {
+ factoryToUse.connect(1, false);
+ }
+ finally
+ {
+ removeFromConnecting(factoryToUse);
+ }
}
return factoryToUse;
}
@@ -1715,9 +1741,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{