[hornetq-commits] JBoss hornetq SVN: r11559 - in trunk/hornetq-core/src/main/java/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 17 11:33:57 EDT 2011


Author: borges
Date: 2011-10-17 11:33:57 -0400 (Mon, 17 Oct 2011)
New Revision: 11559

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Merge activation classes

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-10-17 15:33:40 UTC (rev 11558)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-10-17 15:33:57 UTC (rev 11559)
@@ -51,8 +51,6 @@
  * A CoreProtocolManager
  *
  * @author Tim Fox
- *
- *
  */
 public class CoreProtocolManager implements ProtocolManager
 {
@@ -216,8 +214,8 @@
                    * HORNETQ-720 Instantiate a new server locator to call notifyNodeUp(...)? Or send
                    * a CLUSTER_TOPOLOGY(_2?) message?
                    */
-//                  server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), true), true,
-//                                                          true);
+                  acceptorUsed.getClusterConnection().nodeAnnounced(System.currentTimeMillis(), msg.getNodeID(),
+                                                                    getPair(msg.getConnector(), true), true);
                }
             }
          }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-17 15:33:40 UTC (rev 11558)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-17 15:33:57 UTC (rev 11559)
@@ -40,8 +40,12 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.CoreQueueConfiguration;
@@ -83,6 +87,7 @@
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -362,17 +367,14 @@
             if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
             {
                activation = new SharedStoreLiveActivation();
-
-               // This should block until the lock is got
-
-               activation.run();
             }
             else
             {
                activation = new NoSharedStoreLiveActivation();
+            }
 
-               activation.run();
-            }
+            activation.run();
+
             started = true;
 
             HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() +
@@ -1862,7 +1864,7 @@
       }
    }
 
-   private class SharedStoreLiveActivation implements Activation
+   private final class SharedStoreLiveActivation implements Activation
    {
       public void run()
       {
@@ -1923,7 +1925,7 @@
       }
    }
 
-   private class SharedStoreBackupActivation implements Activation
+   private final class SharedStoreBackupActivation implements Activation
    {
       public void run()
       {
@@ -1984,9 +1986,6 @@
          }
       }
 
-      /**
-       *
-       */
       public void close(boolean permanently) throws Exception
       {
          if (configuration.isBackup())
@@ -2029,7 +2028,7 @@
       }
    }
 
-   private class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
+   private final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
    {
       boolean failedAlready = false;
 
@@ -2065,30 +2064,143 @@
       void close(boolean permanently) throws Exception;
    }
 
-   private class SharedNothingBackupActivation implements Activation
+   private final class SharedNothingBackupActivation implements Activation
    {
+      private ServerLocatorInternal serverLocator;
+
       public void run()
       {
          try
          {
-            // TODO
+            nodeManager.startBackup();
 
-            // Try-Connect to live server using live-connector-ref
+            initialisePart1();
 
-            // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+            final String liveConnectorName = configuration.getLiveConnectorName();
+            if (liveConnectorName == null)
+            {
+               throw new IllegalArgumentException(
+                                                  "Cannot have a replicated backup without configuring its live-server!");
+            }
+            clusterManager.start();
+
+            final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
+            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+            final QuorumManager quorumManager = new QuorumManager(serverLocator, nodeManager.getNodeId().toString());
+
+            serverLocator.setReconnectAttempts(-1);
+
+            threadPool.execute(new Runnable()
+            {
+               @Override
+               public void run()
+               {
+                  try
+                  {
+                     final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
+                     if (liveServerSessionFactory == null)
+                     {
+                        // XXX HORNETQ-768
+                        throw new RuntimeException("Need to retry?");
+                     }
+                     CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+                     Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+                     Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+
+                     connectToReplicationEndpoint(replicationChannel);
+                     replicationEndpoint.start();
+                     clusterManager.announceReplicatingBackup(pingChannel);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn("Unable to announce backup for replication.", e);
+                  }
+               }
+            });
+
+            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
+                     "] started, waiting live to fail before it gets active");
+            started = true;
+
+            // Server node (i.e. Life node) is not running, now the backup takes over.
+            // we must remember to close stuff we don't need any more
+            while (true)
+            {
+               nodeManager.awaitLiveNode();
+               if (quorumManager.isNodeDown())
+               {
+                  break;
+               }
+            }
+
+            serverLocator.close();
+            replicationEndpoint.stop();
+
+            if (!isRemoteBackupUpToDate())
+            {
+               /*
+                * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps we should
+                * first try to wait a little longer to see if the 'live' comes back?
+                */
+               throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
+            }
+
+            configuration.setBackup(false);
+            synchronized (startUpLock)
+            {
+               if (!started)
+                  return;
+               storageManager.start();
+               initialisePart2();
+               clusterManager.activate();
+            }
+
          }
          catch (Exception e)
          {
+            if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !started)
+               // do not log these errors if the server is being stopped.
+               return;
             log.error("Failure in initialisation", e);
+            e.printStackTrace();
          }
       }
 
-      public void close(boolean permanently) throws Exception
+      public void close(final boolean permanently) throws Exception
       {
+         if (serverLocator != null)
+         {
+            serverLocator.close();
+            serverLocator = null;
+         }
+
+         if (configuration.isBackup())
+         {
+
+            long timeout = 30000;
+
+            long start = System.currentTimeMillis();
+
+            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+            {
+               nodeManager.interrupt();
+
+               backupActivationThread.interrupt();
+
+               Thread.sleep(1000);
+            }
+
+            if (System.currentTimeMillis() - start >= timeout)
+            {
+               log.warn("Timed out waiting for backup activation to exit");
+            }
+
+            nodeManager.stopBackup();
+         }
       }
    }
 
-   private class NoSharedStoreLiveActivation implements Activation
+   private final class NoSharedStoreLiveActivation implements Activation
    {
       public void run()
       {
@@ -2115,7 +2227,14 @@
 
       public void close(boolean permanently) throws Exception
       {
-
+         if (permanently)
+         {
+            nodeManager.crashLiveServer();
+         }
+         else
+         {
+            nodeManager.pauseLiveServer();
+         }
       }
    }
 



More information about the hornetq-commits mailing list