[hornetq-commits] JBoss hornetq SVN: r12296 - in trunk/hornetq-core/src/main/java/org/hornetq/core: remoting/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Mar 13 08:05:22 EDT 2012


Author: borges
Date: 2012-03-13 08:05:21 -0400 (Tue, 13 Mar 2012)
New Revision: 12296

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 HORNETQ-776 Improve signaling, and rely on an explicit signal from the live server to fail over.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2012-03-13 12:05:21 UTC (rev 12296)
@@ -967,7 +967,6 @@
       for (FailureListener listener : oldListeners)
       {
          // Add all apart from the first one which is the old DelegatingFailureListener
-
          if (listener instanceof DelegatingFailureListener == false)
          {
             newListeners.add(listener);
@@ -1594,7 +1593,7 @@
       }
    }
 
-   private class DelegatingFailureListener implements FailureListener
+   private final class DelegatingFailureListener implements FailureListener
    {
       private final Object connectionID;
 
@@ -1607,6 +1606,13 @@
       {
          handleConnectionFailure(connectionID, me);
       }
+
+      @Override
+      public String toString()
+      {
+         return DelegatingFailureListener.class.getSimpleName() + "('reconnectsOrFailover', hash=" +
+                  super.hashCode() + ")";
+      }
    }
 
    private static final class ActualScheduledPinger implements Runnable

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2012-03-13 12:05:21 UTC (rev 12296)
@@ -1630,6 +1630,12 @@
                               }
                            }
                         }
+
+                        @Override
+                        public String toString()
+                        {
+                           return "FailureListener('restarts cluster connections')";
+                        }
                      });
 
                      if (log.isDebugEnabled())

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2012-03-13 12:05:21 UTC (rev 12296)
@@ -36,7 +36,6 @@
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptor;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.security.HornetQPrincipal;
@@ -102,7 +101,7 @@
 
    private final ClusterManager clusterManager;
 
-   private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+   private final Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
 
    // Static --------------------------------------------------------
 
@@ -464,7 +463,6 @@
 
             conn.connection.destroy();
          }
-
       }
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-13 12:05:21 UTC (rev 12296)
@@ -119,6 +119,7 @@
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.LocalGroupingHandler;
 import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.impl.QuorumManager.BACKUP_ACTIVATION;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.server.management.impl.ManagementServiceImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
@@ -500,7 +501,6 @@
          if (replicationManager!=null) {
             replicationManager.sendLiveIsStopping();
          }
-
          connectorsService.stop();
 
          // we stop the groupingHandler before we stop the cluster manager so binding mappings
@@ -1903,9 +1903,10 @@
 
             if (nodeManager.isBackupLive())
             {
-               // looks like we've failed over at some point need to inform that we are the backup so when the current
-               // live
-               // goes down they failover to us
+               /*
+                * looks like we've failed over at some point; we need to announce that we are the backup,
+                * so that when the current live goes down they fail over to us
+                */
                if (log.isDebugEnabled())
                {
                   log.debug("announcing backup to the former live" + this);
@@ -2087,8 +2088,8 @@
    private final class SharedNothingBackupActivation implements Activation
    {
       private ServerLocatorInternal serverLocator0;
-      private volatile boolean failedConnection;
-      private volatile boolean failOver;
+      private volatile boolean failedToConnect;
+      private QuorumManager quorumManager;
 
       public void run()
       {
@@ -2108,7 +2109,7 @@
 
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
-            final QuorumManager quorumManager = new QuorumManager(serverLocator0, threadPool);
+            quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
             replicationEndpoint.setQuorumManager(quorumManager);
 
             serverLocator0.setReconnectAttempts(-1);
@@ -2123,10 +2124,10 @@
                      final ClientSessionFactory liveServerSessionFactory = serverLocator0.connect();
                      if (liveServerSessionFactory == null)
                      {
-                        // XXX HORNETQ-768
-                        throw new RuntimeException("Need to retry?");
+                        throw new RuntimeException("Could not estabilish the connection");
                      }
                      CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+                     liveConnection.addFailureListener(quorumManager);
                      Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
                      Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
                      connectToReplicationEndpoint(replicationChannel);
@@ -2136,14 +2137,13 @@
                   catch (Exception e)
                   {
                      log.warn("Unable to announce backup for replication. Trying to stop the server.", e);
-                     failedConnection = true;
+                     failedToConnect = true;
+
+                     quorumManager.causeExit();
                      try
                      {
-                        synchronized (quorumManager)
-                        {
-                           quorumManager.notify();
-                        }
-                        HornetQServerImpl.this.stop();
+                        if (!stopped)
+                           HornetQServerImpl.this.stop();
                         return;
                      }
                      catch (Exception e1)
@@ -2160,31 +2160,18 @@
 
             // Server node (i.e. Live node) is not running, now the backup takes over.
             // we must remember to close stuff we don't need any more
-            synchronized (quorumManager)
-            {
-               if (failedConnection)
+            if (failedToConnect)
                   return;
-               while (true)
-               {
-                  quorumManager.wait();
-                  if (failOver || !started || quorumManager.isNodeDown())
-                  {
-                     break;
-                  }
-               }
-            }
+            QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
 
             serverLocator0.close();
             replicationEndpoint.stop();
 
-            if (failedConnection)
+            if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
                return;
+
             if (!isRemoteBackupUpToDate())
             {
-               /*
-                * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps we should
-                * first try to wait a little longer to see if the 'live' comes back?
-                */
                throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
             }
 
@@ -2211,15 +2198,16 @@
 
       public void close(final boolean permanently) throws Exception
       {
+         if (quorumManager != null)
+            quorumManager.causeExit();
+
          if (serverLocator0 != null)
          {
             serverLocator0.close();
-            serverLocator0 = null;
          }
 
          if (configuration.isBackup())
          {
-
             long timeout = 30000;
 
             long start = System.currentTimeMillis();
@@ -2247,7 +2235,7 @@
        */
       public void failOver()
       {
-         failOver = true;
+         quorumManager.failOver();
       }
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-03-13 12:05:21 UTC (rev 12296)
@@ -10,68 +10,47 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
 
 /**
  * Manages a quorum of servers used to determine whether a given server is running or not.
  * <p>
  * The use case scenario is an eventual connection loss between the live and the backup, where the
- * quorum will help a remote backup in deciding whether to replace its 'live' server or to wait for
- * it.
+ * quorum will help a remote backup decide whether to replace its 'live' server or to keep trying
+ * to reconnect.
  */
-public final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements FailureListener
 {
-
-   // volatile boolean started;
-   private final ServerLocator locator;
+   private static final Logger log = Logger.getLogger(QuorumManager.class);
    private String targetServerID = "";
    private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
             new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
 
    private final ExecutorService executor;
+   private final String serverIdentity;
+   private final CountDownLatch latch;
+   private volatile BACKUP_ACTIVATION signal;
 
    /** safety parameter to make _sure_ we get out of await() */
    private static final int LATCH_TIMEOUT = 60;
    private static final long DISCOVERY_TIMEOUT = 5;
 
-   public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
+   public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String identity)
    {
+      this.serverIdentity = identity;
       this.executor = executor;
-      this.locator = serverLocator;
-      locator.addClusterTopologyListener(this);
+      this.latch = new CountDownLatch(1);
+      // locator.addClusterTopologyListener(this);
    }
 
-   @Override
-   public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-            boolean last)
-   {
-      if (targetServerID.equals(nodeID))
-      {
-         return;
-      }
-      nodes.put(nodeID, connectorPair);
-   }
-
-   @Override
-   public void nodeDown(long eventUID, String nodeID)
-   {
-      if (targetServerID.equals(nodeID))
-      {
-         if (!targetServerID.isEmpty())
-            synchronized (this)
-            {
-               notify();
-            }
-      }
-      nodes.remove(nodeID);
-   }
-
    public void setLiveID(String liveID)
    {
       targetServerID = liveID;
@@ -129,6 +108,15 @@
       }
    }
 
+   @Override
+   public String toString()
+   {
+      return QuorumManager.class.getSimpleName() + "(server=" + serverIdentity + ")";
+   }
+
+   /**
+    * Attempts to connect to a given server.
+    */
    private static class ServerConnect implements Runnable
    {
       private final ServerLocatorImpl locator;
@@ -168,4 +156,67 @@
          }
       }
    }
+
+   @Override
+   public void connectionFailed(HornetQException exception, boolean failedOver)
+   {
+      // only a DISCONNECTED failure signals fail-over; any other failure code is rethrown
+      if (exception.getCode() == HornetQException.DISCONNECTED)
+      {
+         log.info("Connection failed: " + exception);
+         signal = BACKUP_ACTIVATION.FAIL_OVER;
+         latch.countDown();
+         return;
+      }
+      throw new RuntimeException(exception);
+
+      // by the time it got here, the connection might have been re-established
+      // check for it...
+      // if connectionIsOk:
+      // replicationEndPoint must see how up-to-date it is
+      // If not:
+      // 1. take a vote
+      // if vote result is weird... retry connecting
+   }
+
+   enum BACKUP_ACTIVATION
+   {
+      FAIL_OVER, STOP;
+   }
+
+   /**
+    * Called by the replicating backup (i.e. "SharedNothing" backup) to wait for the signal to
+    * fail-over or to stop.
+    * @return signal, indicating whether to stop or to fail-over
+    */
+   public final BACKUP_ACTIVATION waitForStatusChange()
+   {
+      try
+      {
+         latch.await();
+      }
+      catch (InterruptedException e)
+      {
+         return BACKUP_ACTIVATION.STOP;
+      }
+      return signal;
+   }
+
+   /**
+    * Cause the Activation thread to exit and the server to be stopped.
+    */
+   public synchronized void causeExit()
+   {
+      signal = BACKUP_ACTIVATION.STOP;
+      latch.countDown();
+   }
+
+   /**
+    * Counts down the latch, releasing the Activation thread so that it fails over.
+    */
+   public synchronized void failOver()
+   {
+      signal = BACKUP_ACTIVATION.FAIL_OVER;
+      latch.countDown();
+   }
 }



More information about the hornetq-commits mailing list