[hornetq-commits] JBoss hornetq SVN: r11007 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 20 02:30:24 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-20 02:30:24 -0400 (Wed, 20 Jul 2011)
New Revision: 11007

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fixing OneWayClusterTest

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-20 06:02:09 UTC (rev 11006)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-20 06:30:24 UTC (rev 11007)
@@ -45,6 +45,7 @@
 import org.hornetq.core.cluster.DiscoveryListener;
 import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -156,6 +157,8 @@
    private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    private static ExecutorService globalThreadPool;
+   
+   private static Executor startExecutor;
 
    private static ScheduledExecutorService globalScheduledThreadPool;
 
@@ -168,14 +171,14 @@
    private boolean backup;
 
    private final Exception e = new Exception();
-   
+
    // To be called when there are ServerLocator being finalized.
    // To be used on test assertions
    public static Runnable finalizeCallback = null;
-   
+
    public static synchronized void clearThreadPools()
    {
-      
+
       if (globalThreadPool != null)
       {
          globalThreadPool.shutdown();
@@ -194,7 +197,7 @@
             globalThreadPool = null;
          }
       }
-      
+
       if (globalScheduledThreadPool != null)
       {
          globalScheduledThreadPool.shutdown();
@@ -471,6 +474,8 @@
    public void start(Executor executor) throws Exception
    {
       initialise();
+      
+      this.startExecutor = executor;
 
       executor.execute(new Runnable()
       {
@@ -1106,7 +1111,7 @@
       {
          staticConnector.disconnect();
       }
-      
+
       Set<ClientSessionFactory> clonedFactory = new HashSet<ClientSessionFactory>(factories);
 
       for (ClientSessionFactory factory : clonedFactory)
@@ -1231,7 +1236,7 @@
       // Notify if waiting on getting topology
       notify();
    }
-   
+
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
@@ -1297,7 +1302,7 @@
    public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       topologyListeners.add(listener);
-      if(topology.members() > 0)
+      if (topology.members() > 0)
       {
          log.debug("ServerLocatorImpl.addClusterTopologyListener");
       }
@@ -1360,37 +1365,57 @@
 
          try
          {
-            
+
             int retryNumber = 0;
             while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
             {
-               retryNumber ++;
+               retryNumber++;
                for (Connector conn : connectors)
                {
-                   if (log.isDebugEnabled())
-                   {
-                      log.debug("Submitting connect towards " + conn);
-                   }
-                   
-                   csf = conn.tryConnect();
-                   
-                   if (csf != null)
-                   {
-                      return csf;
-                   }
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug("Submitting connect towards " + conn);
+                  }
+
+                  csf = conn.tryConnect();
+
+                  if (csf != null)
+                  {
+                     csf.getConnection().addFailureListener(new FailureListener()
+                     {
+                        // Case the node where we were connected is gone, we need to restart the connection
+                        public void connectionFailed(HornetQException exception, boolean failedOver)
+                        {
+                           if (exception.getCode() == HornetQException.DISCONNECTED)
+                           {
+                              try
+                              {
+                                 ServerLocatorImpl.this.start(startExecutor);
+                              }
+                              catch (Exception e)
+                              {
+                                 // There isn't much to be done if this happens here
+                                 log.warn(e.getMessage());
+                              }
+                           }
+                        }
+                     });
+
+                     return csf;
+                  }
                }
-               
-               if (initialConnectAttempts >=0 && retryNumber > initialConnectAttempts)
+
+               if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts)
                {
                   break;
                }
-               
+
                if (!closed && !closing)
                {
-                  Thread.sleep (retryInterval);
+                  Thread.sleep(retryInterval);
                }
             }
-            
+
          }
          catch (Exception e)
          {
@@ -1444,7 +1469,7 @@
                      System.identityHashCode(this));
 
             log.warn("The ServerLocator you didn't close was created here:", e);
-            
+
             if (ServerLocatorImpl.finalizeCallback != null)
             {
                ServerLocatorImpl.finalizeCallback.run();
@@ -1456,7 +1481,7 @@
          super.finalize();
       }
 
-      class Connector 
+      class Connector
       {
          private TransportConfiguration initialConnector;
 
@@ -1510,9 +1535,7 @@
          {
             return "Connector [initialConnector=" + initialConnector + "]";
          }
-         
-         
-         
+
       }
    }
 }



More information about the hornetq-commits mailing list