Author: borges
Date: 2012-02-21 10:42:48 -0500 (Tue, 21 Feb 2012)
New Revision: 12156
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720: The backup should not worry about split-brain if the live server exited cleanly.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -97,6 +97,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
@@ -563,6 +564,11 @@
packet = new ReplicationSyncFileMessage();
break;
}
+ case PacketImpl.REPLICATION_SCHEDULED_FAILOVER:
+ {
+ packet = new LiveIsStoppingMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -204,6 +204,7 @@
public static final byte BACKUP_REGISTRATION_FAILED = 116;
public static final byte REPLICATION_START_FINISH_SYNC = 120;
+ public static final byte REPLICATION_SCHEDULED_FAILOVER = 121;
// Static --------------------------------------------------------
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -0,0 +1,21 @@
+/**
+ * Wire-format packet added for HORNETQ-720: lets a replicating live server tell its backup that it
+ * is stopping.
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message indicating that the live is stopping.
+ * <p>
+ * Its packet type is {@link PacketImpl#REPLICATION_SCHEDULED_FAILOVER}. On the backup,
+ * {@code ReplicationEndpoint} handles it by calling {@code HornetQServerImpl.remoteFailOver()}.
+ * That call marks the shared-nothing backup activation for fail-over, so the activation stops
+ * waiting without running the quorum vote the next time it wakes up. It does not itself wake the
+ * backup, and it is ignored while the backup is not yet up to date.
+ * <p>
+ * The class declares no fields of its own; only its packet type is significant.
+ */
+public final class LiveIsStoppingMessage extends PacketImpl
+{
+
+ public LiveIsStoppingMessage()
+ {
+ super(PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ }
+
+}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -119,9 +119,9 @@
private boolean started;
private QuorumManager quorumManager;
-
+
//https://community.jboss.org/thread/195519
- private Object stopLock = new Object();
+ private final Object stopLock = new Object();
// Constructors --------------------------------------------------
public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
@@ -165,7 +165,7 @@
{
return;
}
-
+
if (type == PacketImpl.REPLICATION_APPEND)
{
handleAppendAddRecord((ReplicationAddMessage) packet);
@@ -223,10 +223,13 @@
{
handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
}
- else
- {
- log.warn("Packet " + packet
- + " can't be processed by the ReplicationEndpoint");
+ else if (type == PacketImpl.REPLICATION_SCHEDULED_FAILOVER)
+ {
+ handleLiveStopping();
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
}
}
}
@@ -246,6 +249,14 @@
channel.send(response);
}
+ /**
+ * Handles a {@code REPLICATION_SCHEDULED_FAILOVER} packet sent by the live server. It asks the
+ * backup server to fail over without a quorum vote by delegating to
+ * {@code HornetQServerImpl.remoteFailOver()}.
+ * @throws HornetQException propagated from {@code remoteFailOver()}, which throws it when this
+ *            server is not configured as a backup or uses a shared store
+ */
+ private void handleLiveStopping() throws HornetQException
+ {
+ server.remoteFailOver();
+ }
+
public boolean isStarted()
{
return started;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -135,4 +135,12 @@
* @param largeMessageIDs
*/
void sendLargeMessageIdListMessage(List<Long> largeMessageIDs);
+
+ /**
+ * Notifies the backup that the live server is stopping.
+ * <p>
+ * This notification allows the backup to skip quorum voting (or any other measure to avoid
+ * 'split-brain') and do a faster fail-over.
+ */
+ void sendLiveIsStopping();
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -41,6 +41,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
@@ -314,7 +315,7 @@
synchronized (replicationLock)
{
- enabled = false;
+ enabled = false;
// Complete any pending operations...
// Case the backup crashed, this should clean up any pending requests
@@ -601,4 +602,13 @@
sendReplicatePacket(new ReplicationStartSyncMessage(largeMessageIDs));
}
+
+ /**
+ * Notifies the backup that the live is about to stop.
+ * <p>
+ * Sends a {@link LiveIsStoppingMessage} to the backup, and does nothing when replication is not
+ * enabled.
+ * <p>
+ * NOTE(review): {@code enabled} is read here without holding {@code replicationLock}, which guards
+ * other writes to it. Confirm the field is volatile, or that a stale read is harmless here.
+ */
+ public void sendLiveIsStopping()
+ {
+ if (enabled)
+ sendReplicatePacket(new LiveIsStoppingMessage());
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -260,7 +260,7 @@
private Thread backupActivationThread;
private Activation activation;
-
+
private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new
ShutdownOnCriticalErrorListener();
// Constructors
@@ -487,7 +487,7 @@
{
stop(failoverOnServerShutdown, false);
}
-
+
private void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws
Exception
{
synchronized (this)
@@ -497,6 +497,10 @@
return;
}
+ if (replicationManager!=null) {
+ replicationManager.sendLiveIsStopping();
+ }
+
connectorsService.stop();
// we stop the groupingHandler before we stop the cluster manager so binding
mappings
@@ -539,7 +543,7 @@
log.warn(e.getMessage(), e);
}
}
-
+
storageManager.clearContext();
synchronized (this)
@@ -651,9 +655,9 @@
{
// Ignore
}
-
+
securityStore.stop();
-
+
threadPool = null;
scheduledPool = null;
@@ -679,7 +683,7 @@
initialised = new CountDownLatch(1);
}
}
-
+
// to display in the log message
SimpleString tempNodeID = getNodeID();
@@ -804,7 +808,7 @@
{
return started;
}
-
+
public boolean isStopped()
{
return stopped;
@@ -1048,15 +1052,15 @@
{
storageManager.deleteQueueBinding(queue.getID());
}
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
}
-
+
PageSubscription subs = queue.getPageSubscription();
-
+
if (subs != null)
{
subs.cleanupEntries(true);
@@ -1241,8 +1245,8 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage
manager (on tests for instance)
+ /**
+ * This method is protected as it may be used as a hook for creating a custom storage
manager (on tests for instance)
*/
private StorageManager createStorageManager()
{
@@ -1742,8 +1746,8 @@
pageSubscription.close();
throw e;
}
-
+
managementService.registerAddress(address);
managementService.registerQueue(queue, address, storageManager);
@@ -2051,19 +2055,19 @@
}
}
}
-
+
private final class ShutdownOnCriticalErrorListener implements
IOCriticalErrorListener
{
boolean failedAlready = false;
-
+
public synchronized void onIOException(int code, String message, SequentialFile
file)
{
if (!failedAlready)
{
failedAlready = true;
-
+
log.warn("Critical IO Error, shutting down the server. code=" +
code + ", message=" + message);
-
+
new Thread()
{
@Override
@@ -2092,6 +2096,7 @@
{
private ServerLocatorInternal serverLocator0;
private volatile boolean failedConnection;
+ private volatile boolean failOver;
public void run()
{
@@ -2161,7 +2166,7 @@
"] started, waiting live to fail before it gets active");
started = true;
- // Server node (i.e. Life node) is not running, now the backup takes over.
+ // Server node (i.e. Live node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
synchronized (quorumManager)
{
@@ -2170,11 +2175,10 @@
while (true)
{
quorumManager.wait();
- break;
-// if (!started || quorumManager.isNodeDown())
-// {
-// break;
-// }
+ if (failOver || !started || quorumManager.isNodeDown())
+ {
+ break;
+ }
}
}
@@ -2245,6 +2249,14 @@
nodeManager.stopBackup();
}
}
+
+ /**
+ * Live has notified this server that it is going to stop.
+ * <p>
+ * This only sets the volatile {@code failOver} flag. The activation thread waits on
+ * {@code quorumManager} and checks the flag each time {@code wait()} returns. If the flag is set,
+ * the thread leaves the wait loop without calling {@code quorumManager.isNodeDown()}, so no quorum
+ * vote is taken.
+ * <p>
+ * This method does not notify that monitor itself, so the backup still waits for its next
+ * wake-up before proceeding.
+ */
+ public void failOver()
+ {
+ failOver = true;
+ }
}
@@ -2285,7 +2297,7 @@
}
}
}
-
+
/** This seems duplicate code all over the place, but for security reasons we
can't let something like this to be open in a
* utility class, as it would be a door to load anything you like in a safe VM.
* For that reason any class trying to do a privileged block should do with the
AccessController directly.
@@ -2359,10 +2371,8 @@
{
throw (HornetQException)e;
}
- else
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error
trying to start replication", e);
- }
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error
trying to start replication", e);
}
}
}
@@ -2408,4 +2418,21 @@
}
}
+ /**
+ * Called on a replicating backup when its live server announces that it is stopping.
+ * <p>
+ * If the backup is not yet up to date, this returns without doing anything. Otherwise, if the
+ * current activation is a {@code SharedNothingBackupActivation}, it is marked for fail-over (see
+ * its {@code failOver()} method).
+ * @throws HornetQException with code {@code INTERNAL_ERROR} (and no message) if this server is
+ *            not configured as a backup or uses a shared store
+ */
+ public void remoteFailOver() throws HornetQException
+ {
+ if (!configuration.isBackup() || configuration.isSharedStore())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR);
+ }
+ if (!backupUpToDate) return;
+ if (activation instanceof SharedNothingBackupActivation)
+ {
+ ((SharedNothingBackupActivation)activation).failOver();
+ }
+
+ }
+
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -8,6 +8,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
@@ -28,15 +29,16 @@
public final class QuorumManager implements ClusterTopologyListener
{
- // private static final Logger LOG = Logger.getLogger(QuorumManager.class);
-
// volatile boolean started;
private final ServerLocator locator;
private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration,
TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration,
TransportConfiguration>>();
- private static final long DISCOVERY_TIMEOUT = 3;
+ /** safety parameter to make _sure_ we get out of await() */
+ private static final int LATCH_TIMEOUT = 60;
+ private static final long DISCOVERY_TIMEOUT = 5;
+
public QuorumManager(ServerLocator serverLocator)
{
this.locator = serverLocator;
@@ -75,12 +77,11 @@
public boolean isNodeDown()
{
- boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
- boolean noOtherServersAround = nodes.size() == 0;
- if (liveShutdownCleanly || noOtherServersAround)
+ if (nodes.size() == 0)
+ {
return true;
+ }
// go for the vote...
- // Set<ServerLocator> currentNodes = new HashSet(nodes.entrySet());
final int size = nodes.size();
Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
AtomicInteger pingCount = new AtomicInteger(0);
@@ -104,7 +105,7 @@
}
try
{
- latch.await();
+ latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException interruption)
{
@@ -163,8 +164,8 @@
finally
{
latch.countDown();
+ locator.close();
}
}
-
}
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -251,6 +251,4 @@
{
return TransportConfigurationUtils.getInVMConnector(live);
}
-
-
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-02-21
14:06:10 UTC (rev 12155)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-02-21
15:42:48 UTC (rev 12156)
@@ -2,6 +2,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
@@ -12,27 +13,37 @@
public void testQuorumVoting() throws Exception
{
+
setupCluster();
+
startServers(0, 1, 2, 3, 4, 5);
+ for (int i : new int[] { 0, 1, 2 })
+ {
+ setupSessionFactory(i, i + 3, isNetty(), false);
+ }
+ createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+
final TopologyListener liveTopologyListener = new
TopologyListener("LIVE-1");
- fail("must rewrite to use new interfaces");
- // servers[0].getClusterManager().addClusterTopologyListener(liveTopologyListener,
true);
- final TopologyListener backupTopologyListener = new
TopologyListener("BACKUP-3");
- //
servers[3].getClusterManager().addClusterTopologyListener(backupTopologyListener, true);
+ locators[0].addClusterTopologyListener(liveTopologyListener);
+ final TopologyListener backupTopologyListener = new
TopologyListener("LIVE-2");
+ locators[1].addClusterTopologyListener(backupTopologyListener);
+
assertTrue("we assume 3 is a backup",
servers[3].getConfiguration().isBackup());
assertFalse("no shared storage",
servers[3].getConfiguration().isSharedStore());
- setupSessionFactory(0, 3, isNetty(), false);
- setupSessionFactory(1, 4, isNetty(), false);
- setupSessionFactory(2, 5, isNetty(), false);
+ // assertEquals(liveTopologyListener.toString(), 6,
liveTopologyListener.nodes.size());
+ // assertEquals(backupTopologyListener.toString(), 6,
backupTopologyListener.nodes.size());
- assertEquals(liveTopologyListener.toString(), 6,
liveTopologyListener.nodes.size());
- assertEquals(backupTopologyListener.toString(), 6,
backupTopologyListener.nodes.size());
+ failNode(0);
+ waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
- failNode(0);
+ assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
+ assertFalse("3 should have failed over ",
servers[3].getConfiguration().isBackup());
+ servers[1].stop();
+ assertFalse("4 should have failed over ",
servers[4].getConfiguration().isBackup());
}
@Override
@@ -41,7 +52,6 @@
return false;
}
-
private static class TopologyListener implements ClusterTopologyListener
{
final String prefix;
@@ -53,12 +63,11 @@
}
@Override
- public
- void nodeUP(long eventUID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
+ public void nodeUP(long eventUID, String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last)
{
nodes.put(nodeID, connectorPair);
- System.out.println(prefix + " UP: " + nodeID);
+ System.out.println(prefix + " UP: " + nodeID + "
connectPair=" + connectorPair);
}
@Override