[hornetq-commits] JBoss hornetq SVN: r12303 - in trunk: hornetq-core/src/main/java/org/hornetq/core/server/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 15 05:56:55 EDT 2012
Author: borges
Date: 2012-03-15 05:56:54 -0400 (Thu, 15 Mar 2012)
New Revision: 12303
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720 HORNETQ-776 Limit Backup's reconnect attempts before and after quorum-vote.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -585,7 +585,7 @@
}
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
- callFailureListeners(me, false, false);
+ callSessionFailureListeners(me, false, false);
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
@@ -667,6 +667,7 @@
}
catch (Exception ignore)
{
+ // no-op
}
}
@@ -698,14 +699,14 @@
{
sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
}
- callFailureListeners(me, true, false);
+ callSessionFailureListeners(me, true, false);
}
}
// This needs to be outside the failover lock to prevent deadlock
if (connection != null)
{
- callFailureListeners(me, true, true);
+ callSessionFailureListeners(me, true, true);
}
if (sessionsToClose != null)
{
@@ -908,7 +909,8 @@
+ "Please inform this condition to the HornetQ team");
}
- private void callFailureListeners(final HornetQException me, final boolean afterReconnect, final boolean failedOver)
+ private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect,
+ final boolean failedOver)
{
final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
@@ -966,7 +968,7 @@
for (FailureListener listener : oldListeners)
{
- // Add all apart from the first one which is the old DelegatingFailureListener
+ // Add all apart from the old DelegatingFailureListener
if (listener instanceof DelegatingFailureListener == false)
{
newListeners.add(listener);
@@ -1717,9 +1719,7 @@
"]";
}
- /* (non-Javadoc)
- * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
- */
+ @Override
public void setReconnectAttempts(final int attempts)
{
reconnectAttempts = attempts;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -42,10 +42,10 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -2109,7 +2109,7 @@
final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
- quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
+ quorumManager = new QuorumManager(HornetQServerImpl.this, serverLocator0, threadPool, getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
@@ -2121,11 +2121,14 @@
{
try
{
- final ClientSessionFactory liveServerSessionFactory = serverLocator0.connect();
+ final ClientSessionFactoryInternal liveServerSessionFactory = serverLocator0.connect();
if (liveServerSessionFactory == null)
{
throw new RuntimeException("Could not estabilish the connection");
}
+
+ liveServerSessionFactory.setReconnectAttempts(1);
+ quorumManager.setSessionFactory(liveServerSessionFactory);
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
liveConnection.addFailureListener(quorumManager);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
@@ -2162,6 +2165,10 @@
// we must remember to close stuff we don't need any more
if (failedToConnect)
return;
+ /**
+ * Wait for a shutdown order or for the live to fail. All the action happens inside
+ * {@link QuorumManager}
+ */
QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
serverLocator0.close();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -1,10 +1,7 @@
package org.hornetq.core.server.impl;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -16,9 +13,13 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
/**
* Manages a quorum of servers used to determine whether a given server is running or not.
@@ -31,24 +32,26 @@
{
private static final Logger log = Logger.getLogger(QuorumManager.class);
private String targetServerID = "";
- private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
- new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
-
private final ExecutorService executor;
private final String serverIdentity;
private final CountDownLatch latch;
private volatile BACKUP_ACTIVATION signal;
+ private ClientSessionFactoryInternal sessionFactory;
+ private final Topology topology;
+ private final HornetQServer backupServer;
/** safety parameter to make _sure_ we get out of await() */
- private static final int LATCH_TIMEOUT = 60;
+ private static final int LATCH_TIMEOUT = 30;
private static final long DISCOVERY_TIMEOUT = 5;
+ private static final int RECONNECT_ATTEMPTS = 5;
- public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String identity)
+ public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
{
this.serverIdentity = identity;
this.executor = executor;
this.latch = new CountDownLatch(1);
- // locator.addClusterTopologyListener(this);
+ this.backupServer = backup;
+ topology = serverLocator.getTopology();
}
public void setLiveID(String liveID)
@@ -56,46 +59,54 @@
targetServerID = liveID;
}
- public boolean isNodeDown()
+ public boolean isLiveDown()
{
- if (nodes.size() == 0)
+ Collection<TopologyMember> nodes = topology.getMembers();
+
+ if (nodes.size() < 2) // the live server is also in the list
{
return true;
}
final int size = nodes.size();
- Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
+ Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
AtomicInteger pingCount = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(size);
+ int total = 0;
try
{
- for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
+ for (TopologyMember tm : nodes)
{
- if (targetServerID.equals(pair.getKey()))
+ Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
+// if (targetServerID.equals(pair.getKey()))
+// continue;
+ TransportConfiguration serverTC = pair.getA();
+ if (serverTC == null)
+ {
+ latch.countDown();
continue;
- TransportConfiguration serverTC = pair.getValue().getA();
+ }
+ total++;
ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
locatorsList.add(locator);
executor.submit(new ServerConnect(latch, pingCount, locator));
}
- // Some servers may have disappeared between the latch creation
- for (int i = 0; i < size - locatorsList.size(); i++)
- {
- latch.countDown();
- }
+
try
{
latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException interruption)
{
- // No-op. As the best the quorum can do now is to return the latest number it has
+ // No-op. The best the quorum can do now is to return the latest number it has
}
- return pingCount.get() * 2 >= locatorsList.size();
+ // -1: because the live server is not being filtered out.
+ return pingCount.get() * 2 >= locatorsList.size() - 1;
}
finally
{
- for (ServerLocator locator: locatorsList){
+ for (ServerLocator locator : locatorsList)
+ {
try
{
locator.close();
@@ -160,25 +171,65 @@
@Override
public void connectionFailed(HornetQException exception, boolean failedOver)
{
- // connection failed,
- if (exception.getCode() == HornetQException.DISCONNECTED)
+ // Check if connection was reestablished by the sessionFactory:
+ if (sessionFactory.numConnections() > 0)
{
- log.info("Connection failed: " + exception);
- signal = BACKUP_ACTIVATION.FAIL_OVER;
- latch.countDown();
+ resetReplication();
return;
}
- throw new RuntimeException(exception);
+ if (!isLiveDown())
+ {
+ try
+ {
+ // no point in repeating all the reconnection logic
+ sessionFactory.connect(RECONNECT_ATTEMPTS, false);
+ resetReplication();
+ return;
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() != HornetQException.NOT_CONNECTED)
+ log.warn("Unexpected exception while trying to reconnect", e);
+ }
+ }
- // by the time it got here, the connection might have been re-established
- // check for it...
- // if connectionIsOk:
- // replicationEndPoint must see how up-to-date it is
- // If not:
- // 1. take a vote
- // if vote result is weird... retry connecting
+ // live is assumed to be down, backup fails-over
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
}
+ /**
+ * Restarts the backup server by launching a {@link ServerRestart} task on a new thread and
+ * returning immediately.
+ * <p>
+ * The thread has to be launched with start(): calling run() on the Thread object would execute
+ * the stop/start cycle synchronously on the caller, i.e. on the thread that delivered the
+ * connectionFailed() callback.
+ */
+ private void resetReplication()
+ {
+ new Thread(new ServerRestart(backupServer)).start();
+ }
+ /**
+ * Task that restarts a server by calling stop() followed by start() on it.
+ * Any exception thrown by either call is logged and not rethrown.
+ */
+ private static final class ServerRestart implements Runnable
+ {
+ final HornetQServer backup; // the server instance to be restarted
+
+ public ServerRestart(HornetQServer backup)
+ {
+ this.backup = backup;
+ }
+ /** Stops and then starts the server; if stop() throws, start() is skipped and the error is logged. */
+ @Override
+ public void run()
+ {
+ try
+ {
+ backup.stop();
+ backup.start();
+ }
+ catch (Exception e)
+ {
+ log.error("Error while restarting the backup server: " + backup, e);
+ }
+ }
+
+ }
enum BACKUP_ACTIVATION
{
FAIL_OVER, STOP;
@@ -212,11 +263,19 @@
}
/**
- * Releases the semaphore, causing the Activation thread to fail-over.
+ * Releases the latch, causing the backup activation thread to fail-over.
*/
public synchronized void failOver()
{
signal = BACKUP_ACTIVATION.FAIL_OVER;
latch.countDown();
}
+
+ /**
+ * Stores the session factory that connectionFailed() later queries and uses to reconnect.
+ * connectionFailed() dereferences this field without a null check, so the factory needs to be
+ * set before this manager is registered as a failure listener on the live connection.
+ * @param sessionFactory the session factory used to connect to the live server
+ */
+ public void setSessionFactory(ClientSessionFactoryInternal sessionFactory)
+ {
+ this.sessionFactory = sessionFactory;
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -7,6 +7,8 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest
{
@@ -17,8 +19,7 @@
setupCluster();
startServers(0, 1, 2);
- // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
- // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
startServers(3, 4, 5);
for (int i : liveServerIDs)
More information about the hornetq-commits
mailing list