[hornetq-commits] JBoss hornetq SVN: r11220 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 24 19:02:39 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-24 19:02:39 -0400 (Wed, 24 Aug 2011)
New Revision: 11220

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
Log:
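fixing tests: call notifyNodeDown directly when a DISCONNECT packet arrives (before scheduling the connection close), have Topology ignore a nodeUp only after repeated up/down events inside the back-off window, and dispatch topology notifications to listeners through the Topology executor (ClusterManagerImpl no longer wraps sendTopology in its own executor)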

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -1415,12 +1415,19 @@
          if (type == PacketImpl.DISCONNECT)
          {
             final DisconnectMessage msg = (DisconnectMessage)packet;
+
+            SimpleString nodeID = msg.getNodeID();
             
             if (log.isTraceEnabled())
             {
-               log.trace("Disconnect being called on client:" + msg + " server locator = " + serverLocator, new Exception ("trace"));
+               log.trace("Disconnect being called on client:" + msg + " server locator = " + serverLocator + " notifying node " + nodeID + " as down", new Exception ("trace"));
             }
 
+            if (nodeID != null)
+            {
+               serverLocator.notifyNodeDown(msg.getNodeID().toString());
+            }
+
             closeExecutor.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1430,15 +1437,6 @@
                   conn.fail(new HornetQException(HornetQException.DISCONNECTED,
                                                  "The connection was disconnected because of server shutdown"));
 
-                  SimpleString nodeID = msg.getNodeID();
-                  if (log.isTraceEnabled())
-                  {
-                     log.trace("notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
-                  }
-                  if (nodeID != null)
-                  {
-                     serverLocator.notifyNodeDown(msg.getNodeID().toString());
-                  }
                }
             });
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -23,6 +23,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.logging.Logger;
@@ -35,15 +36,15 @@
 {
 
    private static final int BACKOF_TIMEOUT = 50;
-	
+
    private static final long serialVersionUID = -9037171688692471371L;
 
    private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
 
    private static final Logger log = Logger.getLogger(Topology.class);
-   
-   private transient HashMap<String, Long> mapBackof = new HashMap<String, Long>();
 
+   private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
+
    private Executor executor = null;
 
    /** Used to debug operations.
@@ -55,7 +56,6 @@
     *  */
    private volatile Object owner;
 
-
    /**
     * topology describes the other cluster nodes that this server knows about:
     *
@@ -80,7 +80,7 @@
    {
       if (log.isDebugEnabled())
       {
-         log.debug(this + "::PPP Adding topology listener " + listener, new Exception("Trace"));
+         log.debug(this + "::Adding topology listener " + listener, new Exception("Trace"));
       }
       synchronized (topologyListeners)
       {
@@ -106,16 +106,6 @@
 
       synchronized (this)
       {
-         Long lastTime = mapBackof.get(nodeId);
-         
-         if (lastTime != null && System.currentTimeMillis() - lastTime.longValue() < BACKOF_TIMEOUT)
-         {
-            // The cluster may get in loop without this..
-            // Case one node is stll sending nodeDown while another member is sending nodeUp
-            log.warn("Node was considered down too fast, ignoring addMember on Topology", new Exception("trace"));
-            return false;
-         }
-
          TopologyMember currentMember = topology.get(nodeId);
 
          if (Topology.log.isDebugEnabled())
@@ -125,6 +115,11 @@
 
          if (currentMember == null)
          {
+            if (!testBackof(nodeId))
+            {
+               return false;
+            }
+
             replaced = true;
             if (Topology.log.isDebugEnabled())
             {
@@ -144,11 +139,21 @@
          {
             if (hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
             {
+               if (!testBackof(nodeId))
+               {
+                  return false;
+               }
+
                currentMember.getConnector().a = member.getConnector().a;
                replaced = true;
             }
             if (hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
             {
+               if (!testBackof(nodeId))
+               {
+                  return false;
+               }
+
                currentMember.getConnector().b = member.getConnector().b;
                replaced = true;
             }
@@ -180,9 +185,8 @@
       if (replaced)
       {
 
-         
          final ArrayList<ClusterTopologyListener> copy = copyListeners();
-         
+
          execute(new Runnable()
          {
             public void run()
@@ -211,6 +215,38 @@
    }
 
    /**
+    * @param nodeId
+    * @param backOfData
+    */
+   private boolean testBackof(final String nodeId)
+   {
+      Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
+
+      if (backOfData != null)
+      {
+         backOfData.b += 1;
+         
+         long timeDiff = System.currentTimeMillis() - backOfData.a;
+
+         // To prevent a loop where nodes are being considered down and up
+         if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
+         {
+            // The cluster may get in loop without this..
+            // Case one node is stll sending nodeDown while another member is sending nodeUp
+            log.warn("The topology controller identified a blast of nodeUp and down and it is ignoring a nodeUP",
+                     new Exception("this exception is just to trace location"));
+            return false;
+         }
+         else if (timeDiff >= BACKOF_TIMEOUT)
+         {
+            mapBackof.remove(nodeId);
+         }
+      }
+
+      return true;
+   }
+
+   /**
     * @return
     */
    private ArrayList<ClusterTopologyListener> copyListeners()
@@ -229,7 +265,21 @@
 
       synchronized (this)
       {
-         mapBackof.put(nodeId, new Long(System.currentTimeMillis()));
+         Pair<Long, Integer> value = mapBackof.get(nodeId);
+
+         if (value == null)
+         {
+            value = new Pair<Long, Integer>(0l, 0);
+            mapBackof.put(nodeId, value);
+         }
+
+         value.a = System.currentTimeMillis();
+
+         if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
+         {
+            value.b = 0;
+         }
+
          member = topology.remove(nodeId);
       }
 
@@ -273,7 +323,7 @@
       }
       return member != null;
    }
-   
+
    protected void execute(final Runnable runnable)
    {
       if (executor != null)
@@ -291,43 +341,69 @@
     * @param nodeID
     * @param member
     */
-   public void sendMemberToListeners(String nodeID, TopologyMember member)
+   public void sendMemberToListeners(final String nodeID, final TopologyMember member)
    {
       // To make sure it was updated
       addMember(nodeID, member, false);
 
-      ArrayList<ClusterTopologyListener> copy = copyListeners();
+      final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
-      // Now force sending it
-      for (ClusterTopologyListener listener : copy)
+      execute(new Runnable()
       {
-         if (log.isDebugEnabled())
+         public void run()
          {
-            log.debug("Informing client listener " + listener +
-                      " about itself node " +
-                      nodeID +
-                      " with connector=" +
-                      member.getConnector());
+            // Now force sending it
+            for (ClusterTopologyListener listener : copy)
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("Informing client listener " + listener +
+                            " about itself node " +
+                            nodeID +
+                            " with connector=" +
+                            member.getConnector());
+               }
+               listener.nodeUP(nodeID, member.getConnector(), false);
+            }
          }
-         listener.nodeUP(nodeID, member.getConnector(), false);
-      }
+      });
    }
 
-   public void sendTopology(final ClusterTopologyListener listener)
+   public synchronized void sendTopology(final ClusterTopologyListener listener)
    {
-      int count = 0;
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + " is sending topology to " + listener);
+      }
 
-      Map<String, TopologyMember> copy;
+      final Map<String, TopologyMember> copy;
 
       synchronized (this)
       {
          copy = new HashMap<String, TopologyMember>(topology);
       }
 
-      for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
+      execute(new Runnable()
       {
-         listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
-      }
+         public void run()
+         {
+            int count = 0;
+
+            for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
+            {
+               if (log.isDebugEnabled())
+               {
+                  log.debug(Topology.this + " sending " +
+                            entry.getKey() +
+                            " / " +
+                            entry.getValue().getConnector() +
+                            " to " +
+                            listener);
+               }
+               listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
+            }
+         }
+      });
    }
 
    public TopologyMember getMember(final String nodeID)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -380,15 +380,8 @@
    {
       topology.addClusterTopologyListener(listener);
 
-      // We now need to send the current topology to the client
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            topology.sendTopology(listener);
-
-         }
-      });
+      // no need to use an executor here since the Topology is already using one
+      topology.sendTopology(listener);
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -169,7 +169,7 @@
       removeConsumer(1);
       
       // Need to wait some time as we need to handle all redistributions before we stop the servers
-      Thread.sleep(5000);
+      Thread.sleep(1000);
 
       for (int i = 0; i <= 2; i++)
       {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -173,7 +173,8 @@
       ClientSessionFactoryInternal sf;
       CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
 
-      locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+      LatchClusterTopologyListener topListener = new LatchClusterTopologyListener(countDownLatch);
+      locator.addClusterTopologyListener(topListener);
 
       sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
@@ -182,6 +183,7 @@
       {
          System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
       }
+      locator.removeClusterTopologyListener(topListener);
       assertTrue(ok);
       return sf;
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java	2011-08-24 02:16:47 UTC (rev 11219)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java	2011-08-24 23:02:39 UTC (rev 11220)
@@ -20,10 +20,12 @@
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.impl.InVMNodeManager;
 import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -36,7 +38,19 @@
 
    protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
    private NodeManager nodeManager;
+   
+   Logger log = Logger.getLogger(SingleLiveMultipleBackupsFailoverTest.class);
 
+   public void _testLoop() throws Exception
+   {
+      for (int i = 0 ; i < 100; i++)
+      {
+         log.info("#test " + i);
+         testMultipleFailovers();
+         tearDown();
+         setUp();
+      }
+   }
    public void testMultipleFailovers() throws Exception
    {
       nodeManager = new InVMNodeManager();
@@ -56,7 +70,12 @@
       servers.get(4).start();
       servers.get(5).start();
 
-      ServerLocator locator = getServerLocator(0);
+      ServerLocatorImpl locator = (ServerLocatorImpl)getServerLocator(0);
+      
+      Topology topology = locator.getTopology();
+      
+      // for logging and debugging
+      topology.setOwner("testMultipleFailovers");
 
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
@@ -65,36 +84,32 @@
       ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
       int backupNode;
       ClientSession session = sendAndConsume(sf, true);
-      System.out.println("failing node 0");
-      Thread.sleep(500);
+
+      log.info("failing node 0");
       servers.get(0).crash(session);
       
       session.close();
       backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
       session = sendAndConsume(sf, false);
-      System.out.println("failing node " + backupNode);
-      Thread.sleep(500);
+      log.info("failing node " + backupNode);
       servers.get(backupNode).crash(session);
 
       session.close();
       backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
       session = sendAndConsume(sf, false);
-      System.out.println("failing node " + backupNode);
-      Thread.sleep(1000);
+      log.info("failing node " + backupNode);
       servers.get(backupNode).crash(session);
 
       session.close();
       backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
       session = sendAndConsume(sf, false);
-      System.out.println("failing node " + backupNode);
-      Thread.sleep(500);
+      log.info("failing node " + backupNode);
       servers.get(backupNode).crash(session);
 
       session.close();
       backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
       session = sendAndConsume(sf, false);
-      System.out.println("failing node " + backupNode);
-      Thread.sleep(500);
+      log.info("failing node " + backupNode);
       servers.get(backupNode).crash(session);
 
       session.close();



More information about the hornetq-commits mailing list