[hornetq-commits] JBoss hornetq SVN: r9566 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 19 11:10:14 EDT 2010


Author: jmesnil
Date: 2010-08-19 11:10:14 -0400 (Thu, 19 Aug 2010)
New Revision: 9566

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
HA refactoring

* Add a thread-pool task that keeps trying to reconnect to a node once it is detected as down (static connector case only, i.e. HA is enabled and no discovery address is configured)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-19 11:42:35 UTC (rev 9565)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-19 15:10:14 UTC (rev 9566)
@@ -1116,35 +1116,67 @@
       closed = true;
    }
 
-   public void notifyNodeDown(final String nodeID)
+   public synchronized void notifyNodeDown(final String nodeID)
    {
       boolean removed = false;
-      synchronized (this)
+
+      if (!ha)
       {
-         if (!ha)
+         return;
+      }
+
+      final TopologyMember member = topology.getMember(nodeID);
+      removed = topology.removeMember(nodeID);
+
+      if (!topology.isEmpty())
+      {
+         updateArraysAndPairs();
+
+         if (topology.size() == 1 && topology.getMember(nodeID) != null)
          {
-            return;
+            receivedTopology = false;
          }
+      }
+      else
+      {
+         pairs.clear();
 
-         removed = topology.removeMember(nodeID);
-         
-         if (!topology.isEmpty())
+         topologyArray = null;
+
+         receivedTopology = false;
+      }
+
+      if (ha && discoveryAddress == null && removed)
+      {
+         threadPool.execute(new Runnable()
          {
-            updateArraysAndPairs();
-            
-            if (topology.size() == 1 && topology.getMember(nodeID) != null)
+            public void run()
             {
-               receivedTopology = false;
+               System.out.println(ServerLocatorImpl.this.nodeID + " will try to connect to " + nodeID);
+               ClientSessionFactory sf = null;
+               do
+               {
+                  try
+                  {
+                     Pair<TransportConfiguration,TransportConfiguration> pair = member.getConnector();
+                     TransportConfiguration tc = (pair.a != null) ? pair.a : pair.b;
+                     sf = createSessionFactory(tc);
+                  }
+                  catch (HornetQException e)
+                  {
+                     if (e.getCode() == HornetQException.NOT_CONNECTED)
+                     {
+                        continue;
+                     }
+                  }
+                  catch (Exception e)
+                  {
+                     break;
+                  }
+               }
+               while (sf == null);
             }
-         }
-         else
-         {
-            pairs.clear();
-
-            topologyArray = null;
-
-            receivedTopology = false;
-         }
+         });
       }
 
       if (removed)
@@ -1213,7 +1245,7 @@
          this.initialConnectors[count++] = entry.getConnector();
       }
       
-      if (clusterConnection && !receivedTopology)
+      if (ha && clusterConnection && !receivedTopology)
       {
          // FIXME the node is alone in the cluster. We create a connection to the new node
          // to trigger the node notification to form the cluster.



More information about the hornetq-commits mailing list