[hornetq-commits] JBoss hornetq SVN: r9566 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 19 11:10:14 EDT 2010
Author: jmesnil
Date: 2010-08-19 11:10:14 -0400 (Thu, 19 Aug 2010)
New Revision: 9566
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
HA refactoring
* add thread to reconnect to a node when it is detected as down (static connector case)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 15:10:14 UTC (rev 9566)
@@ -1116,35 +1116,67 @@
closed = true;
}
- public void notifyNodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
boolean removed = false;
- synchronized (this)
+
+ if (!ha)
{
- if (!ha)
+ return;
+ }
+
+ final TopologyMember member = topology.getMember(nodeID);
+ removed = topology.removeMember(nodeID);
+
+ if (!topology.isEmpty())
+ {
+ updateArraysAndPairs();
+
+ if (topology.size() == 1 && topology.getMember(nodeID) != null)
{
- return;
+ receivedTopology = false;
}
+ }
+ else
+ {
+ pairs.clear();
- removed = topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
+ topologyArray = null;
+
+ receivedTopology = false;
+ }
+
+ if (ha && discoveryAddress == null && removed)
+ {
+ threadPool.execute(new Runnable()
{
- updateArraysAndPairs();
-
- if (topology.size() == 1 && topology.getMember(nodeID) != null)
+ public void run()
{
- receivedTopology = false;
+ System.out.println(ServerLocatorImpl.this.nodeID + " will try to connect to " + nodeID);
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ Pair<TransportConfiguration,TransportConfiguration> pair = member.getConnector();
+ TransportConfiguration tc = (pair.a != null) ? pair.a : pair.b;
+ sf = createSessionFactory(tc);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
}
- }
- else
- {
- pairs.clear();
-
- topologyArray = null;
-
- receivedTopology = false;
- }
+ });
}
if (removed)
@@ -1213,7 +1245,7 @@
this.initialConnectors[count++] = entry.getConnector();
}
- if (clusterConnection && !receivedTopology)
+ if (ha && clusterConnection && !receivedTopology)
{
// FIXME the node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
More information about the hornetq-commits
mailing list