[hornetq-commits] JBoss hornetq SVN: r11226 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 25 22:41:30 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-25 22:41:29 -0400 (Thu, 25 Aug 2011)
New Revision: 11226

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
fixing testsuite

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -1839,7 +1839,10 @@
 
    private void doCleanup(boolean failingOver)
    {
-      remotingConnection.removeFailureListener(this);
+      if (remotingConnection == null)
+      {
+         remotingConnection.removeFailureListener(this);
+      }
       
       if (log.isDebugEnabled())
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -30,12 +30,14 @@
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author Clebert Suconic
  *         Created Aug 16, 2010
  */
 public class Topology implements Serializable
 {
 
-   private static final int BACKOF_TIMEOUT = 500;
+   // TODO: remove the backof from this class. It's probably not needed any longer
+  // private static final int BACKOF_TIMEOUT = 500;
 
    private static final long serialVersionUID = -9037171688692471371L;
 
@@ -43,7 +45,7 @@
 
    private static final Logger log = Logger.getLogger(Topology.class);
 
-   private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
+  // private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
 
    private Executor executor = null;
 
@@ -62,13 +64,16 @@
     * keys are node IDs
     * values are a pair of live/backup transport configurations
     */
-   private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
+   private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
 
    public Topology(final Object owner)
    {
       this.owner = owner;
-      Topology.log.debug("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
-                         new Exception("trace")); // Delete this line
+      if (log.isTraceEnabled())
+      {
+         Topology.log.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+                            new Exception("trace"));
+      }
    }
 
    public void setExecutor(final Executor executor)
@@ -100,143 +105,156 @@
       }
    }
 
-   public boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
+   public boolean addMember(final String nodeId, final TopologyMember memberInput, final boolean last)
    {
-      boolean replaced = false;
-
       synchronized (this)
       {
-         TopologyMember currentMember = topology.get(nodeId);
+         TopologyMember currentMember = mapTopology.get(nodeId);
 
-         if (Topology.log.isDebugEnabled())
-         {
-            Topology.log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception("trace"));
-         }
-
          if (currentMember == null)
          {
-            if (!testBackof(nodeId))
+            /*if (!testBackof(nodeId))
             {
                return false;
-            }
+            } */
 
-            replaced = true;
             if (Topology.log.isDebugEnabled())
             {
-               Topology.log.debug("Add " + this +
+               Topology.log.debug(this + "::NewMemeberAdd " + this +
                                   " MEMBER WAS NULL, Add member nodeId=" +
                                   nodeId +
                                   " member = " +
-                                  member +
-                                  " replaced = " +
-                                  replaced +
+                                  memberInput +
                                   " size = " +
-                                  topology.size(), new Exception("trace"));
+                                  mapTopology.size(), new Exception("trace"));
             }
-            topology.put(nodeId, member);
+            mapTopology.put(nodeId, memberInput);
+            sendMemberUp(nodeId, memberInput);
+            return true;
          }
          else
          {
-            if (hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+            if (log.isTraceEnabled())
             {
-               if (!testBackof(nodeId))
+               log.trace(this + ":: validating update for currentMember=" + currentMember + " of memberInput=" + memberInput);
+            }
+
+            boolean replaced = false;
+            TopologyMember memberToSend = currentMember;
+
+            if (hasChanged("a", memberToSend.getConnector().a, memberInput.getConnector().a))
+            {
+               /*if (!replaced && !testBackof(nodeId))
                {
                   return false;
-               }
-
-               currentMember.getConnector().a = member.getConnector().a;
+               }*/
+               memberToSend = new TopologyMember(memberInput.getConnector().a, memberToSend.getConnector().b);
                replaced = true;
             }
-            if (hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+
+            if (hasChanged("b", memberToSend.getConnector().b, memberInput.getConnector().b))
             {
-               if (!testBackof(nodeId))
+               /*if (!replaced && !testBackof(nodeId))
                {
                   return false;
-               }
-
-               currentMember.getConnector().b = member.getConnector().b;
+               }*/
+               memberToSend = new TopologyMember(memberToSend.getConnector().a, memberInput.getConnector().b);
                replaced = true;
             }
 
-            if (member.getConnector().a == null)
+            if (replaced)
             {
-               member.getConnector().a = currentMember.getConnector().a;
+               mapTopology.remove(nodeId);
+               mapTopology.put(nodeId, memberToSend);
+
+               sendMemberUp(nodeId, memberToSend);
+               return true;
             }
-            if (member.getConnector().b == null)
-            {
-               member.getConnector().b = currentMember.getConnector().b;
-            }
-         }
 
-         if (Topology.log.isDebugEnabled())
-         {
-            Topology.log.debug(this + " Add member nodeId=" +
-                               nodeId +
-                               " member = " +
-                               member +
-                               " replaced = " +
-                               replaced +
-                               " size = " +
-                               topology.size(), new Exception("trace"));
          }
 
       }
 
-      if (replaced)
+      if (Topology.log.isDebugEnabled())
       {
+         Topology.log.debug(Topology.this + " Add member nodeId=" +
+                            nodeId +
+                            " member = " +
+                            memberInput +
+                            " has been ignored since there was no change", new Exception("trace"));
+      }
 
-         final ArrayList<ClusterTopologyListener> copy = copyListeners();
+      return false;
+   }
 
-         execute(new Runnable()
+   /**
+    * @param nodeId
+    * @param memberToSend
+    */
+   private void sendMemberUp(final String nodeId, final TopologyMember memberToSend)
+   {
+      final ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+      if (log.isTraceEnabled())
+      {
+         log.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
+      }
+
+      execute(new Runnable()
+      {
+         public void run()
          {
-            public void run()
+            for (ClusterTopologyListener listener : copy)
             {
-               for (ClusterTopologyListener listener : copy)
+               if (Topology.log.isTraceEnabled())
                {
-                  if (Topology.log.isTraceEnabled())
-                  {
-                     Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
-                  }
+                  Topology.log.trace(Topology.this + " informing " + listener + " about node up = " + nodeId);
+               }
 
-                  try
-                  {
-                     listener.nodeUP(nodeId, member.getConnector(), last);
-                  }
-                  catch (Throwable e)
-                  {
-                     log.warn(e.getMessage(), e);
-                  }
+               try
+               {
+                  listener.nodeUP(nodeId, memberToSend.getConnector(), false);
                }
+               catch (Throwable e)
+               {
+                  log.warn(e.getMessage(), e);
+               }
             }
-         });
-      }
-
-      return replaced;
+         }
+      });
    }
 
    /**
     * @param nodeId
     * @param backOfData
     */
-   private boolean testBackof(final String nodeId)
+   /*private boolean testBackof(final String nodeId)
    {
       Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
 
       if (backOfData != null)
       {
          backOfData.b += 1;
-         
+
          long timeDiff = System.currentTimeMillis() - backOfData.a;
 
          // To prevent a loop where nodes are being considered down and up
          if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
          {
+
             // The cluster may get in loop without this..
             // Case one node is stll sending nodeDown while another member is sending nodeUp
-            log.warn("The topology controller identified a blast events and it's interrupting the flow of the loop",
+            log.warn(backOfData.b + ", The topology controller identified a blast events and it's interrupting the flow of the loop, nodeID=" +
+                              nodeId +
+                              ", topologyInstance=" +
+                              this,
                      new Exception("this exception is just to trace location"));
             return false;
          }
+         else if (timeDiff < BACKOF_TIMEOUT)
+         {
+            log.warn(this + "::Simple blast of " + nodeId, new Exception("this exception is just to trace location"));
+         }
          else if (timeDiff >= BACKOF_TIMEOUT)
          {
             mapBackof.remove(nodeId);
@@ -244,7 +262,7 @@
       }
 
       return true;
-   }
+   } */
 
    /**
     * @return
@@ -265,22 +283,22 @@
 
       synchronized (this)
       {
-         Pair<Long, Integer> value = mapBackof.get(nodeId);
+//         Pair<Long, Integer> value = mapBackof.get(nodeId);
+//
+//         if (value == null)
+//         {
+//            value = new Pair<Long, Integer>(0l, 0);
+//            mapBackof.put(nodeId, value);
+//         }
+//
+//         value.a = System.currentTimeMillis();
+//
+//         if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
+//         {
+//            value.b = 0;
+//         }
 
-         if (value == null)
-         {
-            value = new Pair<Long, Integer>(0l, 0);
-            mapBackof.put(nodeId, value);
-         }
-
-         value.a = System.currentTimeMillis();
-
-         if (System.currentTimeMillis() - value.a > BACKOF_TIMEOUT)
-         {
-            value.b = 0;
-         }
-
-         member = topology.remove(nodeId);
+         member = mapTopology.remove(nodeId);
       }
 
       if (Topology.log.isDebugEnabled())
@@ -291,7 +309,7 @@
                             ", result=" +
                             member +
                             ", size = " +
-                            topology.size(), new Exception("trace"));
+                            mapTopology.size(), new Exception("trace"));
       }
 
       if (member != null)
@@ -376,19 +394,19 @@
          log.debug(this + " is sending topology to " + listener);
       }
 
-      final Map<String, TopologyMember> copy;
-
-      synchronized (this)
-      {
-         copy = new HashMap<String, TopologyMember>(topology);
-      }
-
       execute(new Runnable()
       {
          public void run()
          {
             int count = 0;
 
+            final Map<String, TopologyMember> copy;
+
+            synchronized (Topology.this)
+            {
+               copy = new HashMap<String, TopologyMember>(mapTopology);
+            }
+
             for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
             {
                if (log.isDebugEnabled())
@@ -408,12 +426,12 @@
 
    public TopologyMember getMember(final String nodeID)
    {
-      return topology.get(nodeID);
+      return mapTopology.get(nodeID);
    }
 
    public boolean isEmpty()
    {
-      return topology.isEmpty();
+      return mapTopology.isEmpty();
    }
 
    public Collection<TopologyMember> getMembers()
@@ -421,7 +439,7 @@
       ArrayList<TopologyMember> members;
       synchronized (this)
       {
-         members = new ArrayList<TopologyMember>(topology.values());
+         members = new ArrayList<TopologyMember>(mapTopology.values());
       }
       return members;
    }
@@ -429,7 +447,7 @@
    public synchronized int nodes()
    {
       int count = 0;
-      for (TopologyMember member : topology.values())
+      for (TopologyMember member : mapTopology.values())
       {
          if (member.getConnector().a != null)
          {
@@ -452,7 +470,7 @@
    {
 
       String desc = text + "\n";
-      for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
+      for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(mapTopology).entrySet())
       {
          desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
       }
@@ -466,12 +484,12 @@
       {
          Topology.log.debug(this + "::clear", new Exception("trace"));
       }
-      topology.clear();
+      mapTopology.clear();
    }
 
    public int members()
    {
-      return topology.size();
+      return mapTopology.size();
    }
 
    /** The owner exists mainly for debug purposes.
@@ -482,16 +500,26 @@
       this.owner = owner;
    }
 
-   private boolean hasChanged(final TransportConfiguration currentConnector, final TransportConfiguration connector)
+   private boolean hasChanged(final String debugInfo, final TransportConfiguration a, final TransportConfiguration b)
    {
-      return currentConnector == null && connector != null ||
-             currentConnector != null &&
-             !currentConnector.equals(connector);
+      boolean changed = a == null && b != null || a != null && b != null && !a.equals(b);
+
+      if (log.isTraceEnabled())
+      {
+
+         log.trace(this + "::Validating current=" + a 
+                   + " != input=" + b +
+                   (changed ? " and it has changed" : " and it didn't change") +
+                   ", for validation of " +
+                   debugInfo);
+      }
+
+      return changed;
    }
 
    public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
    {
-      for (TopologyMember member : topology.values())
+      for (TopologyMember member : mapTopology.values())
       {
          if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
          {
@@ -509,7 +537,7 @@
    {
       if (owner == null)
       {
-         return super.toString();
+         return "Topology@" + Integer.toHexString(System.identityHashCode(this));
       }
       else
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/TopologyMember.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -32,6 +32,11 @@
       this.connector = connector;
    }
 
+   public TopologyMember(TransportConfiguration a, TransportConfiguration b)
+   {
+      this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
+   }
+
    public Pair<TransportConfiguration, TransportConfiguration> getConnector()
    {
       return connector;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -564,6 +564,10 @@
       {
          if (connectorPair.b != null)
          {
+        	if (log.isTraceEnabled())
+        	{
+        	   log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
+        	}
             server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
          }
          return;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -2021,9 +2021,9 @@
       for (int node : nodes)
       {
          log.info("#test start node " + node);
-         if (System.currentTimeMillis() - timeStarts[node] < 1000)
+         if (System.currentTimeMillis() - timeStarts[node] < 100)
          {
-            Thread.sleep(1000);
+            Thread.sleep(100);
          }
          timeStarts[node] = System.currentTimeMillis();
          
@@ -2065,10 +2065,10 @@
          {
             try
             {
-               if (System.currentTimeMillis() - timeStarts[node] < 1000)
+               if (System.currentTimeMillis() - timeStarts[node] < 100)
                {
                   // We can't stop and start a node too fast (faster than what the Topology could realize about this
-                  Thread.sleep(1000);
+                  Thread.sleep(100);
                }
                timeStarts[node] = System.currentTimeMillis();
                

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -112,7 +112,6 @@
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 3, 4);
 
 
-      // startServers(3, 4, 5, 0, 1, 2);
       startServers(0, 1, 2, 3, 4, 5);
 
       log.info("");
@@ -122,11 +121,7 @@
          log.info(debugBindings(servers[i], servers[i].getConfiguration().getManagementNotificationAddress().toString()));
       }
       log.info("");
-      
-      //stopServers(3);
-      
-      Thread.sleep(1000);
-
+ 
       log.info("");
       for (int i = 0; i <= 5; i++)
       {
@@ -168,21 +163,13 @@
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
 
-      // Thread.sleep(1500);
-
       createQueue(0, "queues.testaddress", "queue0", null, false);
-      // Thread.sleep(1500);
       createQueue(1, "queues.testaddress", "queue0", null, false);
-      // Thread.sleep(1500);
       createQueue(2, "queues.testaddress", "queue0", null, false);
-      // Thread.sleep(1500);
 
       addConsumer(0, 0, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(1, 1, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(2, 2, "queue0", null);
-      // Thread.sleep(1500);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -291,21 +278,14 @@
 
       // Need to wait some time so the bridges and 
       // connectors had time to connect properly between the nodes
-      Thread.sleep(1000);
 
       createQueue(0, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
       createQueue(1, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
       createQueue(2, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
 
       addConsumer(0, 0, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(1, 1, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(2, 2, "queue0", null);
-      // Thread.sleep(1500);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -375,21 +355,13 @@
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
 
-      // Thread.sleep(1500);
-
       createQueue(0, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
       createQueue(1, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
       createQueue(2, "queues.testaddress", "queue0", null, true);
-      // Thread.sleep(1500);
 
       addConsumer(0, 0, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(1, 1, "queue0", null);
-      // Thread.sleep(1500);
       addConsumer(2, 2, "queue0", null);
-      // Thread.sleep(1500);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
@@ -402,16 +374,8 @@
       send(0, "queues.testaddress", 33, true, null);
 
       verifyReceiveRoundRobin(33, 0, 1, 2);
-
-      Thread.sleep(1000);
       
-      // TODO: need to make sure the shutdown won't be send, what will affect the test
       stopServers(2);
-//      
-//      Thread.sleep(5000);
-//
-//      waitForBindings(0, "queues.testaddress", 2, 2, false);
-//      waitForBindings(1, "queues.testaddress", 2, 2, false);
 
 
       send(0, "queues.testaddress", 100, true, null);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2011-08-25 19:23:40 UTC (rev 11225)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2011-08-26 02:41:29 UTC (rev 11226)
@@ -395,6 +395,8 @@
 
       backupJMSService.setContext(ctx2);
 
+      backupJMSService.getHornetQServer().setIdentity("JMSBackup");
+      log.info("Starting backup");
       backupJMSService.start();
 
       liveConf = createBasicConfig(0);
@@ -432,6 +434,9 @@
 
       liveJMSService.setContext(ctx1);
 
+      liveJMSService.getHornetQServer().setIdentity("JMSLive");
+      log.info("Starting life");
+
       liveJMSService.start();
 
       JMSUtil.waitForServer(backupService);



More information about the hornetq-commits mailing list