[hornetq-commits] JBoss hornetq SVN: r12156 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 21 10:42:50 EST 2012


Author: borges
Date: 2012-02-21 10:42:48 -0500 (Tue, 21 Feb 2012)
New Revision: 12156

Added:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720 Backup should not worry about split brain if live had a clean exit.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -97,6 +97,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
@@ -563,6 +564,11 @@
             packet = new ReplicationSyncFileMessage();
             break;
          }
+         case PacketImpl.REPLICATION_SCHEDULED_FAILOVER:
+         {
+            packet = new LiveIsStoppingMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -204,6 +204,7 @@
    public static final byte BACKUP_REGISTRATION_FAILED = 116;
 
    public static final byte REPLICATION_START_FINISH_SYNC = 120;
+   public static final byte REPLICATION_SCHEDULED_FAILOVER = 121;
 
    // Static --------------------------------------------------------
 

Added: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java	                        (rev 0)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -0,0 +1,21 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message indicating that the live is stopping.
+ * <p>
+ * The backup starts the fail-over immediately after receiving this.
+ */
+public final class LiveIsStoppingMessage extends PacketImpl
+{
+
+   public LiveIsStoppingMessage()
+   {
+      super(PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+   }
+
+}

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -119,9 +119,9 @@
    private boolean started;
 
    private QuorumManager quorumManager;
-   
+
    //https://community.jboss.org/thread/195519
-   private Object stopLock = new Object();
+   private final Object stopLock = new Object();
 
    // Constructors --------------------------------------------------
    public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
@@ -165,7 +165,7 @@
             {
                return;
             }
-            
+
             if (type == PacketImpl.REPLICATION_APPEND)
             {
                handleAppendAddRecord((ReplicationAddMessage) packet);
@@ -223,10 +223,13 @@
             {
                handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
             }
-            else
-            {
-               log.warn("Packet " + packet
-                     + " can't be processed by the ReplicationEndpoint");
+         else if (type == PacketImpl.REPLICATION_SCHEDULED_FAILOVER)
+         {
+            handleLiveStopping();
+         }
+         else
+         {
+            log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
             }
          }
       }
@@ -246,6 +249,14 @@
       channel.send(response);
    }
 
+   /**
+    * @throws HornetQException
+    */
+   private void handleLiveStopping() throws HornetQException
+   {
+      server.remoteFailOver();
+   }
+
    public boolean isStarted()
    {
       return started;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -135,4 +135,12 @@
     * @param largeMessageIDs
     */
    void sendLargeMessageIdListMessage(List<Long> largeMessageIDs);
+
+   /**
+    * Notifies the backup that the live server is stopping.
+    * <p>
+    * This notification allows the backup to skip quorum voting (or any other measure to avoid
+    * 'split-brain') and do a faster fail-over.
+    */
+   void sendLiveIsStopping();
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -41,6 +41,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
@@ -314,7 +315,7 @@
 
       synchronized (replicationLock)
       {
-      enabled = false;
+         enabled = false;
 
       // Complete any pending operations...
       // Case the backup crashed, this should clean up any pending requests
@@ -601,4 +602,13 @@
          sendReplicatePacket(new ReplicationStartSyncMessage(largeMessageIDs));
 
    }
+
+   /**
+    * Notifies the backup that the live is about to stop.
+    */
+   public void sendLiveIsStopping()
+   {
+      if (enabled)
+      sendReplicatePacket(new LiveIsStoppingMessage());
+   }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -260,7 +260,7 @@
    private Thread backupActivationThread;
 
    private Activation activation;
-   
+
    private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
 
    // Constructors
@@ -487,7 +487,7 @@
    {
       stop(failoverOnServerShutdown, false);
    }
-   
+
    private void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
    {
       synchronized (this)
@@ -497,6 +497,10 @@
             return;
          }
 
+         if (replicationManager!=null) {
+            replicationManager.sendLiveIsStopping();
+         }
+
          connectorsService.stop();
 
          // we stop the groupingHandler before we stop the cluster manager so binding mappings
@@ -539,7 +543,7 @@
             log.warn(e.getMessage(), e);
          }
       }
-      
+
       storageManager.clearContext();
 
       synchronized (this)
@@ -651,9 +655,9 @@
          {
             // Ignore
          }
-         
+
          securityStore.stop();
- 
+
          threadPool = null;
 
          scheduledPool = null;
@@ -679,7 +683,7 @@
                   initialised = new CountDownLatch(1);
             }
          }
-         
+
          // to display in the log message
          SimpleString tempNodeID = getNodeID();
 
@@ -804,7 +808,7 @@
    {
       return started;
    }
-   
+
    public boolean isStopped()
    {
       return stopped;
@@ -1048,15 +1052,15 @@
       {
          storageManager.deleteQueueBinding(queue.getID());
       }
-      
 
+
       if (queue.getPageSubscription() != null)
       {
          queue.getPageSubscription().close();
       }
-      
+
       PageSubscription subs = queue.getPageSubscription();
-      
+
       if (subs != null)
       {
          subs.cleanupEntries(true);
@@ -1241,8 +1245,8 @@
                                    addressSettingsRepository);
    }
 
-   /** 
-    * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance) 
+   /**
+    * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
     */
    private StorageManager createStorageManager()
    {
@@ -1742,8 +1746,8 @@
          pageSubscription.close();
          throw e;
       }
-      
 
+
       managementService.registerAddress(address);
       managementService.registerQueue(queue, address, storageManager);
 
@@ -2051,19 +2055,19 @@
          }
       }
    }
-   
+
    private final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
    {
       boolean failedAlready = false;
-      
+
       public synchronized void onIOException(int code, String message, SequentialFile file)
       {
          if (!failedAlready)
          {
             failedAlready = true;
-            
+
             log.warn("Critical IO Error, shutting down the server. code=" + code + ", message=" + message);
-            
+
             new Thread()
             {
                @Override
@@ -2092,6 +2096,7 @@
    {
       private ServerLocatorInternal serverLocator0;
       private volatile boolean failedConnection;
+      private volatile boolean failOver;
 
       public void run()
       {
@@ -2161,7 +2166,7 @@
                      "] started, waiting live to fail before it gets active");
             started = true;
 
-            // Server node (i.e. Life node) is not running, now the backup takes over.
+            // Server node (i.e. Live node) is not running, now the backup takes over.
             // we must remember to close stuff we don't need any more
             synchronized (quorumManager)
             {
@@ -2170,11 +2175,10 @@
                while (true)
                {
                   quorumManager.wait();
-                  break;
-//               if (!started || quorumManager.isNodeDown())
-//               {
-//                  break;
-//               }
+                  if (failOver || !started || quorumManager.isNodeDown())
+                  {
+                     break;
+                  }
                }
             }
 
@@ -2245,6 +2249,14 @@
             nodeManager.stopBackup();
          }
       }
+
+      /**
+       * Live has notified this server that it is going to stop.
+       */
+      public void failOver()
+      {
+         failOver = true;
+      }
    }
 
 
@@ -2285,7 +2297,7 @@
          }
       }
    }
-   
+
    /** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
     *  utility class, as it would be a door to load anything you like in a safe VM.
     *  For that reason any class trying to do a privileged block should do with the AccessController directly.
@@ -2359,10 +2371,8 @@
             {
                throw (HornetQException)e;
             }
-            else
-            {
-               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
-            }
+
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
          }
       }
    }
@@ -2408,4 +2418,21 @@
       }
    }
 
+   /**
+    * @throws HornetQException
+    */
+   public void remoteFailOver() throws HornetQException
+   {
+      if (!configuration.isBackup() || configuration.isSharedStore())
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR);
+      }
+      if (!backupUpToDate) return;
+      if (activation instanceof SharedNothingBackupActivation)
+      {
+         ((SharedNothingBackupActivation)activation).failOver();
+      }
+
+   }
+
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -8,6 +8,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Pair;
@@ -28,15 +29,16 @@
 public final class QuorumManager implements ClusterTopologyListener
 {
 
-   // private static final Logger LOG = Logger.getLogger(QuorumManager.class);
-
    // volatile boolean started;
    private final ServerLocator locator;
    private String targetServerID = "";
    private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
             new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
-   private static final long DISCOVERY_TIMEOUT = 3;
 
+   /** safety parameter to make _sure_ we get out of await() */
+   private static final int LATCH_TIMEOUT = 60;
+   private static final long DISCOVERY_TIMEOUT = 5;
+
    public QuorumManager(ServerLocator serverLocator)
    {
       this.locator = serverLocator;
@@ -75,12 +77,11 @@
 
    public boolean isNodeDown()
    {
-      boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
-      boolean noOtherServersAround = nodes.size() == 0;
-      if (liveShutdownCleanly || noOtherServersAround)
+      if (nodes.size() == 0)
+      {
          return true;
+      }
       // go for the vote...
-      // Set<ServerLocator> currentNodes = new HashSet(nodes.entrySet());
       final int size = nodes.size();
       Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
       AtomicInteger pingCount = new AtomicInteger(0);
@@ -104,7 +105,7 @@
          }
          try
          {
-            latch.await();
+            latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
          }
          catch (InterruptedException interruption)
          {
@@ -163,8 +164,8 @@
          finally
          {
             latch.countDown();
+            locator.close();
          }
       }
-
    }
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -251,6 +251,4 @@
    {
       return TransportConfigurationUtils.getInVMConnector(live);
    }
-
-
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-02-21 15:42:48 UTC (rev 12156)
@@ -2,6 +2,7 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
@@ -12,27 +13,37 @@
 
    public void testQuorumVoting() throws Exception
    {
+
       setupCluster();
+
       startServers(0, 1, 2, 3, 4, 5);
+      for (int i : new int[] { 0, 1, 2 })
+      {
+         setupSessionFactory(i, i + 3, isNetty(), false);
+      }
 
+      createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+
       final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
-      fail("must rewrite to use new interfaces");
-      // servers[0].getClusterManager().addClusterTopologyListener(liveTopologyListener, true);
 
-      final TopologyListener backupTopologyListener = new TopologyListener("BACKUP-3");
-      // servers[3].getClusterManager().addClusterTopologyListener(backupTopologyListener, true);
+      locators[0].addClusterTopologyListener(liveTopologyListener);
 
+      final TopologyListener backupTopologyListener = new TopologyListener("LIVE-2");
+      locators[1].addClusterTopologyListener(backupTopologyListener);
+
       assertTrue("we assume 3 is a backup", servers[3].getConfiguration().isBackup());
       assertFalse("no shared storage", servers[3].getConfiguration().isSharedStore());
 
-      setupSessionFactory(0, 3, isNetty(), false);
-      setupSessionFactory(1, 4, isNetty(), false);
-      setupSessionFactory(2, 5, isNetty(), false);
+      // assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
+      // assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
 
-      assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
-      assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
+      failNode(0);
+      waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
 
-      failNode(0);
+      assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
+      assertFalse("3 should have failed over ", servers[3].getConfiguration().isBackup());
+      servers[1].stop();
+      assertFalse("4 should have failed over ", servers[4].getConfiguration().isBackup());
    }
 
    @Override
@@ -41,7 +52,6 @@
       return false;
    }
 
-
    private static class TopologyListener implements ClusterTopologyListener
    {
       final String prefix;
@@ -53,12 +63,11 @@
       }
 
       @Override
-      public
- void nodeUP(long eventUID, String nodeID,
-               Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+      public void nodeUP(long eventUID, String nodeID,
+                         Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
          nodes.put(nodeID, connectorPair);
-         System.out.println(prefix + " UP: " + nodeID);
+         System.out.println(prefix + " UP: " + nodeID + " connectPair=" + connectorPair);
       }
 
       @Override



More information about the hornetq-commits mailing list