[hornetq-commits] JBoss hornetq SVN: r11064 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: protocol/core/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jul 28 13:01:31 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-28 13:01:30 -0400 (Thu, 28 Jul 2011)
New Revision: 11064

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
test fixes

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-28 17:01:30 UTC (rev 11064)
@@ -78,7 +78,7 @@
 
    private StaticConnector staticConnector = new StaticConnector();
 
-   private final Topology topology = new Topology(this);
+   private final Topology topology;
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 
@@ -358,11 +358,14 @@
       }
    }
 
-   private ServerLocatorImpl(final boolean useHA,
+   private ServerLocatorImpl(final Topology topology,
+                             final boolean useHA,
                              final DiscoveryGroupConfiguration discoveryGroupConfiguration,
                              final TransportConfiguration[] transportConfigs)
    {
       e.fillInStackTrace();
+      
+      this.topology = topology;
       this.ha = useHA;
 
       this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -440,7 +443,7 @@
     */
    public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
    {
-      this(useHA, groupConfiguration, null);
+      this(new Topology(null), useHA, groupConfiguration, null);
    }
 
    /**
@@ -450,9 +453,30 @@
     */
    public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
    {
-      this(useHA, null, transportConfigs);
+      this(new Topology(null), useHA, null, transportConfigs);
    }
 
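+   /**
+    * Create a ServerLocatorImpl using UDP discovery to lookup cluster, using the supplied Topology
+    * @param topology the Topology instance this locator will use
+    * @param useHA whether the locator is created with HA enabled
+    * @param groupConfiguration the discovery group configuration used to look up the cluster
+    */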
+   public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      this(topology, useHA, groupConfiguration, null);
+   }
+
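+   /** Create a ServerLocatorImpl using a static list of live servers and the supplied Topology
+    * @param topology the Topology instance this locator will use
+    * @param useHA whether the locator is created with HA enabled
+    * @param transportConfigs the static list of live server connectors
+    */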
+   public ServerLocatorImpl(final Topology topology, final boolean useHA, final TransportConfiguration... transportConfigs)
+   {
+      this(topology, useHA, null, transportConfigs);
+   }
+
    private TransportConfiguration selectConnector()
    {
       if (receivedTopology)
@@ -1187,7 +1211,7 @@
 
       if (log.isDebugEnabled())
       {
-         log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+         log.debug("XXX ZZZ nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
       }
 
       removed = topology.removeMember(nodeID);
@@ -1236,7 +1260,7 @@
 
       if (log.isDebugEnabled())
       {
-         log.debug("XXX YYY NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair);
+         log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
       }
 
       topology.addMember(nodeID, new TopologyMember(connectorPair));

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-07-28 17:01:30 UTC (rev 11064)
@@ -13,6 +13,7 @@
 package org.hornetq.core.client.impl;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -52,6 +53,7 @@
    public Topology(final Object owner)
    {
       this.owner = owner;
+      log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception ("trace")); // Delete this line
    }
 
    /*
@@ -75,8 +77,12 @@
       }
       if(currentMember == null)
       {
+         replaced = true;
+        if (log.isDebugEnabled())
+         {
+            log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+         }
          topology.put(nodeId, member);
-         replaced = true;
       }
       else
       {
@@ -103,8 +109,14 @@
       if(debug)
       {
          log.debug(this + "::Topology updated=" + replaced);
-         log.debug(describe("After:"));
+         log.debug(describe(this + "::After:"));
       }
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug("ZZZ " + this + " Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+      }
+      
       return replaced;
    }
 
@@ -113,12 +125,12 @@
       TopologyMember member = topology.remove(nodeId);
       if (log.isDebugEnabled())
       {
-         log.debug("XXX " + this + " removing nodeID=" + nodeId + ", result=" + member, new Exception ("trace"));
+         log.debug("ZZZ " + this + " removing nodeID=" + nodeId + ", result=" + member + ", size = " + topology.size(), new Exception ("trace"));
       }
       return (member != null);
    }
 
-   public synchronized void sendTopology(ClusterTopologyListener listener)
+   public void sendTopology(ClusterTopologyListener listener)
    {
       int count = 0;
       Map<String, TopologyMember> copy;
@@ -144,14 +156,21 @@
 
    public Collection<TopologyMember> getMembers()
    {
-      return topology.values();
+      ArrayList<TopologyMember> members;
+      synchronized (this)
+      {
+         members = new ArrayList<TopologyMember>(topology.values());
+      }
+      return members;
    }
 
-   public int nodes()
+   public synchronized int nodes()
    {
       int count = 0;
       for (TopologyMember member : topology.values())
       {
+         
+         // TODO: fix this
          if (member.getConnector().a != null)
          {
             count++;
@@ -182,6 +201,10 @@
 
    public void clear()
    {
+      if (log.isDebugEnabled())
+      {
+         log.debug("ZZZ " + this + "::clear", new Exception ("trace"));
+      }
       topology.clear();
    }
 
@@ -224,7 +247,7 @@
       }
       else
       {
-         return "Topology [owner=" + owner + "]";
+         return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]";
       }
    }
    

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-28 17:01:30 UTC (rev 11064)
@@ -118,12 +118,12 @@
                {
                   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
-                     channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+                      channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
                   
                   public void nodeDown(String nodeID)
                   {
-                     channel0.send(new ClusterTopologyChangeMessage(nodeID));
+                      channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
                   }
                };
                

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-07-28 17:01:30 UTC (rev 11064)
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -34,7 +35,9 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -70,7 +73,11 @@
    
    private static final boolean isTrace = log.isTraceEnabled();
 
-   private final org.hornetq.utils.ExecutorFactory executorFactory;
+   private final ExecutorFactory executorFactory;
+   
+   private final Topology clusterManagerTopology;
+   
+   private final Executor executor;
 
    private final HornetQServer server;
 
@@ -127,6 +134,7 @@
    private final ClusterManagerImpl manager;
    
    public ClusterConnectionImpl(final ClusterManagerImpl manager,
+                                final Topology clusterManagerTopology,
                                 final TransportConfiguration[] tcConfigs,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -183,6 +191,8 @@
       this.routeWhenNoConsumers = routeWhenNoConsumers;
 
       this.executorFactory = executorFactory;
+      
+      this.executor = executorFactory.getExecutor();
 
       this.server = server;
 
@@ -203,6 +213,8 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
       
       this.manager = manager;
+      
+      this.clusterManagerTopology = clusterManagerTopology;
 
       clusterConnector = new StaticClusterConnector(tcConfigs);
 
@@ -219,6 +231,7 @@
    }
 
    public ClusterConnectionImpl(final ClusterManagerImpl manager,
+                                final Topology clusterManagerTopology,
                                 DiscoveryGroupConfiguration dg,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -275,6 +288,8 @@
       this.routeWhenNoConsumers = routeWhenNoConsumers;
 
       this.executorFactory = executorFactory;
+      
+      this.executor = executorFactory.getExecutor();
 
       this.server = server;
 
@@ -297,6 +312,8 @@
       clusterConnector = new DiscoveryClusterConnector(dg);
       
       this.manager = manager;
+      
+      this.clusterManagerTopology = clusterManagerTopology;
    }
 
    public synchronized void start() throws Exception
@@ -352,12 +369,18 @@
                                                          props);
             managementService.sendNotification(notification);
          }
+         
+         executor.execute(new Runnable(){
+            public void run()
+            {
+               if(serverLocator != null)
+               {
+                  serverLocator.close();
+                  serverLocator = null;
+               }
 
-         if(serverLocator != null)
-         {
-            serverLocator.close();
-            serverLocator = null;
-         }
+            }
+         });
 
          started = false;
       }
@@ -1258,7 +1281,7 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
+            return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
          }
          else
          {
@@ -1289,7 +1312,7 @@
 
       public ServerLocatorInternal createServerLocator()
       {
-         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+         return new ServerLocatorImpl(clusterManagerTopology, true, dg);
       }
    }
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-07-28 17:01:30 UTC (rev 11064)
@@ -245,13 +245,17 @@
          backupServerLocator = null;
       }
 
-      for (ServerLocator clusterLocator : clusterLocators)
+      executor.execute(new Runnable()
       {
-         log.info("WWW Closing clusterLocator " + clusterLocator);
-         clusterLocator.close();
-         log.info("WWW Closed clusterLocator " + clusterLocator);
-      }
-      clusterLocators.clear();
+         public void run()
+         {
+            for (ServerLocator clusterLocator : clusterLocators)
+            {
+               clusterLocator.close();
+            }
+            clusterLocators.clear();
+         }
+      });
       started = false;
 
       topologyListeners.clear();
@@ -829,6 +833,7 @@
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
+                                                       topology,
                                                        dg,
                                                        connector,
                                                        new SimpleString(config.getName()),
@@ -865,6 +870,7 @@
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
+                                                       topology,
                                                        tcConfigs,
                                                        connector,
                                                        new SimpleString(config.getName()),



More information about the hornetq-commits mailing list