[hornetq-commits] JBoss hornetq SVN: r11655 - in trunk/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 3 09:42:43 EDT 2011


Author: borges
Date: 2011-11-03 09:42:42 -0400 (Thu, 03 Nov 2011)
New Revision: 11655

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 Use ClusterTopologyListener to monitor the live's status in the remote replication case

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java	2011-11-03 13:42:42 UTC (rev 11655)
@@ -19,6 +19,7 @@
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.impl.QuorumManager;
 
 /**
  * A ReplicationEndpoint
@@ -38,4 +39,11 @@
 
    void registerJournal(final byte id, final Journal journal);
 
+   /**
+    * Sets the quorumManager used by the server in the replicationEndpoint. It is used to inform the
+    * backup server of the live's nodeID.
+    * @param quorumManager the quorum manager to be told the live's nodeID
+    */
+   void setQuorumManager(QuorumManager quorumManager);
+
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-03 13:42:42 UTC (rev 11655)
@@ -68,6 +68,7 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QuorumManager;
 
 /**
  *
@@ -119,6 +120,8 @@
    private boolean deletePages = true;
    private boolean started;
 
+   private QuorumManager quorumManager;
+
    // Constructors --------------------------------------------------
    public ReplicationEndpointImpl(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
    {
@@ -416,7 +419,7 @@
 
    // Private -------------------------------------------------------
 
-   private void finishSynchronization(String nodeID) throws Exception
+   private void finishSynchronization(String liveID) throws Exception
    {
       for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
@@ -477,7 +480,8 @@
          }
       }
       journalsHolder = null;
-      server.setRemoteBackupUpToDate(nodeID);
+      quorumManager.setLiveID(liveID);
+      server.setRemoteBackupUpToDate(liveID);
       log.info("Backup server " + server + " is synchronized with live-server.");
       return;
    }
@@ -903,4 +907,10 @@
          return "JournalSyncFile(file=" + file.getAbsolutePath() + ")";
       }
    }
+
+   @Override
+   public void setQuorumManager(QuorumManager quorumManager)
+   {
+      this.quorumManager = quorumManager;
+   }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-03 13:42:42 UTC (rev 11655)
@@ -1959,11 +1959,8 @@
             while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
             {
                nodeManager.interrupt();
-
                backupActivationThread.interrupt();
-
                backupActivationThread.join(1000);
-
             }
 
             if (System.currentTimeMillis() - start >= timeout)
@@ -2049,6 +2046,7 @@
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
             final QuorumManager quorumManager = new QuorumManager(serverLocator);
+            replicationEndpoint.setQuorumManager(quorumManager);
 
             serverLocator.setReconnectAttempts(-1);
 
@@ -2086,14 +2084,18 @@
 
             // Server node (i.e. live node) is not running, now the backup takes over.
             // we must remember to close stuff we don't need any more
+            synchronized (quorumManager)
+            {
             while (true)
             {
-               nodeManager.awaitLiveNode();
+                  quorumManager.wait();
+                  // nodeManager.awaitLiveNode();
                break;
 //               if (!started || quorumManager.isNodeDown())
 //               {
 //                  break;
 //               }
+               }
             }
 
             serverLocator.close();

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2011-11-03 13:42:42 UTC (rev 11655)
@@ -25,14 +25,14 @@
  * quorum will help a remote backup in deciding whether to replace its 'live' server or to wait for
  * it.
  */
-final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements ClusterTopologyListener
 {
 
    // private static final Logger LOG = Logger.getLogger(QuorumManager.class);
 
    // volatile boolean started;
    private final ServerLocator locator;
-   private final String targetServerName = "";
+   private String targetServerID = "";
    private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
             new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
    private static final long DISCOVERY_TIMEOUT = 3;
@@ -47,7 +47,7 @@
    public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair,
             boolean last)
    {
-      if (targetServerName.equals(nodeID))
+      if (targetServerID.equals(nodeID))
       {
          return;
       }
@@ -57,20 +57,25 @@
    @Override
    public void nodeDown(long eventUID, String nodeID)
    {
-      if (targetServerName.equals(nodeID))
+      if (targetServerID.equals(nodeID))
       {
-         // targetReturned = false;
-         // trigger action
-
-         // decide to wake backup
-         locator.removeClusterTopologyListener(this);
+         if (!targetServerID.isEmpty())
+            synchronized (this)
+            {
+               notify();
+            }
       }
       nodes.remove(nodeID);
    }
 
+   public void setLiveID(String liveID)
+   {
+      targetServerID = liveID;
+   }
+
    public boolean isNodeDown()
    {
-      boolean liveShutdownCleanly = !nodes.containsKey(targetServerName);
+      boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
       boolean noOtherServersAround = nodes.size() == 0;
       if (liveShutdownCleanly || noOtherServersAround)
          return true;
@@ -85,7 +90,7 @@
       {
          for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
          {
-            if (targetServerName.equals(pair.getKey()))
+            if (targetServerID.equals(pair.getKey()))
                continue;
             TransportConfiguration serverTC = pair.getValue().getA();
             ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);



More information about the hornetq-commits mailing list