[hornetq-commits] JBoss hornetq SVN: r11066 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/protocol/core/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 29 00:22:24 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-29 00:22:24 -0400 (Fri, 29 Jul 2011)
New Revision: 11066

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
test fixes

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -68,8 +68,6 @@
 
    private String identity;
 
-   private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
    private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
 
    private TransportConfiguration[] initialConnectors;
@@ -444,6 +442,7 @@
    public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
    {
       this(new Topology(null), useHA, groupConfiguration, null);
+      topology.setOwner(this);
    }
 
    /**
@@ -454,6 +453,7 @@
    public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
    {
       this(new Topology(null), useHA, null, transportConfigs);
+      topology.setOwner(this);
    }
 
    /**
@@ -1215,7 +1215,7 @@
       }
 
       removed = topology.removeMember(nodeID);
-
+ 
       if (!topology.isEmpty())
       {
          updateArraysAndPairs();
@@ -1232,13 +1232,6 @@
          receivedTopology = false;
       }
 
-      if (removed)
-      {
-         for (ClusterTopologyListener listener : topologyListeners)
-         {
-            listener.nodeDown(nodeID);
-         }
-      }
    }
 
    public synchronized void notifyNodeUp(final String nodeID,
@@ -1263,7 +1256,7 @@
          log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
       }
 
-      topology.addMember(nodeID, new TopologyMember(connectorPair));
+      topology.addMember(nodeID, new TopologyMember(connectorPair), last);
 
       TopologyMember actMember = topology.getMember(nodeID);
 
@@ -1286,11 +1279,6 @@
          receivedTopology = true;
       }
 
-      for (ClusterTopologyListener listener : topologyListeners)
-      {
-         listener.nodeUP(nodeID, connectorPair, last);
-      }
-
       // Notify if waiting on getting topology
       notify();
    }
@@ -1371,16 +1359,12 @@
 
    public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
-      topologyListeners.add(listener);
-      if (topology.members() > 0)
-      {
-         log.debug(this + "::ServerLocatorImpl.addClusterTopologyListener");
-      }
+      topology.addClusterTopologyListener(listener);
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener)
    {
-      topologyListeners.remove(listener);
+      topology.removeClusterTopologyListener(listener);
    }
 
    public synchronized void addFactory(ClientSessionFactoryInternal factory)

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -15,10 +15,14 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -30,16 +34,13 @@
  */
 public class Topology implements Serializable
 {
-   
-   /**
-    * 
-    */
+
    private static final long serialVersionUID = -9037171688692471371L;
 
-   
+   private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
 
    private static final Logger log = Logger.getLogger(Topology.class);
-   
+
    /** Used to debug operations.
     * 
     *  Someone may argue this is not needed. But it's impossible to debg anything related to topology without knowing what node
@@ -47,13 +48,15 @@
     *  
     *  Hence I added some information to locate debugging here. 
     *  */
-   private final Object owner;
-   
-   
+   private volatile Object owner;
+
+   private volatile Executor executor;
+
    public Topology(final Object owner)
    {
       this.owner = owner;
-      log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception ("trace")); // Delete this line
+      Topology.log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+                         new Exception("trace")); // Delete this line
    }
 
    /*
@@ -62,89 +65,224 @@
     * keys are node IDs
     * values are a pair of live/backup transport configurations
     */
-   private Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
+   private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
 
-   private boolean debug = log.isDebugEnabled();
+   public void setExecutor(Executor executor)
+   {
+      this.executor = executor;
+   }
 
-   public synchronized boolean addMember(String nodeId, TopologyMember member)
+   public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
-      boolean replaced = false;
-      TopologyMember currentMember = topology.get(nodeId);
-      if (debug)
+      if (log.isDebugEnabled())
       {
-         log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
-         log.debug(describe("Before:"));
+         log.debug(this + "::PPP Adding topology listener " + listener, new Exception("Trace"));
       }
-      if(currentMember == null)
+      synchronized (topologyListeners)
       {
-         replaced = true;
-        if (log.isDebugEnabled())
-         {
-            log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
-         }
-         topology.put(nodeId, member);
+         topologyListeners.add(listener);
       }
-      else
+   }
+
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+   {
+      if (log.isDebugEnabled())
       {
-         if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+         log.debug(this + "::PPP Removing topology listener " + listener, new Exception("Trace"));
+      }
+      synchronized (topologyListeners)
+      {
+         topologyListeners.remove(listener);
+      }
+   }
+
+   public  boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
+   {
+      boolean replaced = false;
+
+      synchronized (this)
+      {
+         TopologyMember currentMember = topology.get(nodeId);
+
+         if (Topology.log.isDebugEnabled())
          {
-            currentMember.getConnector().a =  member.getConnector().a;
-            replaced = true;
+            Topology.log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception("trace"));
+            Topology.log.debug(describe("Before:"));
          }
-         if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+
+         if (currentMember == null)
          {
-            currentMember.getConnector().b =  member.getConnector().b;
             replaced = true;
+            if (Topology.log.isDebugEnabled())
+            {
+               Topology.log.debug("ZZZ " + this +
+                                  " MEMBER WAS NULL, Add member nodeId=" +
+                                  nodeId +
+                                  " member = " +
+                                  member +
+                                  " replaced = " +
+                                  replaced +
+                                  " size = " +
+                                  topology.size(), new Exception("trace"));
+            }
+            topology.put(nodeId, member);
          }
+         else
+         {
+            if (hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+            {
+               currentMember.getConnector().a = member.getConnector().a;
+               replaced = true;
+            }
+            if (hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+            {
+               currentMember.getConnector().b = member.getConnector().b;
+               replaced = true;
+            }
 
-         if(member.getConnector().a == null)
+            if (member.getConnector().a == null)
+            {
+               member.getConnector().a = currentMember.getConnector().a;
+            }
+            if (member.getConnector().b == null)
+            {
+               member.getConnector().b = currentMember.getConnector().b;
+            }
+         }
+
+         if (Topology.log.isDebugEnabled())
          {
-            member.getConnector().a = currentMember.getConnector().a;
+            Topology.log.debug(this + "::Topology updated=" + replaced);
+            Topology.log.debug(describe(this + "::After:"));
          }
-         if(member.getConnector().b == null)
+
+         if (Topology.log.isDebugEnabled())
          {
-            member.getConnector().b = currentMember.getConnector().b;
+            Topology.log.debug("ZZZ " + this +
+                               " Add member nodeId=" +
+                               nodeId +
+                               " member = " +
+                               member +
+                               " replaced = " +
+                               replaced +
+                               " size = " +
+                               topology.size(), new Exception("trace"));
          }
+
       }
-      if(debug)
+      
+      if (replaced)
       {
-         log.debug(this + "::Topology updated=" + replaced);
-         log.debug(describe(this + "::After:"));
+         ArrayList<ClusterTopologyListener> copy = copyListeners();
+         for (ClusterTopologyListener listener : copy)
+         {
+            if (Topology.log.isTraceEnabled())
+            {
+               Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node up = " + nodeId);
+            }
+
+            listener.nodeUP(nodeId, member.getConnector(), last);
+         }
       }
+
+      return replaced;
+   }
+
+   /**
+    * @return
+    */
+   private ArrayList<ClusterTopologyListener> copyListeners()
+   {
+      ArrayList <ClusterTopologyListener> listenersCopy;
+      synchronized (topologyListeners)
+      {
+         listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners);
+      }
+      return listenersCopy;
+   }
+
+   public boolean removeMember(final String nodeId)
+   {
+      TopologyMember member;
       
-      if (log.isDebugEnabled())
+      synchronized (this)
       {
-         log.debug("ZZZ " + this + " Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+         member = topology.remove(nodeId);
       }
-      
-      return replaced;
+
+      if (member != null)
+      {
+         ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+         for (ClusterTopologyListener listener : copy)
+         {
+            if (Topology.log.isTraceEnabled())
+            {
+               Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node down = " + nodeId);
+            }
+            listener.nodeDown(nodeId);
+         }
+      }
+
+      if (Topology.log.isDebugEnabled())
+      {
+         Topology.log.debug("ZZZ " + this +
+                            " removing nodeID=" +
+                            nodeId +
+                            ", result=" +
+                            member +
+                            ", size = " +
+                            topology.size(), new Exception("trace"));
+      }
+
+      return member != null;
    }
 
-   public synchronized boolean removeMember(String nodeId)
+   /**
+    * it will send all the member updates to listeners, independently of being changed or not
+    * @param nodeID
+    * @param member
+    */
+   public void sendMemberToListeners(String nodeID, TopologyMember member)
    {
-      TopologyMember member = topology.remove(nodeId);
-      if (log.isDebugEnabled())
+      // To make sure it was updated
+      addMember(nodeID, member, false);
+      
+      ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+      // Now force sending it
+      for (ClusterTopologyListener listener : copy)
       {
-         log.debug("ZZZ " + this + " removing nodeID=" + nodeId + ", result=" + member + ", size = " + topology.size(), new Exception ("trace"));
+         if (log.isDebugEnabled())
+         {
+            log.debug("Informing client listener " + listener +
+                      " about itself node " +
+                      nodeID +
+                      " with connector=" +
+                      member.getConnector());
+         }
+         listener.nodeUP(nodeID, member.getConnector(), false);
       }
-      return (member != null);
    }
 
-   public void sendTopology(ClusterTopologyListener listener)
+   public synchronized void sendTopology(final ClusterTopologyListener listener)
    {
       int count = 0;
+
       Map<String, TopologyMember> copy;
+
       synchronized (this)
       {
          copy = new HashMap<String, TopologyMember>(topology);
       }
+
       for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
       {
          listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
       }
    }
 
-   public TopologyMember getMember(String nodeID)
+   public TopologyMember getMember(final String nodeID)
    {
       return topology.get(nodeID);
    }
@@ -169,8 +307,6 @@
       int count = 0;
       for (TopologyMember member : topology.values())
       {
-         
-         // ARRUMAR ISSO
          if (member.getConnector().a != null)
          {
             count++;
@@ -182,12 +318,13 @@
       }
       return count;
    }
+
    public synchronized String describe()
    {
       return describe("");
    }
 
-   public synchronized String describe(String text)
+   public synchronized String describe(final String text)
    {
 
       String desc = text + "\n";
@@ -201,11 +338,11 @@
 
    public void clear()
    {
-      if (log.isDebugEnabled())
+      if (Topology.log.isDebugEnabled())
       {
-         log.debug("ZZZ " + this + "::clear", new Exception ("trace"));
+         Topology.log.debug("ZZZ III " + this + "::clear", new Exception("trace"));
       }
-      topology.clear();
+      // topology.clear();
    }
 
    public int members()
@@ -213,28 +350,30 @@
       return topology.size();
    }
 
-   private boolean hasChanged(TransportConfiguration currentConnector, TransportConfiguration connector)
+   public void setOwner(final Object owner)
    {
-      return (currentConnector == null && connector != null) || (currentConnector != null && !currentConnector.equals(connector));
+      this.owner = owner;
    }
 
-   public TransportConfiguration getBackupForConnector(TransportConfiguration connectorConfiguration)
+   private boolean hasChanged(final TransportConfiguration currentConnector, final TransportConfiguration connector)
    {
+      return currentConnector == null && connector != null ||
+             currentConnector != null &&
+             !currentConnector.equals(connector);
+   }
+
+   public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
+   {
       for (TopologyMember member : topology.values())
       {
-         if(member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
+         if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
          {
-            return member.getConnector().b;  
+            return member.getConnector().b;
          }
       }
       return null;
    }
 
-   public void setDebug(boolean b)
-   {
-      debug = b;
-   }
-
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
@@ -250,5 +389,5 @@
          return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]";
       }
    }
-   
+
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -118,13 +118,18 @@
                {
                   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
-                      channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                      channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
                   
                   public void nodeDown(String nodeID)
                   {
-                      channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
+                      channel0.send(new ClusterTopologyChangeMessage(nodeID));
                   }
+                  
+                  public String toString()
+                  {
+                     return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
+                  }
                };
                
                final boolean isCC = msg.isClusterConnection();

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -105,7 +106,7 @@
 
    private final boolean routeWhenNoConsumers;
 
-   private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
+   private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
 
    private final ScheduledExecutorService scheduledExecutor;
 
@@ -498,7 +499,7 @@
 
    // ClusterTopologyListener implementation ------------------------------------------------------------------
 
-   public synchronized void nodeDown(final String nodeID)
+   public void nodeDown(final String nodeID)
    {
       if (log.isDebugEnabled())
       {
@@ -533,7 +534,7 @@
    }
 
 
-   public synchronized void nodeUP(final String nodeID,
+   public void nodeUP(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -100,8 +100,6 @@
    // the cluster connections which links this node to other cluster nodes
    private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
 
-   private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
    private final Topology topology = new Topology(this);
 
    private volatile ServerLocatorInternal backupServerLocator;
@@ -258,10 +256,7 @@
       });
       started = false;
 
-      topologyListeners.clear();
       clusterConnections.clear();
-      topology.clear();
-
    }
 
    public void notifyNodeDown(String nodeID)
@@ -273,16 +268,8 @@
       
       log.debug("XXX " + this + "::removing nodeID=" + nodeID, new Exception ("trace"));
 
-      boolean removed = topology.removeMember(nodeID);
+      topology.removeMember(nodeID);
 
-      if (removed)
-      {
-
-         for (ClusterTopologyListener listener : topologyListeners)
-         {
-            listener.nodeDown(nodeID);
-         }
-      }
    }
 
    public void notifyNodeUp(final String nodeID,
@@ -296,7 +283,7 @@
       }
 
       TopologyMember member = new TopologyMember(connectorPair);
-      boolean updated = topology.addMember(nodeID, member);
+      boolean updated = topology.addMember(nodeID, member, last);
       
       if (!updated)
       {
@@ -307,11 +294,6 @@
          return;
       }
 
-      for (ClusterTopologyListener listener : topologyListeners)
-      {
-         listener.nodeUP(nodeID, member.getConnector(), last);
-      }
-
       if (log.isDebugEnabled())
       {
          log.debug("XXX " + this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair + 
@@ -365,7 +347,7 @@
 
    public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
-      topologyListeners.add(listener);
+      topology.addClusterTopologyListener(listener);
 
       // We now need to send the current topology to the client
       executor.execute(new Runnable(){
@@ -380,7 +362,7 @@
    public void removeClusterTopologyListener(final ClusterTopologyListener listener,
                                                           final boolean clusterConnection)
    {
-      topologyListeners.add(listener);
+      topology.removeClusterTopologyListener(listener);
    }
 
    public Topology getTopology()
@@ -398,14 +380,10 @@
          String nodeID = server.getNodeID().toString();
 
          TopologyMember member = topology.getMember(nodeID);
-         // we swap the topology backup now = live
-         if (member != null)
-         {
-            member.getConnector().a = member.getConnector().b;
+         //swap backup as live and send it to everybody
+         member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
+         topology.addMember(nodeID, member, false);
 
-            member.getConnector().b = null;
-         }
-
          if (backupServerLocator != null)
          {
             // todo we could use the topology of this to preempt it arriving from the cc
@@ -456,15 +434,8 @@
                log.warn("unable to start bridge " + bridge.getName(), e);
             }
          }
-
-         for (ClusterTopologyListener listener : topologyListeners)
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("Informing client listener " + listener + " about itself node " + nodeID + " with connector=" + member.getConnector());
-            }
-            listener.nodeUP(nodeID, member.getConnector(), false);
-         }
+         
+         topology.sendMemberToListeners(nodeID, member);
       }
    }
 
@@ -519,7 +490,7 @@
                                                                                                  null));
          }
 
-         topology.addMember(nodeID, member);
+         topology.addMember(nodeID, member, false);
       }
       else
       {
@@ -532,13 +503,6 @@
             // pair.a = cc.getConnector();
          }
       }
-
-      // Propagate the announcement
-
-      for (ClusterTopologyListener listener : topologyListeners)
-      {
-         listener.nodeUP(nodeID, member.getConnector(), false);
-      }
    }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -456,7 +456,7 @@
                                  }
                                  catch (Exception e)
                                  {
-                                    log.info("unable to restart server, please kill and restart manually", e);
+                                    log.warn("unable to restart server, please kill and restart manually", e);
                                  }
                               }
                            });
@@ -465,6 +465,7 @@
                      }
                      catch (Exception e)
                      {
+                        log.debug(e.getMessage(), e);
                         //hopefully it will work next call
                      }
                   }

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -265,14 +265,16 @@
       {
          if (nodes == topology.getMembers().size())
          {
-            return;
+            
+           log.info("ZZZ III look up for topology on " + topology + " size = " + topology.getMembers().size());
+           return;
          }
 
          Thread.sleep(10);
       }
       while (System.currentTimeMillis() - start < timeout);
       
-      String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
+      String msg = "ZZZ Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") topology = " + topology + ")\n Current topology:" + topology.describe();
 
       ClusterTestBase.log.error(msg);
       
@@ -1988,6 +1990,7 @@
    {
       for (int node : nodes)
       {
+         log.info("#test start node " + node);
          servers[node].setIdentity("server " + node);
          ClusterTestBase.log.info("starting server " + servers[node]);
          servers[node].start();
@@ -1997,20 +2000,14 @@
          ClusterTestBase.log.info("started server " + node);
 
          waitForServer(servers[node]);
-         
-         for (int i = 0 ; i <= node; i++)
-         {
-            try
-            {
-               log.info("Describing Server " + servers[i]);
-               log.info(servers[i].describe());
-            }
-            catch (Throwable ignored)
-            {
-               
-            }
-         }
       }
+      
+      for (int node: nodes)
+      {
+         System.out.println(servers[node].describe());
+      }
+      
+      
    }
 
    protected void waitForServer(HornetQServer server)
@@ -2050,6 +2047,7 @@
       log.info("Stopping nodes "  + Arrays.toString(nodes));
       for (int node : nodes)
       {
+         log.info("#test stop server " + node);
          if (servers[node] != null && servers[node].isStarted())
          {
             try

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -149,7 +149,6 @@
       setupCluster();
 
       startServers(5, 0);
-      servers[0].getClusterManager().getTopology().setDebug(true);
       setupSessionFactory(0, isNetty());
 
       createQueue(0, "queues.testaddress", "queue0", null, false);

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -128,21 +128,12 @@
       stopServers(0, 1);
    }
 
-   public void _testLoop() throws Exception
-   {
-      for (int i = 0; i < 1000; i++)
-      {
-         log.info("#test " + i);
-         testStopStart();
-         tearDown();
-         setUp();
-      }
-   }
-
    public void testRestartTest() throws Throwable
    {
       startServers(0, 1);
       waitForTopology(servers[0], 2);
+      
+      log.info("ZZZ Server 0 " + servers[0].describe());
 
       // try
       // {
@@ -160,15 +151,30 @@
       for (int i = 0; i < 100; i++)
       {
          log.info("#stop #test #" + i);
+         Thread.sleep(500);
          stopServers(1);
          waitForTopology(servers[0], 1, 2000);
          log.info("#start #test #" + i);
+         Thread.sleep(500);
          startServers(1);
+         Thread.sleep(500);
          waitForTopology(servers[0], 2, 2000);
+         waitForTopology(servers[1], 2, 2000);
       }
 
    }
 
+   public void testLoop() throws Exception
+   {
+      for (int i = 0; i < 100; i++)
+      {
+         log.info("#test " + i);
+         testStopStart();
+         tearDown();
+         setUp();
+      }
+   }
+
    public void testStopStart() throws Exception
    {
       startServers(0, 1);

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2011-07-29 04:22:24 UTC (rev 11066)
@@ -18,6 +18,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
@@ -26,6 +27,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.cluster.ClusterConnection;
@@ -194,6 +196,8 @@
       startServers(0);
 
       ServerLocator locator = createHAServerLocator();
+      
+      ((ServerLocatorImpl)locator).getTopology().setOwner("ZZZ III testReceive");
 
       final List<String> nodes = new ArrayList<String>();
       final CountDownLatch upLatch = new CountDownLatch(5);
@@ -207,18 +211,32 @@
          {
             if(!nodes.contains(nodeID))
             {
+               System.out.println("Node UP " + nodeID + " added");
+               log.info("ZZZ III Node UP " + nodeID + " added");
                nodes.add(nodeID);
                upLatch.countDown();
             }
+            else
+            {
+               System.out.println("Node UP " + nodeID + " was already here");
+               log.info("ZZZ III Node UP " + nodeID + " was already here");
+            }
          }
 
          public void nodeDown(String nodeID)
          {
             if (nodes.contains(nodeID))
             {
+               log.info("ZZZ III Node down " + nodeID + " accepted");
+               System.out.println("Node down " + nodeID + " accepted");
                nodes.remove(nodeID);
                downLatch.countDown();
             }
+            else
+            {
+               log.info("ZZZ III Node down " + nodeID + " already removed");
+               System.out.println("Node down " + nodeID + " already removed");
+            }
          }
       });
 



More information about the hornetq-commits mailing list