Author: borges
Date: 2012-03-13 08:05:21 -0400 (Tue, 13 Mar 2012)
New Revision: 12296
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 HORNETQ-776 Improve signaling and rely on an explicit signal from the live server to
fail over.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-13
12:05:00 UTC (rev 12295)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-13
12:05:21 UTC (rev 12296)
@@ -967,7 +967,6 @@
for (FailureListener listener : oldListeners)
{
// Add all apart from the first one which is the old DelegatingFailureListener
-
if (listener instanceof DelegatingFailureListener == false)
{
newListeners.add(listener);
@@ -1594,7 +1593,7 @@
}
}
- private class DelegatingFailureListener implements FailureListener
+ private final class DelegatingFailureListener implements FailureListener
{
private final Object connectionID;
@@ -1607,6 +1606,13 @@
{
handleConnectionFailure(connectionID, me);
}
+
+ @Override
+ public String toString()
+ {
+ return DelegatingFailureListener.class.getSimpleName() +
"('reconnectsOrFailover', hash=" +
+ super.hashCode() + ")";
+ }
}
private static final class ActualScheduledPinger implements Runnable
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13
12:05:00 UTC (rev 12295)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13
12:05:21 UTC (rev 12296)
@@ -1630,6 +1630,12 @@
}
}
}
+
+ @Override
+ public String toString()
+ {
+ return "FailureListener('restarts cluster
connections')";
+ }
});
if (log.isDebugEnabled())
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-03-13
12:05:00 UTC (rev 12295)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-03-13
12:05:21 UTC (rev 12296)
@@ -36,7 +36,6 @@
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.security.HornetQPrincipal;
@@ -102,7 +101,7 @@
private final ClusterManager clusterManager;
- private Map<ProtocolType, ProtocolManager> protocolMap = new
ConcurrentHashMap<ProtocolType, ProtocolManager>();
+ private final Map<ProtocolType, ProtocolManager> protocolMap = new
ConcurrentHashMap<ProtocolType, ProtocolManager>();
// Static --------------------------------------------------------
@@ -464,7 +463,6 @@
conn.connection.destroy();
}
-
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13
12:05:00 UTC (rev 12295)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13
12:05:21 UTC (rev 12296)
@@ -119,6 +119,7 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.LocalGroupingHandler;
import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.impl.QuorumManager.BACKUP_ACTIVATION;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.impl.ManagementServiceImpl;
import org.hornetq.core.settings.HierarchicalRepository;
@@ -500,7 +501,6 @@
if (replicationManager!=null) {
replicationManager.sendLiveIsStopping();
}
-
connectorsService.stop();
// we stop the groupingHandler before we stop the cluster manager so binding
mappings
@@ -1903,9 +1903,10 @@
if (nodeManager.isBackupLive())
{
- // looks like we've failed over at some point need to inform that we
are the backup so when the current
- // live
- // goes down they failover to us
+ /*
+ * looks like we've failed over at some point need to inform that we
are the backup
+ * so when the current live goes down they failover to us
+ */
if (log.isDebugEnabled())
{
log.debug("announcing backup to the former live" + this);
@@ -2087,8 +2088,8 @@
private final class SharedNothingBackupActivation implements Activation
{
private ServerLocatorInternal serverLocator0;
- private volatile boolean failedConnection;
- private volatile boolean failOver;
+ private volatile boolean failedToConnect;
+ private QuorumManager quorumManager;
public void run()
{
@@ -2108,7 +2109,7 @@
final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator0,
threadPool);
+ quorumManager = new QuorumManager(serverLocator0, threadPool,
getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
@@ -2123,10 +2124,10 @@
final ClientSessionFactory liveServerSessionFactory =
serverLocator0.connect();
if (liveServerSessionFactory == null)
{
- // XXX HORNETQ-768
- throw new RuntimeException("Need to retry?");
+ throw new RuntimeException("Could not estabilish the
connection");
}
CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
+ liveConnection.addFailureListener(quorumManager);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id,
-1);
Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
connectToReplicationEndpoint(replicationChannel);
@@ -2136,14 +2137,13 @@
catch (Exception e)
{
log.warn("Unable to announce backup for replication. Trying to
stop the server.", e);
- failedConnection = true;
+ failedToConnect = true;
+
+ quorumManager.causeExit();
try
{
- synchronized (quorumManager)
- {
- quorumManager.notify();
- }
- HornetQServerImpl.this.stop();
+ if (!stopped)
+ HornetQServerImpl.this.stop();
return;
}
catch (Exception e1)
@@ -2160,31 +2160,18 @@
// Server node (i.e. Live node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
- synchronized (quorumManager)
- {
- if (failedConnection)
+ if (failedToConnect)
return;
- while (true)
- {
- quorumManager.wait();
- if (failOver || !started || quorumManager.isNodeDown())
- {
- break;
- }
- }
- }
+ QuorumManager.BACKUP_ACTIVATION signal =
quorumManager.waitForStatusChange();
serverLocator0.close();
replicationEndpoint.stop();
- if (failedConnection)
+ if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
return;
+
if (!isRemoteBackupUpToDate())
{
- /*
- * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps
we should
- * first try to wait a little longer to see if the 'live' comes
back?
- */
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup
Server was not yet in sync with live");
}
@@ -2211,15 +2198,16 @@
public void close(final boolean permanently) throws Exception
{
+ if (quorumManager != null)
+ quorumManager.causeExit();
+
if (serverLocator0 != null)
{
serverLocator0.close();
- serverLocator0 = null;
}
if (configuration.isBackup())
{
-
long timeout = 30000;
long start = System.currentTimeMillis();
@@ -2247,7 +2235,7 @@
*/
public void failOver()
{
- failOver = true;
+ quorumManager.failOver();
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-13
12:05:00 UTC (rev 12295)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-13
12:05:21 UTC (rev 12296)
@@ -10,68 +10,47 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
/**
* Manages a quorum of servers used to determine whether a given server is running or
not.
* <p>
* The use case scenario is an eventual connection loss between the live and the backup,
where the
- * quorum will help a remote backup in deciding whether to replace its 'live'
server or to wait for
- * it.
+ * quorum will help a remote backup deciding whether to replace its 'live' server
or to keep trying
+ * to reconnect.
*/
-public final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements FailureListener
{
-
- // volatile boolean started;
- private final ServerLocator locator;
+ private static final Logger log = Logger.getLogger(QuorumManager.class);
private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration,
TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration,
TransportConfiguration>>();
private final ExecutorService executor;
+ private final String serverIdentity;
+ private final CountDownLatch latch;
+ private volatile BACKUP_ACTIVATION signal;
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 60;
private static final long DISCOVERY_TIMEOUT = 5;
- public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
+ public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String
identity)
{
+ this.serverIdentity = identity;
this.executor = executor;
- this.locator = serverLocator;
- locator.addClusterTopologyListener(this);
+ this.latch = new CountDownLatch(1);
+ // locator.addClusterTopologyListener(this);
}
- @Override
- public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- boolean last)
- {
- if (targetServerID.equals(nodeID))
- {
- return;
- }
- nodes.put(nodeID, connectorPair);
- }
-
- @Override
- public void nodeDown(long eventUID, String nodeID)
- {
- if (targetServerID.equals(nodeID))
- {
- if (!targetServerID.isEmpty())
- synchronized (this)
- {
- notify();
- }
- }
- nodes.remove(nodeID);
- }
-
public void setLiveID(String liveID)
{
targetServerID = liveID;
@@ -129,6 +108,15 @@
}
}
+ @Override
+ public String toString()
+ {
+ return QuorumManager.class.getSimpleName() + "(server=" + serverIdentity
+ ")";
+ }
+
+ /**
+ * Attempts to connect to a given server.
+ */
private static class ServerConnect implements Runnable
{
private final ServerLocatorImpl locator;
@@ -168,4 +156,67 @@
}
}
}
+
+ @Override
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ // connection failed,
+ if (exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ log.info("Connection failed: " + exception);
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
+ return;
+ }
+ throw new RuntimeException(exception);
+
+ // by the time it got here, the connection might have been re-established
+ // check for it...
+ // if connectionIsOk:
+ // replicationEndPoint must see how up-to-date it is
+ // If not:
+ // 1. take a vote
+ // if vote result is weird... retry connecting
+ }
+
+ enum BACKUP_ACTIVATION
+ {
+ FAIL_OVER, STOP;
+ }
+
+ /**
+ * Called by the replicating backup (i.e. "SharedNothing" backup) to wait
for the signal to
+ * fail-over or to stop.
+ * @return signal, indicating whether to stop or to fail-over
+ */
+ public final BACKUP_ACTIVATION waitForStatusChange()
+ {
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ return BACKUP_ACTIVATION.STOP;
+ }
+ return signal;
+ }
+
+ /**
+ * Cause the Activation thread to exit and the server to be stopped.
+ */
+ public synchronized void causeExit()
+ {
+ signal = BACKUP_ACTIVATION.STOP;
+ latch.countDown();
+ }
+
+ /**
+ * Releases the semaphore, causing the Activation thread to fail-over.
+ */
+ public synchronized void failOver()
+ {
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
+ }
}