Author: borges
Date: 2011-11-03 09:42:42 -0400 (Thu, 03 Nov 2011)
New Revision: 11655
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 Use ClusterTopologyListener to monitor the live's status in the remote
replication case
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-03
13:42:27 UTC (rev 11654)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-03
13:42:42 UTC (rev 11655)
@@ -19,6 +19,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.impl.QuorumManager;
/**
* A ReplicationEndpoint
@@ -38,4 +39,11 @@
void registerJournal(final byte id, final Journal journal);
+ /**
+ * Sets the quorumManager used by the server in the replicationEndpoint. It is used to
inform the
+ * backup server of the live's nodeID.
+ * @param quorumManager
+ */
+ void setQuorumManager(QuorumManager quorumManager);
+
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-03
13:42:27 UTC (rev 11654)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-03
13:42:42 UTC (rev 11655)
@@ -68,6 +68,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QuorumManager;
/**
*
@@ -119,6 +120,8 @@
private boolean deletePages = true;
private boolean started;
+ private QuorumManager quorumManager;
+
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
{
@@ -416,7 +419,7 @@
// Private -------------------------------------------------------
- private void finishSynchronization(String nodeID) throws Exception
+ private void finishSynchronization(String liveID) throws Exception
{
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
@@ -477,7 +480,8 @@
}
}
journalsHolder = null;
- server.setRemoteBackupUpToDate(nodeID);
+ quorumManager.setLiveID(liveID);
+ server.setRemoteBackupUpToDate(liveID);
log.info("Backup server " + server + " is synchronized with
live-server.");
return;
}
@@ -903,4 +907,10 @@
return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
}
}
+
+ @Override
+ public void setQuorumManager(QuorumManager quorumManager)
+ {
+ this.quorumManager = quorumManager;
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-03
13:42:27 UTC (rev 11654)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-03
13:42:42 UTC (rev 11655)
@@ -1959,11 +1959,8 @@
while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
{
nodeManager.interrupt();
-
backupActivationThread.interrupt();
-
backupActivationThread.join(1000);
-
}
if (System.currentTimeMillis() - start >= timeout)
@@ -2049,6 +2046,7 @@
final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
final QuorumManager quorumManager = new QuorumManager(serverLocator);
+ replicationEndpoint.setQuorumManager(quorumManager);
serverLocator.setReconnectAttempts(-1);
@@ -2086,14 +2084,18 @@
// Server node (i.e. Life node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
+ synchronized (quorumManager)
+ {
while (true)
{
- nodeManager.awaitLiveNode();
+ quorumManager.wait();
+ // nodeManager.awaitLiveNode();
break;
// if (!started || quorumManager.isNodeDown())
// {
// break;
// }
+ }
}
serverLocator.close();
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-03
13:42:27 UTC (rev 11654)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-03
13:42:42 UTC (rev 11655)
@@ -25,14 +25,14 @@
* quorum will help a remote backup in deciding whether to replace its 'live'
server or to wait for
* it.
*/
-final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements ClusterTopologyListener
{
// private static final Logger LOG = Logger.getLogger(QuorumManager.class);
// volatile boolean started;
private final ServerLocator locator;
- private final String targetServerName = "";
+ private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration,
TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration,
TransportConfiguration>>();
private static final long DISCOVERY_TIMEOUT = 3;
@@ -47,7 +47,7 @@
public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
boolean last)
{
- if (targetServerName.equals(nodeID))
+ if (targetServerID.equals(nodeID))
{
return;
}
@@ -57,20 +57,25 @@
@Override
public void nodeDown(long eventUID, String nodeID)
{
- if (targetServerName.equals(nodeID))
+ if (targetServerID.equals(nodeID))
{
- // targetReturned = false;
- // trigger action
-
- // decide to wake backup
- locator.removeClusterTopologyListener(this);
+ if (!targetServerID.isEmpty())
+ synchronized (this)
+ {
+ notify();
+ }
}
nodes.remove(nodeID);
}
+ public void setLiveID(String liveID)
+ {
+ targetServerID = liveID;
+ }
+
public boolean isNodeDown()
{
- boolean liveShutdownCleanly = !nodes.containsKey(targetServerName);
+ boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
boolean noOtherServersAround = nodes.size() == 0;
if (liveShutdownCleanly || noOtherServersAround)
return true;
@@ -85,7 +90,7 @@
{
for (Entry<String, Pair<TransportConfiguration,
TransportConfiguration>> pair : nodes.entrySet())
{
- if (targetServerName.equals(pair.getKey()))
+ if (targetServerID.equals(pair.getKey()))
continue;
TransportConfiguration serverTC = pair.getValue().getA();
ServerLocatorImpl locator =
(ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);