[hornetq-commits] JBoss hornetq SVN: r10064 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 21 03:05:48 EST 2010


Author: ataylor
Date: 2010-12-21 03:05:47 -0500 (Tue, 21 Dec 2010)
New Revision: 10064

Modified:
   trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   trunk/src/main/org/hornetq/core/client/impl/Topology.java
   trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
   trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
Removed the distance parameter from nodeUP, as it is not needed.

Modified: trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -26,7 +26,7 @@
  */
 public interface ClusterTopologyListener
 {
-   void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+   void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
    
    void nodeDown(String nodeID);
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -1243,8 +1243,7 @@
             {
                serverLocator.notifyNodeUp(topMessage.getNodeID(),
                                           topMessage.getPair(),
-                                          topMessage.isLast(),
-                                          topMessage.getDistance());
+                                          topMessage.isLast());
             }
          }
       }

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -1139,15 +1139,14 @@
 
    public synchronized void notifyNodeUp(final String nodeID,
                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                         final boolean last,
-                                         final int distance)
+                                         final boolean last)
    {
       if (!ha)
       {
          return;
       }
 
-      topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+      topology.addMember(nodeID, new TopologyMember(connectorPair));
 
       TopologyMember actMember = topology.getMember(nodeID);
 
@@ -1172,7 +1171,7 @@
 
       for (ClusterTopologyListener listener : topologyListeners)
       {
-         listener.nodeUP(nodeID, connectorPair, last, distance);
+         listener.nodeUP(nodeID, connectorPair, last);
       }
 
       // Notify if waiting on getting topology
@@ -1242,6 +1241,10 @@
    public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       topologyListeners.add(listener);
+      if(topology.members() > 0)
+      {
+         System.out.println("ServerLocatorImpl.addClusterTopologyListener");
+      }
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener)

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -40,7 +40,7 @@
 
    ClientSessionFactory connect() throws  Exception;
 
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
 
    void notifyNodeDown(String nodeID);
 

Modified: trunk/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/Topology.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/Topology.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -47,7 +47,10 @@
       TopologyMember currentMember = topology.get(nodeId);
       if (debug)
       {
-         //System.out.println("member.getConnector() = " + member.getConnector());
+         if(member.getConnector().toString().contains("server-id=4"))
+         {
+            System.out.println("member.getConnector() = " + member.getConnector());
+         }
       }
       if(currentMember == null)
       {
@@ -90,7 +93,7 @@
       int count = 0;
       for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
       {
-         listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+         listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size());
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -27,27 +27,19 @@
 
    private final Pair<TransportConfiguration, TransportConfiguration> connector;
 
-   private final int distance;
-
-   public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+   public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
    {
       this.connector = connector;
-      this.distance = distance;
    }
 
    public Pair<TransportConfiguration, TransportConfiguration> getConnector()
    {
       return connector;
    }
-
-   public int getDistance()
-   {
-      return distance;
-   }
    
    @Override
    public String toString()
    {
-      return "TopologyMember[distance=" + distance + ", connector=" + connector + "]";
+      return "TopologyMember[connector=" + connector + "]";
    }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -111,9 +111,9 @@
                
                final ClusterTopologyListener listener = new ClusterTopologyListener()
                {
-                  public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+                  public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
-                     channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
+                     channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
                   
                   public void nodeDown(String nodeID)
@@ -147,7 +147,7 @@
                {
                   pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
                }
-               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false);
             }
          }
       });

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -39,13 +39,11 @@
    
    private boolean last;
 
-   private int distance;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
+   public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
    {
       super(PacketImpl.CLUSTER_TOPOLOGY);
 
@@ -56,8 +54,6 @@
       this.last = last;
       
       this.exit = false;
-
-      this.distance = distance;
    }
    
    public ClusterTopologyChangeMessage(final String nodeID)
@@ -96,16 +92,6 @@
       return exit;
    }
 
-   public int getDistance()
-   {
-      return distance;
-   }
-
-   public void setDistance(int distance)
-   {
-      this.distance = distance;
-   }
-
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
@@ -132,7 +118,6 @@
             buffer.writeBoolean(false);
          }
          buffer.writeBoolean(last);
-         buffer.writeInt(distance);
       }
    }
 
@@ -167,7 +152,6 @@
          }
          pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
          last = buffer.readBoolean();
-         distance = buffer.readInt();
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -50,7 +50,7 @@
 
    void notifyNodeDown(String nodeID);
 
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
 
    Topology getTopology();
 

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -29,6 +29,7 @@
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -83,6 +84,8 @@
 
    private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
 
+   private final List<TransportConfiguration> conectorssss = new ArrayList<TransportConfiguration>();
+
    private final ScheduledExecutorService scheduledExecutor;
 
    private final int maxHops;
@@ -413,8 +416,7 @@
 
    public synchronized void nodeUP(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                   final boolean last,
-                                   final int distance)
+                                   final boolean last)
    {
       // discard notifications about ourselves unless its from our backup
 
@@ -422,16 +424,16 @@
       {
          if(connectorPair.b != null)
          {
-            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
          }
          return;
       }
 
       // we propagate the node notifications to all cluster topology listeners
-      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
 
       // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
-      if (allowDirectConnectionsOnly && distance > 1 && !allowableConnections.contains(connectorPair.a))
+      if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
       {
          return;
       }
@@ -448,6 +450,18 @@
          return;
       }
 
+      Collection<TopologyMember> topologyMembers = serverLocator.getTopology().getMembers();
+      for (TopologyMember topologyMember : topologyMembers)
+      {
+         if(topologyMember.getConnector().a != null && !conectorssss.contains(topologyMember.getConnector().a))
+         {
+            if(!topologyMember.getConnector().a.equals(connector) && !topologyMember.getConnector().a.equals(connectorPair.a))
+            {
+               System.out.println("ClusterConnectionImpl.nodeUP");
+            }
+         }
+      }
+
       try
       {
          MessageFlowRecord record = records.get(nodeID);
@@ -474,6 +488,7 @@
             }
 
             createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+            conectorssss.add(connectorPair.a);
          }
          else
          {

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -253,10 +253,9 @@
 
    public void notifyNodeUp(String nodeID,
                                    Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                   boolean last,
-                                   int distance)
+                                   boolean last)
    {
-      boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+      boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair));
 
       if(!updated)
       {
@@ -265,15 +264,13 @@
       
       for (ClusterTopologyListener listener : clientListeners)
       {
-         listener.nodeUP(nodeID, connectorPair, last, distance);
+         listener.nodeUP(nodeID, connectorPair, last);
       }
 
-      if (distance < topology.nodes())
+
+      for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
-         for (ClusterTopologyListener listener : clusterConnectionListeners)
-         {
-            listener.nodeUP(nodeID, connectorPair, last, distance);
-         }
+         listener.nodeUP(nodeID, connectorPair, last);
       }
    }
    
@@ -407,12 +404,12 @@
 
          for (ClusterTopologyListener listener : clientListeners)
          {
-            listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+            listener.nodeUP(nodeID, member.getConnector(), false);
          }
 
          for (ClusterTopologyListener listener : clusterConnectionListeners)
          {
-            listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+            listener.nodeUP(nodeID, member.getConnector(), false);
          }
       }
    }
@@ -455,11 +452,11 @@
       {
          if (backup)
          {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()), 0);
+            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()));
          }
          else
          {
-            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null), 0);
+            member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null));
          }
 
          topology.addMember(nodeID, member);
@@ -480,12 +477,12 @@
 
       for (ClusterTopologyListener listener : clientListeners)
       {
-         listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+         listener.nodeUP(nodeID, member.getConnector(), false);
       }
       
       for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
-         listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+         listener.nodeUP(nodeID, member.getConnector(), false);
       }
 
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -149,7 +149,7 @@
       setupCluster();
 
       startServers(5, 0);
-
+      servers[0].getClusterManager().getTopology().setDebug(true);
       setupSessionFactory(0, isNetty());
 
       createQueue(0, "queues.testaddress", "queue0", null, false);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -398,7 +398,7 @@
          this.latch = latch;
       }
 
-      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
          if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -215,8 +215,7 @@
 
       public void nodeUP(String nodeID,
                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                         boolean last,
-                         int distance)
+                         boolean last)
       {
          if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -203,8 +203,7 @@
       {
          public void nodeUP(String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last,
-                            int distance)
+                            boolean last)
          {
             if(!nodes.contains(nodeID))
             {
@@ -264,8 +263,7 @@
       {
          public void nodeUP(String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last,
-                            int distance)
+                            boolean last)
          {
             if (!nodes.contains(nodeID))
             {
@@ -337,8 +335,7 @@
       {
          public void nodeUP(String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last,
-                            int distance)
+                            boolean last)
          {
             if (!nodes.contains(nodeID))
             {
@@ -420,8 +417,7 @@
       {
          public void nodeUP(String nodeID, 
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last,
-                            int distance)
+                            boolean last)
          {
             if (!nodes.contains(nodeID))
             {

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java	2010-12-21 08:05:47 UTC (rev 10064)
@@ -298,8 +298,7 @@
 
       public void nodeUP(String nodeID,
                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                         boolean last,
-                         int distance)
+                         boolean last)
       {
          if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {



More information about the hornetq-commits mailing list