Author: borges
Date: 2011-10-04 09:35:19 -0400 (Tue, 04 Oct 2011)
New Revision: 11465
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-776 Quorum voting for a backup (untested)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-10-04
13:33:11 UTC (rev 11464)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClusterTopologyListener.java 2011-10-04
13:35:19 UTC (rev 11465)
@@ -21,12 +21,10 @@
* A ClusterTopologyListener
*
* @author tim
- *
- *
*/
public interface ClusterTopologyListener
{
void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last);
-
+
void nodeDown(String nodeID);
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-10-04
13:33:11 UTC (rev 11464)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-10-04
13:35:19 UTC (rev 11465)
@@ -2,6 +2,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
+import java.util.Set;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
@@ -30,12 +31,12 @@
private byte[] byteArray;
private SimpleString pageStoreName;
private FileType fileType;
-
public enum FileType
{
JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
private byte code;
+ private static final Set<FileType> ALL_OF = EnumSet.allOf(FileType.class);
private FileType(int code)
{
@@ -48,7 +49,7 @@
*/
public static FileType getFileType(byte readByte)
{
- for (FileType type : EnumSet.allOf(FileType.class))
+ for (FileType type : ALL_OF)
{
if (type.code == readByte)
return type;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-10-04
13:33:11 UTC (rev 11464)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-10-04
13:35:19 UTC (rev 11465)
@@ -539,13 +539,15 @@
nodeManager.startBackup();
initialisePart1();
- clusterManager.start();
- String liveConnectorName = configuration.getLiveConnectorName();
+ final String liveConnectorName = configuration.getLiveConnectorName();
if (liveConnectorName == null)
{
throw new IllegalArgumentException("Cannot have a replicated backup
without configuring its live-server!");
}
+ final QuorumManager quorumManager = new QuorumManager(clusterManager,
nodeManager.getNodeId().toString());
+ clusterManager.start();
+
final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
@@ -559,21 +561,21 @@
{
try
{
- final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
+ final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
- if (liveServerSessionFactory == null)
- {
+ if (liveServerSessionFactory == null)
+ {
// XXX HORNETQ-768
- throw new RuntimeException("Need to retry...");
- }
- CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
- Channel pingChannel =
liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
- Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ throw new RuntimeException("Need to retry?");
+ }
+ CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
+ Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id,
-1);
+ Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- connectToReplicationEndpoint(replicationChannel);
- replicationEndpoint.start();
+ connectToReplicationEndpoint(replicationChannel);
+ replicationEndpoint.start();
- clusterManager.announceReplicatingBackup(pingChannel);
+ clusterManager.announceReplicatingBackup(pingChannel);
}
catch (Exception e)
{
@@ -589,7 +591,14 @@
// Server node (i.e. Live node) is not running, now the backup takes over.
//we must remember to close stuff we don't need any more
- nodeManager.awaitLiveNode();
+ while (true)
+ {
+ nodeManager.awaitLiveNode();
+ if (quorumManager.isNodeDown())
+ {
+ break;
+ }
+ }
serverLocator.close();
replicationEndpoint.stop();
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-10-04
13:35:19 UTC (rev 11465)
@@ -0,0 +1,170 @@
+package org.hornetq.core.server.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.server.cluster.ClusterManager;
+
+/**
+ * Manages a quorum of servers used to determine whether a given server is running or not.
+ * <p>
+ * The use case scenario is a possible connection loss between the live and the backup, where the
+ * quorum will help a remote backup in deciding whether to replace its 'live' server or to wait for
+ * it.
+ * <p>
+ * The manager registers itself as a {@link ClusterTopologyListener} on construction. It records
+ * every node other than the target in {@link #nodes}, and remembers separately whether the
+ * topology has announced the target itself as down.
+ */
+final class QuorumManager implements ClusterTopologyListener
+{
+
+   /**
+    * Value passed to {@code setDiscoveryInitialWaitTimeout} on each voting locator.
+    * NOTE(review): the unit is not visible from this class (presumably milliseconds) -- confirm.
+    */
+   private static final long DISCOVERY_TIMEOUT = 3;
+
+   private final ClusterManager clusterManager;
+
+   /** Node ID of the server whose liveness this quorum decides upon. */
+   private final String targetServerName;
+
+   /** Every node other than the target that the topology currently reports as up, keyed by node ID. */
+   private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
+      new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
+
+   /**
+    * {@code true} once the topology has announced the target as down. The target is never stored
+    * in {@link #nodes}, so its state has to be tracked here; testing {@code nodes} for the target
+    * would always report it as absent.
+    */
+   private volatile boolean targetAnnouncedDown;
+
+   /**
+    * Creates the manager and registers it as a topology listener on {@code clusterManager}.
+    *
+    * @param clusterManager source of the topology notifications
+    * @param nodeID node ID of the server to be watched
+    */
+   public QuorumManager(ClusterManager clusterManager, String nodeID)
+   {
+      this.clusterManager = clusterManager;
+      this.targetServerName = nodeID;
+
+      clusterManager.addClusterTopologyListener(this, true);
+   }
+
+   /**
+    * Records a node as available for voting. A notification about the target itself only clears
+    * the "announced down" flag; the target is never a voter.
+    */
+   @Override
+   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+   {
+      if (targetServerName.equals(nodeID))
+      {
+         targetAnnouncedDown = false;
+         return;
+      }
+      nodes.put(nodeID, connectorPair);
+   }
+
+   /**
+    * Forgets a voter that went away. If the node going down is the target, the fact is remembered
+    * for {@link #isNodeDown()} and this listener is unregistered.
+    */
+   @Override
+   public void nodeDown(String nodeID)
+   {
+      if (targetServerName.equals(nodeID))
+      {
+         targetAnnouncedDown = true;
+         clusterManager.removeClusterTopologyListener(this, true);
+      }
+      else
+      {
+         nodes.remove(nodeID);
+      }
+   }
+
+   /**
+    * Decides whether the target server should be considered down.
+    * <p>
+    * Returns {@code true} immediately when the topology announced the target as down, or when no
+    * other server is known. Otherwise a connection is attempted to every other known server in
+    * parallel, and the target is considered down when at least half of those servers could be
+    * reached.
+    * <p>
+    * If the calling thread is interrupted while waiting for the attempts, the interrupt status is
+    * restored and the decision is taken on the connections established so far.
+    * NOTE(review): the wait itself has no timeout; it relies on every connection attempt
+    * eventually returning or failing.
+    *
+    * @return {@code true} if the target is to be treated as down
+    */
+   public boolean isNodeDown()
+   {
+      if (targetAnnouncedDown || nodes.isEmpty())
+      {
+         return true;
+      }
+
+      // go for the vote...
+      final Set<ServerLocatorImpl> locators = new HashSet<ServerLocatorImpl>();
+      ExecutorService pool = null;
+      try
+      {
+         // Collect the voters first, so that pool, latch and majority are all sized from the same
+         // set even if the topology changes while this method runs.
+         for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : nodes.entrySet())
+         {
+            if (targetServerName.equals(entry.getKey()))
+            {
+               continue;
+            }
+            // 'a' is the first element of the connector pair.
+            TransportConfiguration serverTC = entry.getValue().a;
+            locators.add((ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC));
+         }
+
+         final int voters = locators.size();
+         if (voters == 0)
+         {
+            // All voters disappeared after the initial check: same outcome as "no other servers".
+            return true;
+         }
+
+         final AtomicInteger pingCount = new AtomicInteger(0);
+         final CountDownLatch latch = new CountDownLatch(voters);
+         pool = Executors.newFixedThreadPool(voters);
+         for (ServerLocatorImpl locator : locators)
+         {
+            pool.submit(new ServerConnect(latch, pingCount, locator));
+         }
+
+         try
+         {
+            latch.await();
+         }
+         catch (InterruptedException interruption)
+         {
+            // The best the quorum can do now is to decide on the latest count it has, but the
+            // interruption must stay visible to the caller.
+            Thread.currentThread().interrupt();
+         }
+         return pingCount.get() * 2 >= voters;
+      }
+      finally
+      {
+         for (ServerLocatorImpl locator : locators)
+         {
+            try
+            {
+               locator.close();
+            }
+            catch (Exception ignored)
+            {
+               // best-effort clean-up; a failure to close must not alter the vote result
+            }
+         }
+         if (pool != null)
+         {
+            pool.shutdownNow();
+         }
+      }
+   }
+
+   /**
+    * Attempts a single connection through the given locator. The shared counter is incremented
+    * when the connection yields a session factory, and the latch is always counted down, whatever
+    * the outcome.
+    */
+   private static class ServerConnect implements Runnable
+   {
+      private final ServerLocatorImpl locator;
+      private final CountDownLatch latch;
+      private final AtomicInteger count;
+
+      public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+      {
+         locator = serverLocator;
+         this.latch = latch;
+         this.count = count;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            // The set-up calls live inside the try block: should any of them throw, the latch
+            // must still be released or isNodeDown() would wait forever.
+            locator.setReconnectAttempts(-1);
+            // NOTE(review): confirm that a locator created from a single TransportConfiguration
+            // has a discovery group configuration at all.
+            locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
+
+            final ClientSessionFactory liveServerSessionFactory = locator.connect();
+            if (liveServerSessionFactory != null)
+            {
+               count.incrementAndGet();
+            }
+         }
+         catch (Exception ignored)
+         {
+            // a server that cannot be reached simply does not add to the count
+         }
+         finally
+         {
+            latch.countDown();
+         }
+      }
+
+   }
+}