[hornetq-commits] JBoss hornetq SVN: r9647 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 6 09:44:37 EDT 2010


Author: ataylor
Date: 2010-09-06 09:44:36 -0400 (Mon, 06 Sep 2010)
New Revision: 9647

Added:
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
added multiple nodes backup test and fixes

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-09-06 13:26:14 UTC (rev 9646)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-09-06 13:44:36 UTC (rev 9647)
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 
 /**
@@ -59,17 +60,23 @@
       }
       else
       {
-         if(currentMember.getConnector().a == null && member.getConnector().a != null)
+         if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
          {
+            if(currentMember.getConnector().a == null)
+            {
+               nodes++;
+            }
             currentMember.getConnector().a =  member.getConnector().a;
             replaced = true;
-            nodes++;
          }
-         if(currentMember.getConnector().b == null && member.getConnector().b != null)
+         if(hasChanged(currentMember.getConnector().b, member.getConnector().b))
          {
+            if(currentMember.getConnector().b == null)
+            {
+               nodes++;
+            }
             currentMember.getConnector().b =  member.getConnector().b;
             replaced = true;
-            nodes++;
          }
       }
       return replaced;
@@ -89,6 +96,7 @@
             nodes--;
          }
       }
+
       return (member != null);
    }
 
@@ -129,6 +137,7 @@
       {
          desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
       }
+      desc += "\t" + "nodes=" + nodes + "\t" + "members=" + members();
       return desc;
    }
 
@@ -142,4 +151,9 @@
    {
       return topology.size();
    }
+
+   private boolean hasChanged(TransportConfiguration currentConnector, TransportConfiguration connector)
+   {
+      return (currentConnector == null && connector != null) || (currentConnector != null && !currentConnector.equals(connector));
+   }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-06 13:26:14 UTC (rev 9646)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-06 13:44:36 UTC (rev 9647)
@@ -376,6 +376,31 @@
       {
          backup = false;
 
+         String nodeID = server.getNodeID().toString();
+
+         TopologyMember member = topology.getMember(nodeID);
+         //we swap the topology backup now = live
+         if (member != null)
+         {
+            member.getConnector().a = member.getConnector().b;
+
+            member.getConnector().b = null;
+         }
+
+         if(backupSessionFactory != null)
+         {
+            //todo we could use the topology of this to preempt it arriving from the cc
+            try
+            {
+               backupSessionFactory.close();
+            }
+            catch (Exception e)
+            {
+               log.warn("problem closing backup session factory", e);
+            }
+            backupSessionFactory = null;
+         }
+
          if (clusterConnections.size() > 0)
          {
             announceNode();

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-06 13:26:14 UTC (rev 9646)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-06 13:44:36 UTC (rev 9647)
@@ -621,32 +621,47 @@
 
       public void close() throws Exception
       {
-         long timeout = 30000;
+         if (configuration.isBackup())
+         {
+            long timeout = 30000;
 
-         long start = System.currentTimeMillis();
+            long start = System.currentTimeMillis();
 
-         while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
-         {
-            backupActivationThread.interrupt();
+            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+            {
+               backupActivationThread.interrupt();
 
-            Thread.sleep(1000);
+               Thread.sleep(1000);
+            }
+
+            if (System.currentTimeMillis() - start >= timeout)
+            {
+               log.warn("Timed out waiting for backup activation to exit");
+            }
+
+            if (liveLock != null)
+            {
+               liveLock.unlock();
+            }
+
+            if (backupLock != null)
+            {
+               backupLock.unlock();
+            }
          }
-
-         if (System.currentTimeMillis() - start >= timeout)
+         else
          {
-            log.warn("Timed out waiting for backup activation to exit");
-         }
+            //if we are now live, behave as live
+            // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
+            // started before the live
+            log.info("Live Server about to delete Live Lock file");
+            File liveFile = new File(configuration.getJournalDirectory(), "live.lock");
+            log.info("Live Server deleting Live Lock file");
+            liveFile.delete();
 
-         if (liveLock != null)
-         {
             liveLock.unlock();
+            log.info("Live server unlocking live lock");
          }
-
-         if (backupLock != null)
-         {
-            backupLock.unlock();
-         }
-
       }
    }
 

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java	2010-09-06 13:44:36 UTC (rev 9647)
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class MultipleBackupFailoverTest extends ServiceTestBase
+{
+   private ArrayList<TestableServer> servers = new ArrayList<TestableServer>(5);
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      clearData();
+      FakeLockFile.clearLocks();
+      servers.ensureCapacity(5);
+      createConfigs();
+
+      servers.get(1).start();
+      servers.get(2).start();
+      servers.get(3).start();
+      servers.get(4).start();
+      servers.get(5).start();
+      servers.get(0).start();
+   }
+
+   /**
+    * @throws Exception
+    */
+   protected void createConfigs() throws Exception
+   {
+
+      createLiveConfig(0);
+      createBackupConfig(1, 0, 2, 3, 4, 5);
+      createBackupConfig(2, 0, 1, 3, 4, 5);
+      createBackupConfig(3, 0, 1, 2, 4, 5);
+      createBackupConfig(4, 0, 1, 2, 3, 4);
+      createBackupConfig(5, 0, 1, 2, 3, 4);
+   }
+
+   public void test() throws Exception
+   {
+      ServerLocator locator = getServerLocator(0);
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      int backupNode;
+      ClientSession session = sendAndConsume(sf, true);
+      System.out.println("failing node 0");
+      fail(0, session);
+      session.close();
+      backupNode = waitForBackup(5);
+      session = sendAndConsume(sf, false);
+      System.out.println("failing node " + backupNode);
+      fail(backupNode, session);
+      session.close();
+      backupNode = waitForBackup(5);
+      session = sendAndConsume(sf, false);
+      System.out.println("failing node " + backupNode);
+      fail(backupNode, session);
+      session.close();
+      backupNode = waitForBackup(5);
+      session = sendAndConsume(sf, false);
+      System.out.println("failing node " + backupNode);
+      fail(backupNode, session);
+      session.close();
+      backupNode = waitForBackup(5);
+      session = sendAndConsume(sf, false);
+      System.out.println("failing node " + backupNode);
+      fail(backupNode, session);
+      session.close();
+      backupNode = waitForBackup(5);
+      session = sendAndConsume(sf, false);    
+      session.close();
+      servers.get(backupNode).stop();
+      System.out.println("MultipleBackupFailoverTest.test");
+   }
+
+   protected void fail(int node, final ClientSession... sessions) throws Exception
+   {
+      servers.get(node).crash(sessions);
+   }
+
+   protected int waitForBackup(long seconds)
+   {
+      long time = System.currentTimeMillis();
+      long toWait = seconds * 1000;
+      while(true)
+      {
+         for (int i = 0, serversSize = servers.size(); i < serversSize; i++)
+         {
+            TestableServer backupServer = servers.get(i);
+            if(backupServer.isInitialised())
+            {
+               return i;
+            }
+         }
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+            //ignore
+         }
+         if(System.currentTimeMillis() > (time + toWait))
+         {
+            fail("backup server never started");
+         }
+      }
+   }
+
+
+   private void createBackupConfig(int nodeid, int... nodes)
+   {
+      Configuration config1 = super.createDefaultConfig();
+      config1.getAcceptorConfigurations().clear();
+      config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(nodeid));
+      config1.setSecurityEnabled(false);
+      config1.setSharedStore(true);
+      config1.setBackup(true);
+      config1.setClustered(true);
+      List<String> staticConnectors = new ArrayList<String>();
+
+      for (int node : nodes)
+      {
+         TransportConfiguration liveConnector = getConnectorTransportConfiguration(node);
+         config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+         staticConnectors.add(liveConnector.getName());
+      }
+      TransportConfiguration backupConnector = getConnectorTransportConfiguration(nodeid);
+      List<String> pairs = null;
+      ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+               pairs);
+      config1.getClusterConfigurations().add(ccc1);
+      BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
+      config1.setBackupConnectorConfiguration(connectorConfiguration);
+      config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+   }
+
+   public void createLiveConfig(int liveNode)
+   {
+      TransportConfiguration liveConnector = getConnectorTransportConfiguration(liveNode);
+      Configuration config0 = super.createDefaultConfig();
+      config0.getAcceptorConfigurations().clear();
+      config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(liveNode));
+      config0.setSecurityEnabled(false);
+      config0.setSharedStore(true);
+      config0.setClustered(true);
+       List<String> pairs = null;
+      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+               pairs);
+      config0.getClusterConfigurations().add(ccc0);
+      config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+   }
+   private TransportConfiguration getConnectorTransportConfiguration(int node)
+   {
+      HashMap<String, Object> map = new HashMap<String, Object>();
+      map.put(TransportConstants.SERVER_ID_PROP_NAME, node);
+      return new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
+   }
+
+   private TransportConfiguration getAcceptorTransportConfiguration(int node)
+   {
+      HashMap<String, Object> map = new HashMap<String, Object>();
+      map.put(TransportConstants.SERVER_ID_PROP_NAME, node);
+      return new TransportConfiguration(INVM_ACCEPTOR_FACTORY, map);
+   }
+
+   protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
+         throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+      CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+      locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+
+      sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+      assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+      return sf;
+   }
+
+   public ServerLocator getServerLocator(int... nodes)
+   {
+      TransportConfiguration[] configs = new TransportConfiguration[nodes.length];
+      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+      {
+         HashMap<String, Object> map = new HashMap<String, Object>();
+         map.put(TransportConstants.SERVER_ID_PROP_NAME, i);
+         configs[i] = new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
+
+      }
+      return new ServerLocatorImpl(true, configs);
+   }
+
+   private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
+   {
+      ClientSession session = sf.createSession(false, true, true);
+
+      if (createQueue)
+      {
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+      }
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBodyBuffer().writeString("aardvarks");
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
+
+         Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receiveImmediate();
+
+      Assert.assertNull(message3);
+
+      return session;
+   }
+   class LatchClusterTopologyListener implements ClusterTopologyListener
+   {
+      final CountDownLatch latch;
+      int liveNodes = 0;
+      int backUpNodes = 0;
+      List<String> liveNode = new ArrayList<String>();
+      List<String> backupNode = new ArrayList<String>();
+
+      public LatchClusterTopologyListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+      {
+         if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+         {
+            liveNode.add(connectorPair.a.getName());
+            latch.countDown();
+         }
+         if (connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+         {
+            backupNode.add(connectorPair.b.getName());
+            latch.countDown();
+         }
+      }
+
+      public void nodeDown(String nodeID)
+      {
+         //To change body of implemented methods use File | Settings | File Templates.
+      }
+   }
+}



More information about the hornetq-commits mailing list