[hornetq-commits] JBoss hornetq SVN: r10268 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/cluster and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 28 14:19:53 EST 2011


Author: ataylor
Date: 2011-02-28 14:19:53 -0500 (Mon, 28 Feb 2011)
New Revision: 10268

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixing unit tests

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-02-28 18:48:55 UTC (rev 10267)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-02-28 19:19:53 UTC (rev 10268)
@@ -147,7 +147,7 @@
                {
                   pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
                }
-               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false);
+               server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
             }
          }
       });

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-02-28 18:48:55 UTC (rev 10267)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-02-28 19:19:53 UTC (rev 10268)
@@ -15,6 +15,7 @@
 
 import java.util.Map;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -54,4 +55,6 @@
 
    // for debug
    String description();
+
+   void nodeAnnounced(String nodeID, Pair<TransportConfiguration,TransportConfiguration> connectorPair);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-02-28 18:48:55 UTC (rev 10267)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-02-28 19:19:53 UTC (rev 10268)
@@ -50,7 +50,7 @@
 
    void notifyNodeDown(String nodeID);
 
-   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
+   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
 
    Topology getTopology();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-02-28 18:48:55 UTC (rev 10267)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-02-28 19:19:53 UTC (rev 10268)
@@ -35,7 +35,6 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -84,8 +83,6 @@
 
    private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
 
-   private final List<TransportConfiguration> conectorssss = new ArrayList<TransportConfiguration>();
-
    private final ScheduledExecutorService scheduledExecutor;
 
    private final int maxHops;
@@ -425,13 +422,13 @@
       {
          if(connectorPair.b != null)
          {
-            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
+            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
          }
          return;
       }
 
       // we propagate the node notifications to all cluster topology listeners
-      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
+      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
 
       // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
       if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
@@ -451,59 +448,120 @@
          return;
       }
 
-      Collection<TopologyMember> topologyMembers = serverLocator.getTopology().getMembers();
-      for (TopologyMember topologyMember : topologyMembers)
+      synchronized (records)
       {
-         if(topologyMember.getConnector().a != null && !conectorssss.contains(topologyMember.getConnector().a))
+         try
          {
-            if(!topologyMember.getConnector().a.equals(connector) && !topologyMember.getConnector().a.equals(connectorPair.a))
+            MessageFlowRecord record = records.get(nodeID);
+
+            if (record == null)
             {
-               log.debug("ClusterConnectionImpl.nodeUP");
+               // New node - create a new flow record
+
+               final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
+
+               Binding queueBinding = postOffice.getBinding(queueName);
+
+               Queue queue;
+
+               if (queueBinding != null)
+               {
+                  queue = (Queue)queueBinding.getBindable();
+               }
+               else
+               {
+                  // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+                  // actually routed to at that address though
+                  queue = server.createQueue(queueName, queueName, null, true, false);
+               }
+
+               createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
             }
+            else
+            {
+               // FIXME apple and orange comparison. I don't understand it...
+               //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+               // {
+               //   // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+               //}
+            }
          }
+         catch (Exception e)
+         {
+            log.error("Failed to update topology", e);
+         }
       }
+   }
 
-      try
+   public void nodeAnnounced(final String nodeID,
+                                   final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
+   {
+      if (nodeID.equals(nodeUUID.toString()))
       {
-         MessageFlowRecord record = records.get(nodeID);
+         return;
+      }
 
-         if (record == null)
+      // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+      if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+      {
+         return;
+      }
+
+      // FIXME required to prevent cluster connections w/o discovery group
+      // and empty static connectors to create bridges... ugly!
+      if (serverLocator == null)
+      {
+         return;
+      }
+      /*we don't create bridges to backups*/
+      if(connectorPair.a == null)
+      {
+         return;
+      }
+
+      synchronized (records)
+      {
+         try
          {
-            // New node - create a new flow record
+            MessageFlowRecord record = records.get(nodeID);
 
-            final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
+            if (record == null)
+            {
+               // New node - create a new flow record
 
-            Binding queueBinding = postOffice.getBinding(queueName);
+               final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
 
-            Queue queue;
+               Binding queueBinding = postOffice.getBinding(queueName);
 
-            if (queueBinding != null)
-            {
-               queue = (Queue)queueBinding.getBindable();
+               Queue queue;
+
+               if (queueBinding != null)
+               {
+                  queue = (Queue)queueBinding.getBindable();
+               }
+               else
+               {
+                  // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+                  // actually routed to at that address though
+                  queue = server.createQueue(queueName, queueName, null, true, false);
+               }
+
+               createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
             }
             else
             {
-               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
-               // actually routed to at that address though
-               queue = server.createQueue(queueName, queueName, null, true, false);
+               // FIXME apple and orange comparison. I don't understand it...
+               //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+               // {
+               //   // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+               //}
             }
-
-            createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
-            conectorssss.add(connectorPair.a);
          }
-         else
+         catch (Exception e)
          {
-            // FIXME apple and orange comparison. I don't understand it...
-            //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
-            // {
-            //   // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
-            //}
+            log.error("Failed to update topology", e);
          }
       }
-      catch (Exception e)
-      {
-         log.error("Failed to update topology", e);
-      }
    }
    
    private void createNewRecord(final String nodeID,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-02-28 18:48:55 UTC (rev 10267)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-02-28 19:19:53 UTC (rev 10268)
@@ -18,6 +18,7 @@
 import java.lang.reflect.Array;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 
@@ -102,6 +103,7 @@
    private volatile ServerLocatorInternal backupServerLocator;
 
    private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+   private Executor executor;
 
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
                              final HornetQServer server,
@@ -120,6 +122,8 @@
 
       this.executorFactory = executorFactory;
 
+      executor = executorFactory.getExecutor();
+
       this.server = server;
 
       this.postOffice = postOffice;
@@ -250,11 +254,13 @@
       }
    }
 
-   public void notifyNodeUp(String nodeID,
-                                   Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                   boolean last)
+   public void notifyNodeUp(final String nodeID,
+                            final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            final boolean last,
+                            final boolean nodeAnnounce)
    {
-      boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair));
+      TopologyMember member = new TopologyMember(connectorPair);
+      boolean updated = topology.addMember(nodeID, member);
 
       if(!updated)
       {
@@ -263,14 +269,23 @@
       
       for (ClusterTopologyListener listener : clientListeners)
       {
-         listener.nodeUP(nodeID, connectorPair, last);
+         listener.nodeUP(nodeID, member.getConnector(), last);
       }
 
 
       for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
-         listener.nodeUP(nodeID, connectorPair, last);
+         listener.nodeUP(nodeID, member.getConnector(), last);
       }
+
+      //if this node is being announced, we are hearing it directly from the node's CM, so we need to inform our cluster connections.
+      if (nodeAnnounce)
+      {
+         for (ClusterConnection clusterConnection : clusterConnections.values())
+         {
+            clusterConnection.nodeAnnounced(nodeID, connectorPair);
+         }
+      }
    }
    
    public boolean isStarted()
@@ -836,7 +851,7 @@
          return;
       }
       log.info("announcing backup");
-      this.executorFactory.getExecutor().execute(new Runnable()
+      executor.execute(new Runnable()
       {
          public void run()
          {



More information about the hornetq-commits mailing list