[hornetq-commits] JBoss hornetq SVN: r12303 - in trunk: hornetq-core/src/main/java/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 15 05:56:55 EDT 2012


Author: borges
Date: 2012-03-15 05:56:54 -0400 (Thu, 15 Mar 2012)
New Revision: 12303

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720 HORNETQ-776 Limit Backup's reconnect attempts before and after quorum-vote.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2012-03-15 09:56:54 UTC (rev 12303)
@@ -585,7 +585,7 @@
          }
 
          // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
-         callFailureListeners(me, false, false);
+         callSessionFailureListeners(me, false, false);
 
          // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
          // There are either no threads executing in createSession, or one is blocking on a createSession
@@ -667,6 +667,7 @@
                }
                catch (Exception ignore)
                {
+                  // no-op
                }
             }
 
@@ -698,14 +699,14 @@
             {
                sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
             }
-            callFailureListeners(me, true, false);
+            callSessionFailureListeners(me, true, false);
          }
       }
 
       // This needs to be outside the failover lock to prevent deadlock
       if (connection != null)
       {
-         callFailureListeners(me, true, true);
+         callSessionFailureListeners(me, true, true);
       }
       if (sessionsToClose != null)
       {
@@ -908,7 +909,8 @@
                                       + "Please inform this condition to the HornetQ team");
    }
 
-   private void callFailureListeners(final HornetQException me, final boolean afterReconnect, final boolean failedOver)
+   private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect,
+                                            final boolean failedOver)
    {
       final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
 
@@ -966,7 +968,7 @@
 
       for (FailureListener listener : oldListeners)
       {
-         // Add all apart from the first one which is the old DelegatingFailureListener
+         // Add all apart from the old DelegatingFailureListener
          if (listener instanceof DelegatingFailureListener == false)
          {
             newListeners.add(listener);
@@ -1717,9 +1719,7 @@
              "]";
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
-    */
+   @Override
    public void setReconnectAttempts(final int attempts)
    {
       reconnectAttempts = attempts;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-15 09:56:54 UTC (rev 12303)
@@ -42,10 +42,10 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -2109,7 +2109,7 @@
 
             final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
-            quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
+            quorumManager = new QuorumManager(HornetQServerImpl.this, serverLocator0, threadPool, getIdentity());
             replicationEndpoint.setQuorumManager(quorumManager);
 
             serverLocator0.setReconnectAttempts(-1);
@@ -2121,11 +2121,14 @@
                {
                   try
                   {
-                     final ClientSessionFactory liveServerSessionFactory = serverLocator0.connect();
+                     final ClientSessionFactoryInternal liveServerSessionFactory = serverLocator0.connect();
                      if (liveServerSessionFactory == null)
                      {
                         throw new RuntimeException("Could not estabilish the connection");
                      }
+
+                     liveServerSessionFactory.setReconnectAttempts(1);
+                     quorumManager.setSessionFactory(liveServerSessionFactory);
                      CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
                      liveConnection.addFailureListener(quorumManager);
                      Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
@@ -2162,6 +2165,10 @@
             // we must remember to close stuff we don't need any more
             if (failedToConnect)
                   return;
+            /**
+             * Wait for a shutdown order or for the live to fail. All the action happens inside
+             * {@link QuorumManager}
+             */
             QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
 
             serverLocator0.close();

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-15 09:56:54 UTC (rev 12303)
@@ -1,10 +1,7 @@
 package org.hornetq.core.server.impl;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -16,9 +13,13 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
 
 /**
  * Manages a quorum of servers used to determine whether a given server is running or not.
@@ -31,24 +32,26 @@
 {
    private static final Logger log = Logger.getLogger(QuorumManager.class);
    private String targetServerID = "";
-   private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
-            new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
-
    private final ExecutorService executor;
    private final String serverIdentity;
    private final CountDownLatch latch;
    private volatile BACKUP_ACTIVATION signal;
+   private ClientSessionFactoryInternal sessionFactory;
+   private final Topology topology;
+   private final HornetQServer backupServer;
 
    /** safety parameter to make _sure_ we get out of await() */
-   private static final int LATCH_TIMEOUT = 60;
+   private static final int LATCH_TIMEOUT = 30;
    private static final long DISCOVERY_TIMEOUT = 5;
+   private static final int RECONNECT_ATTEMPTS = 5;
 
-   public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String identity)
+   public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
    {
       this.serverIdentity = identity;
       this.executor = executor;
       this.latch = new CountDownLatch(1);
-      // locator.addClusterTopologyListener(this);
+      this.backupServer = backup;
+      topology = serverLocator.getTopology();
    }
 
    public void setLiveID(String liveID)
@@ -56,46 +59,54 @@
       targetServerID = liveID;
    }
 
-   public boolean isNodeDown()
+   public boolean isLiveDown()
    {
-      if (nodes.size() == 0)
+      Collection<TopologyMember> nodes = topology.getMembers();
+
+      if (nodes.size() < 2) // the live server is also in the list
       {
          return true;
       }
 
       final int size = nodes.size();
-      Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
+      Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
       AtomicInteger pingCount = new AtomicInteger(0);
       final CountDownLatch latch = new CountDownLatch(size);
+      int total = 0;
       try
       {
-         for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
+         for (TopologyMember tm : nodes)
          {
-            if (targetServerID.equals(pair.getKey()))
+            Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
+//            if (targetServerID.equals(pair.getKey()))
+//               continue;
+            TransportConfiguration serverTC = pair.getA();
+            if (serverTC == null)
+            {
+               latch.countDown();
                continue;
-            TransportConfiguration serverTC = pair.getValue().getA();
+            }
+            total++;
             ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
             locatorsList.add(locator);
             executor.submit(new ServerConnect(latch, pingCount, locator));
          }
-         // Some servers may have disappeared between the latch creation
-         for (int i = 0; i < size - locatorsList.size(); i++)
-         {
-            latch.countDown();
-         }
+
          try
          {
             latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
          }
          catch (InterruptedException interruption)
          {
-            // No-op. As the best the quorum can do now is to return the latest number it has
+            // No-op. The best the quorum can do now is to return the latest number it has
          }
-         return pingCount.get() * 2 >= locatorsList.size();
+         // -1: because the live server is not being filtered out.
+         return pingCount.get() * 2 >= locatorsList.size() - 1;
       }
       finally
       {
-         for (ServerLocator locator: locatorsList){
+         for (ServerLocator locator : locatorsList)
+         {
             try
             {
                locator.close();
@@ -160,25 +171,65 @@
    @Override
    public void connectionFailed(HornetQException exception, boolean failedOver)
    {
-      // connection failed,
-      if (exception.getCode() == HornetQException.DISCONNECTED)
+      // Check if connection was reestablished by the sessionFactory:
+      if (sessionFactory.numConnections() > 0)
       {
-         log.info("Connection failed: " + exception);
-         signal = BACKUP_ACTIVATION.FAIL_OVER;
-         latch.countDown();
+         resetReplication();
          return;
       }
-      throw new RuntimeException(exception);
+      if (!isLiveDown())
+      {
+         try
+         {
+            // no point in repeating all the reconnection logic
+            sessionFactory.connect(RECONNECT_ATTEMPTS, false);
+            resetReplication();
+            return;
+         }
+         catch (HornetQException e)
+         {
+            if (e.getCode() != HornetQException.NOT_CONNECTED)
+               log.warn("Unexpected exception while trying to reconnect", e);
+         }
+      }
 
-      // by the time it got here, the connection might have been re-established
-      // check for it...
-      // if connectionIsOk:
-      // replicationEndPoint must see how up-to-date it is
-      // If not:
-      // 1. take a vote
-      // if vote result is weird... retry connecting
+      // live is assumed to be down, backup fails-over
+      signal = BACKUP_ACTIVATION.FAIL_OVER;
+      latch.countDown();
    }
 
+   /**
+    *
+    */
+   private void resetReplication()
+   {
+      new Thread(new ServerRestart(backupServer)).run();
+   }
+
+   private static final class ServerRestart implements Runnable
+   {
+      final HornetQServer backup;
+
+      public ServerRestart(HornetQServer backup)
+      {
+         this.backup = backup;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            backup.stop();
+            backup.start();
+         }
+         catch (Exception e)
+         {
+            log.error("Error while restarting the backup server: " + backup, e);
+         }
+      }
+
+   }
    enum BACKUP_ACTIVATION
    {
       FAIL_OVER, STOP;
@@ -212,11 +263,19 @@
    }
 
    /**
-    * Releases the semaphore, causing the Activation thread to fail-over.
+    * Releases the latch, causing the backup activation thread to fail-over.
     */
    public synchronized void failOver()
    {
       signal = BACKUP_ACTIVATION.FAIL_OVER;
       latch.countDown();
    }
+
+   /**
+    * @param sessionFactory the session factory used to connect to the live server
+    */
+   public void setSessionFactory(ClientSessionFactoryInternal sessionFactory)
+   {
+      this.sessionFactory = sessionFactory;
+   }
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-15 09:56:54 UTC (rev 12303)
@@ -7,6 +7,8 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
 
 public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest
 {
@@ -17,8 +19,7 @@
       setupCluster();
 
       startServers(0, 1, 2);
-      // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
-      // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+      BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
       startServers(3, 4, 5);
 
       for (int i : liveServerIDs)



More information about the hornetq-commits mailing list