[hornetq-commits] JBoss hornetq SVN: r11465 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq: core/protocol/core/impl/wireformat and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 4 09:35:20 EDT 2011


Author: borges
Date: 2011-10-04 09:35:19 -0400 (Tue, 04 Oct 2011)
New Revision: 11465

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-776 Quorum voting for a backup (untested)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java	2011-10-04 13:33:11 UTC (rev 11464)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java	2011-10-04 13:35:19 UTC (rev 11465)
@@ -21,12 +21,10 @@
  * A ClusterTopologyListener
  *
  * @author tim
- *
- *
  */
 public interface ClusterTopologyListener
 {
    void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
-   
+
    void nodeDown(String nodeID);
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	2011-10-04 13:33:11 UTC (rev 11464)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	2011-10-04 13:35:19 UTC (rev 11465)
@@ -2,6 +2,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
+import java.util.Set;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
@@ -30,12 +31,12 @@
    private byte[] byteArray;
    private SimpleString pageStoreName;
    private FileType fileType;
-
    public enum FileType
    {
       JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
 
       private byte code;
+      private static final Set<FileType> ALL_OF = EnumSet.allOf(FileType.class);
 
       private FileType(int code)
       {
@@ -48,7 +49,7 @@
        */
       public static FileType getFileType(byte readByte)
       {
-         for (FileType type : EnumSet.allOf(FileType.class))
+         for (FileType type : ALL_OF)
          {
             if (type.code == readByte)
                return type;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-04 13:33:11 UTC (rev 11464)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-04 13:35:19 UTC (rev 11465)
@@ -539,13 +539,15 @@
             nodeManager.startBackup();
 
             initialisePart1();
-            clusterManager.start();
 
-            String liveConnectorName = configuration.getLiveConnectorName();
+            final String liveConnectorName = configuration.getLiveConnectorName();
             if (liveConnectorName == null)
             {
                throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
             }
+            final QuorumManager quorumManager = new QuorumManager(clusterManager, nodeManager.getNodeId().toString());
+            clusterManager.start();
+
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator =
                      (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
@@ -559,21 +561,21 @@
                 {
                     try
                     {
-                        final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
+                     final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
 
-                        if (liveServerSessionFactory == null)
-                        {
+                     if (liveServerSessionFactory == null)
+                     {
                         // XXX HORNETQ-768
-                           throw new RuntimeException("Need to retry...");
-                        }
-                        CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
-                        Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
-                        Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+                        throw new RuntimeException("Need to retry?");
+                     }
+                     CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+                     Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+                     Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
 
-                        connectToReplicationEndpoint(replicationChannel);
-                        replicationEndpoint.start();
+                     connectToReplicationEndpoint(replicationChannel);
+                     replicationEndpoint.start();
 
-                        clusterManager.announceReplicatingBackup(pingChannel);
+                     clusterManager.announceReplicatingBackup(pingChannel);
                     }
                     catch (Exception e)
                     {
@@ -589,7 +591,14 @@
 
             // Server node (i.e. Life node) is not running, now the backup takes over.
             //we must remember to close stuff we don't need any more
-            nodeManager.awaitLiveNode();
+            while (true)
+            {
+               nodeManager.awaitLiveNode();
+               if (quorumManager.isNodeDown())
+               {
+                  break;
+               }
+            }
 
             serverLocator.close();
             replicationEndpoint.stop();

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2011-10-04 13:35:19 UTC (rev 11465)
@@ -0,0 +1,170 @@
+package org.hornetq.core.server.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.server.cluster.ClusterManager;
+
+/**
+ * Manages a quorum of servers used to determine whether a given server is running or not.
+ * <p>
+ * The intended use case is a possible connection loss between the live and the backup, where the
+ * quorum helps a remote backup decide whether to replace its 'live' server or to wait for it.
+ * NOTE(review): the commit that introduced this class describes the voting as untested.
+ */
+final class QuorumManager implements ClusterTopologyListener
+{
+
+   // private static final Logger LOG = Logger.getLogger(QuorumManager.class);
+
+   // volatile boolean started;
+   private final ClusterManager clusterManager;
+   private final String targetServerName; // node ID given to the constructor; compared against topology events
+   private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
+            new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
+   private static final long DISCOVERY_TIMEOUT = 3; // discovery initial wait timeout; unit not visible here - TODO confirm
+
+   public QuorumManager(ClusterManager clusterManager, String nodeID)
+   {
+      this.clusterManager = clusterManager;
+      this.targetServerName = nodeID;
+
+      clusterManager.addClusterTopologyListener(this, true);
+   }
+
+   @Override
+   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+   {
+      if (targetServerName.equals(nodeID))
+      {
+         return; // the target itself is never stored in 'nodes'
+      }
+      nodes.put(nodeID, connectorPair);
+   }
+
+   @Override
+   public void nodeDown(String nodeID)
+   {
+      if (targetServerName.equals(nodeID))
+      {
+         // targetReturned = false;
+         // TODO: trigger action
+
+         // TODO: decide to wake backup; for now this listener is only deregistered
+         clusterManager.removeClusterTopologyListener(this, true);
+      }
+      else
+      {
+         nodes.remove(nodeID);
+      }
+   }
+
+   public boolean isNodeDown()
+   {
+      boolean liveShutdownCleanly = !nodes.containsKey(targetServerName);
+      boolean noOtherServersAround = nodes.size() == 0;
+      if (liveShutdownCleanly || noOtherServersAround)
+         return true;
+      // go for the vote: ask every other known node, in parallel, whether it accepts a connection
+      // NOTE(review): nodeUP() never stores the target in 'nodes', so liveShutdownCleanly looks always true
+      final int size = nodes.size();
+      Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
+      AtomicInteger pingCount = new AtomicInteger(0);
+      ExecutorService pool = Executors.newFixedThreadPool(size);
+      final CountDownLatch latch = new CountDownLatch(size);
+      try
+      {
+         for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
+         {
+            if (targetServerName.equals(pair.getKey()))
+               continue;
+            TransportConfiguration serverTC = pair.getValue().a;
+            ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
+            locatorsList.add(locator);
+            pool.submit(new ServerConnect(latch, pingCount, locator));
+         }
+         // Nodes skipped or removed since 'size' was read get no task: count the latch down for them
+         for (int i = 0; i < size - locatorsList.size(); i++)
+         {
+            latch.countDown();
+         }
+         try
+         {
+            latch.await();
+         }
+         catch (InterruptedException interruption)
+         {
+            // No-op: use whatever count was gathered so far (the interrupt flag is not restored)
+         }
+         return pingCount.get() * 2 >= locatorsList.size(); // at least half of the contacted nodes answered
+      }
+      finally
+      {
+         for (ServerLocator locator: locatorsList){
+            try
+            {
+               locator.close();
+            }
+            catch (Exception e)
+            {
+               // no-op: a failure to close a locator is ignored
+            }
+         }
+         pool.shutdownNow();
+      }
+   }
+
+   private static class ServerConnect implements Runnable
+   {
+      private final ServerLocatorImpl locator;
+      private final CountDownLatch latch;
+      private final AtomicInteger count;
+
+      public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+      {
+         locator = serverLocator;
+         this.latch = latch;
+         this.count = count;
+      }
+
+      @Override
+      public void run()
+      {
+         locator.setReconnectAttempts(-1);
+         locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
+         // NOTE(review): an exception from the two setters above would skip latch.countDown() below
+         final ClientSessionFactory liveServerSessionFactory;
+         try
+         {
+            liveServerSessionFactory = locator.connect();
+            if (liveServerSessionFactory != null)
+            {
+               count.incrementAndGet();
+            }
+         }
+         catch (Exception e)
+         {
+            // no-op: a failed connection simply does not count as a vote
+         }
+         finally
+         {
+            latch.countDown();
+         }
+      }
+
+   }
+}



More information about the hornetq-commits mailing list