Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 02:30:24 -0400 (Wed, 20 Jul 2011)
New Revision: 11007
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fixing OneWayClusterTest
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-20
06:02:09 UTC (rev 11006)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-20
06:30:24 UTC (rev 11007)
@@ -45,6 +45,7 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -156,6 +157,8 @@
private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
+
+ private static Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
@@ -168,14 +171,14 @@
private boolean backup;
private final Exception e = new Exception();
-
+
// To be called when there are ServerLocator being finalized.
// To be used on test assertions
public static Runnable finalizeCallback = null;
-
+
public static synchronized void clearThreadPools()
{
-
+
if (globalThreadPool != null)
{
globalThreadPool.shutdown();
@@ -194,7 +197,7 @@
globalThreadPool = null;
}
}
-
+
if (globalScheduledThreadPool != null)
{
globalScheduledThreadPool.shutdown();
@@ -471,6 +474,8 @@
public void start(Executor executor) throws Exception
{
initialise();
+
+ this.startExecutor = executor;
executor.execute(new Runnable()
{
@@ -1106,7 +1111,7 @@
{
staticConnector.disconnect();
}
-
+
Set<ClientSessionFactory> clonedFactory = new
HashSet<ClientSessionFactory>(factories);
for (ClientSessionFactory factory : clonedFactory)
@@ -1231,7 +1236,7 @@
// Notify if waiting on getting topology
notify();
}
-
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -1297,7 +1302,7 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
- if(topology.members() > 0)
+ if (topology.members() > 0)
{
log.debug("ServerLocatorImpl.addClusterTopologyListener");
}
@@ -1360,37 +1365,57 @@
try
{
-
+
int retryNumber = 0;
while (csf == null && !ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing)
{
- retryNumber ++;
+ retryNumber++;
for (Connector conn : connectors)
{
- if (log.isDebugEnabled())
- {
- log.debug("Submitting connect towards " + conn);
- }
-
- csf = conn.tryConnect();
-
- if (csf != null)
- {
- return csf;
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Submitting connect towards " + conn);
+ }
+
+ csf = conn.tryConnect();
+
+ if (csf != null)
+ {
+ csf.getConnection().addFailureListener(new FailureListener()
+ {
+ // Case the node where we were connected is gone, we need to
restart the connection
+ public void connectionFailed(HornetQException exception, boolean
failedOver)
+ {
+ if (exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ try
+ {
+ ServerLocatorImpl.this.start(startExecutor);
+ }
+ catch (Exception e)
+ {
+ // There isn't much to be done if this happens here
+ log.warn(e.getMessage());
+ }
+ }
+ }
+ });
+
+ return csf;
+ }
}
-
- if (initialConnectAttempts >=0 && retryNumber >
initialConnectAttempts)
+
+ if (initialConnectAttempts >= 0 && retryNumber >
initialConnectAttempts)
{
break;
}
-
+
if (!closed && !closing)
{
- Thread.sleep (retryInterval);
+ Thread.sleep(retryInterval);
}
}
-
+
}
catch (Exception e)
{
@@ -1444,7 +1469,7 @@
System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created
here:", e);
-
+
if (ServerLocatorImpl.finalizeCallback != null)
{
ServerLocatorImpl.finalizeCallback.run();
@@ -1456,7 +1481,7 @@
super.finalize();
}
- class Connector
+ class Connector
{
private TransportConfiguration initialConnector;
@@ -1510,9 +1535,7 @@
{
return "Connector [initialConnector=" + initialConnector +
"]";
}
-
-
-
+
}
}
}