[hornetq-commits] JBoss hornetq SVN: r9816 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/cluster/impl and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 27 09:43:11 EDT 2010


Author: ataylor
Date: 2010-10-27 09:43:09 -0400 (Wed, 27 Oct 2010)
New Revision: 9816

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
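Added failback support that restarts the former backup server: when the original live server comes back and requests failback, the backup that took over now stops and restarts itself as a backup (previously it exited the JVM). A live server that starts while the backup is still live first announces itself as a backup to it.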

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -1159,7 +1159,7 @@
                // cause reconnect loop
                public void run()
                {
-                  conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+                  conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
                                                  "The connection was disconnected because of server shutdown"));
                   if (msg.getNodeID() != null)
                   {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -165,6 +165,11 @@
          started = false;
       }
 
+      synchronized (waitLock)
+      {
+         waitLock.notify();
+      }
+
       try
       {
          thread.join();
@@ -173,6 +178,7 @@
       {
       }
 
+
       socket.close();
 
       socket = null;
@@ -222,7 +228,7 @@
 
          long toWait = timeout;
 
-         while (!received && (toWait > 0 || timeout == 0))
+         while (started && !received && (toWait > 0 || timeout == 0))
          {
             try
             {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -288,7 +288,7 @@
       callClosingListeners();
    }
    
-   public void disconnect()
+   public void disconnect(boolean failoverOnServerShutdown)
    {
       Channel channel0 = getChannel(0, -1);
 
@@ -307,7 +307,7 @@
          channel.flushConfirmations();
       }
 
-      Packet disconnect = new DisconnectMessage(nodeID);
+      Packet disconnect = new DisconnectMessage(nodeID, failoverOnServerShutdown);
       channel0.sendAndFlush(disconnect);
    }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -31,16 +31,20 @@
    // Attributes ----------------------------------------------------
 
    private SimpleString nodeID;
+
+   private boolean failoverOnServerShutdown;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public DisconnectMessage(final SimpleString nodeID)
+   public DisconnectMessage(final SimpleString nodeID, boolean failoverOnServerShutdown)
    {
       super(PacketImpl.DISCONNECT);
 
       this.nodeID = nodeID;
+
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
    }
 
    public DisconnectMessage()
@@ -55,16 +59,24 @@
       return nodeID;
    }
 
+   public boolean isFailoverOnServerShutdown()
+   {
+      return failoverOnServerShutdown;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeNullableSimpleString(nodeID);
+
+      buffer.writeBoolean(failoverOnServerShutdown);
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
       nodeID = buffer.readNullableSimpleString();
+      failoverOnServerShutdown = buffer.readBoolean();
    }
 
    @Override

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -89,7 +89,7 @@
       manager.cleanup(this);
    }
 
-   public void disconnect()
+   public void disconnect(boolean clientFailover)
    {
    }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -45,4 +45,6 @@
    void freeze();
 
    RemotingConnection getServerSideReplicatingConnection();
+
+   void stop(boolean failoverOnServerShutdown) throws Exception;
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -30,9 +30,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
@@ -255,6 +253,11 @@
 
    public void stop() throws Exception
    {
+      stop(false);
+   }
+
+   public void stop(boolean clientFailover) throws Exception
+   {
       if (!started)
       {
          return;
@@ -279,7 +282,7 @@
       {
          RemotingConnection conn = entry.connection;
 
-         conn.disconnect();
+         conn.disconnect(clientFailover);
       }
 
       for (Acceptor acceptor : acceptors)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -74,5 +74,5 @@
 
    public abstract boolean isAwaitingFailback() throws Exception;
 
-   public abstract void killServer();
+   public abstract boolean isBackupLive() throws Exception;
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -52,4 +52,6 @@
    void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
 
    Topology getTopology();
+
+   void announceBackup() throws Exception;
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -210,7 +210,10 @@
                }
                catch (Exception e)
                {
-                  log.warn("did not connect the cluster connection to other nodes", e);
+                  if(started)
+                  {
+                     log.warn("did not connect the cluster connection to other nodes", e);
+                  }
                }
             }
          });

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -362,6 +362,7 @@
             try
             {
                broadcastGroup.start();
+               broadcastGroup.activate();
             }
             catch (Exception e)
             {
@@ -405,6 +406,29 @@
       }
    }
 
+   public void announceBackup() throws Exception
+   {
+      List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
+      if(!configs.isEmpty())
+      {
+         ClusterConnectionConfiguration config = configs.get(0);
+
+         TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
+
+         if (connector == null)
+         {
+            log.warn("No connecor with name '" + config.getConnectorName() +
+                     "'. backup cannot be announced.");
+            return;
+         }
+         announceBackup(config, connector);
+      }
+      else
+      {
+         log.warn("no cluster connections defined, unable to announce backup");
+      }
+   }
+
    private synchronized void announceNode()
    {
       // TODO does this really work with more than one cluster connection? I think not
@@ -784,6 +808,7 @@
       {
          return;
       }
+      log.info("announcing backup");
       backupSessionFactory = locator.connect();
       backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, connector));
    }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -36,6 +36,14 @@
 
    private final String SERVER_LOCK_NAME = "server.lock";
 
+   private static final String ACCESS_MODE = "rw";
+
+   private static  final int LIVE_LOCK_POS = 1;
+
+   private static  final int BACKUP_LOCK_POS = 2;
+
+   private static final int LOCK_LENGTH = 1;
+
    private static final byte LIVE = 'L';
 
    private static final byte FAILINGBACK = 'F';
@@ -52,6 +60,7 @@
 
    private final String directory;
 
+
    public FileLockNodeManager(final String directory)
    {
       this.directory = directory;
@@ -70,7 +79,7 @@
          file.createNewFile();
       }
 
-      RandomAccessFile raFile = new RandomAccessFile(file, "rw");
+      RandomAccessFile raFile = new RandomAccessFile(file, ACCESS_MODE);
 
       channel = raFile.getChannel();
 
@@ -92,12 +101,20 @@
       return getState() == FAILINGBACK;
    }
 
-   @Override
-   public void killServer()
+   public boolean isBackupLive() throws Exception
    {
-      System.exit(0);
+      FileLock liveAttemptLock;
+      liveAttemptLock = channel.tryLock(LIVE_LOCK_POS, LOCK_LENGTH, false);
+      if(liveAttemptLock == null)
+      {
+         return true;
+      }
+      else
+      {
+         liveAttemptLock.release();
+         return false;
+      }
    }
-
    @Override
    public void releaseBackup() throws Exception
    {
@@ -115,7 +132,7 @@
             Thread.sleep(2000);
          }
 
-         liveLock = channel.lock(1, 1, false);
+         liveLock = channel.lock(LIVE_LOCK_POS, 1, false);
 
          byte state = getState();
 
@@ -144,7 +161,7 @@
 
       log.info("Waiting to become backup node");
 
-      backupLock = channel.lock(2, 1, false);
+      backupLock = channel.lock(BACKUP_LOCK_POS, LOCK_LENGTH, false);
 
       log.info("** got backup lock");
 
@@ -157,7 +174,7 @@
 
       log.info("Waiting to obtain live lock");
 
-      liveLock = channel.lock(1, 1, false);
+      liveLock = channel.lock(LIVE_LOCK_POS, LOCK_LENGTH, false);
 
       log.info("Live Server Obtained live lock");
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -305,10 +305,19 @@
 
             checkJournalDirectory();
 
+            initialisePart1();
+
+            if(nodeManager.isBackupLive())
+            {
+               //looks like we've failed over at some point need to inform that we are the backup so when the current live
+               // goes down they failover to us
+               clusterManager.announceBackup();
+               //
+               Thread.sleep(2000);
+            }
+
             nodeManager.startLiveNode();
 
-            initialisePart1();
-
             initialisePart2();
             
             log.info("Server is now live");
@@ -362,25 +371,43 @@
             nodeManager.releaseBackup();
             if(configuration.isAllowAutoFailBack())
             {
-               //todo dont hardcode schedule timings
-               scheduledPool.scheduleAtFixedRate(new Runnable()
+               class FailbackChecker implements Runnable
                {
+                  boolean restarting = false;
                   public void run()
                   {
                      try
                      {
-                        if(nodeManager.isAwaitingFailback())
+                        if(!restarting && nodeManager.isAwaitingFailback())
                         {
-                           log.info("live server wants to restart, killing server");
-                           nodeManager.killServer();
+                           log.info("live server wants to restart, restarting server in backup");
+                           restarting = true;
+                           Thread t = new Thread(new Runnable()
+                           {
+                              public void run()
+                              {
+                                 try
+                                 {
+                                    stop(true);
+                                    configuration.setBackup(true);
+                                    start();
+                                 }
+                                 catch (Exception e)
+                                 {
+                                    log.info("unable to restart server, please kill and restart manually", e);
+                                 }
+                              }
+                           });
+                           t.start();
                         }
                      }
                      catch (Exception e)
                      {
-                        log.warn("unable to kill server, please kill manually to force failback");
+                        //hopefully it will work next call
                      }
                   }
-               },  1000l, 1000l, TimeUnit.MILLISECONDS);
+               }
+               scheduledPool.scheduleAtFixedRate(new FailbackChecker(),  1000l, 1000l, TimeUnit.MILLISECONDS);
             }
          }
          catch (InterruptedException e)
@@ -586,7 +613,7 @@
       {
          System.out.println("HornetQServerImpl.stop");
       }
-      remotingService.stop();
+      remotingService.stop(permanently);
 
       synchronized (this)
       {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -123,9 +123,9 @@
    }
 
    @Override
-   public void killServer()
+   public boolean isBackupLive() throws Exception
    {
-      //todo
+      return liveLock.availablePermits() == 0;
    }
 
    private void releaseBackupNode()

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-10-27 13:43:09 UTC (rev 9816)
@@ -137,8 +137,9 @@
    
    /**
     * Disconnect the connection, closing all channels
+    * @param clientFailover
     */
-   void disconnect();
+   void disconnect(boolean clientFailover);
    
    /**
     * returns true if any data has been received since the last time this method was called.



More information about the hornetq-commits mailing list