[hornetq-commits] JBoss hornetq SVN: r9527 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 11 05:15:57 EDT 2010


Author: jmesnil
Date: 2010-08-11 05:15:56 -0400 (Wed, 11 Aug 2010)
New Revision: 9527

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring

* various fixes

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -44,6 +44,7 @@
 import org.hornetq.core.cluster.DiscoveryListener;
 import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -426,7 +427,7 @@
       initialise();
    }
    
-   public void connect()
+   public void connect(boolean backup, TransportConfiguration transportConfig)
    {
       if (initialConnectors != null)
       {
@@ -438,6 +439,11 @@
                try
                {
                   sf = createSessionFactory(connector);
+                  if (sf != null)
+                  {
+                     ClientSessionFactoryInternal internalSF = (ClientSessionFactoryInternal)sf;
+                     internalSF.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeID, backup, transportConfig));
+                  }
                }
                catch (HornetQException e)
                {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -35,9 +35,9 @@
    TransportConfiguration getBackup( TransportConfiguration live);
 
    void setNodeID(String nodeID);
-
-   void connect();
    
+   void connect(boolean backup, TransportConfiguration transportConfig);
+   
    void addClusterTopologyListener(ClusterTopologyListener listener);
    
    void removeClusterTopologyListener(ClusterTopologyListener listener);

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,6 +30,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.hornetq.core.remoting.CloseListener;
@@ -118,6 +119,11 @@
                   {
                      channel0.send(new ClusterTopologyChangeMessage(nodeID));
                   }
+                  
+                  public String toString()
+                  {
+                     return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
+                  };
                };
                
                final boolean isCC = msg.isClusterConnection();
@@ -132,6 +138,22 @@
                   }
                });
             }
+            else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+            {
+               NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+               TransportConfiguration connector = msg.getConnector();
+               boolean backup = msg.isBackup();
+               Pair<TransportConfiguration, TransportConfiguration> pair;
+               if (backup)
+               {
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(null, connector);
+               }
+               else
+               {
+                  pair = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
+               }
+               server.getClusterManager().announceNode(msg.getNodeID(), pair);
+            }
          }
       });
       

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -21,6 +21,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
             packet = new ClusterTopologyChangeMessage();
             break;
          }
+         case NODE_ANNOUNCE:
+         {
+            packet = new NodeAnnounceMessage();
+            break;
+         }
          case SUBSCRIBE_TOPOLOGY:
          {
             packet = new SubscribeClusterTopologyUpdatesMessage();

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -185,7 +185,9 @@
    // HA
    
    public static final byte CLUSTER_TOPOLOGY = 110;
-   
+
+   public static final byte NODE_ANNOUNCE = 111;
+
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
 
    // Static --------------------------------------------------------

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+   // Attributes ----------------------------------------------------
+
+   private String nodeID;
+   
+   private boolean backup;
+   
+   private TransportConfiguration connector;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+   {
+      super(PacketImpl.NODE_ANNOUNCE);
+
+      this.nodeID = nodeID;
+      
+      this.backup = backup;
+      
+      this.connector = tc;
+   }
+
+   public NodeAnnounceMessage()
+   {
+      super(PacketImpl.NODE_ANNOUNCE);
+   }
+
+   // Public --------------------------------------------------------
+
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+   
+   public boolean isBackup()
+   {
+      return backup;
+   }
+   
+   public TransportConfiguration getConnector()
+   {
+      return connector;
+   }
+   
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(nodeID);
+      buffer.writeBoolean(backup);
+      connector.encode(buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      this.nodeID = buffer.readString();
+      this.backup = buffer.readBoolean();
+      connector = new TransportConfiguration();
+      connector.decode(buffer);
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -18,6 +18,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -29,7 +30,7 @@
  *
  *
  */
-public interface ClusterConnection extends HornetQComponent
+public interface ClusterConnection extends HornetQComponent, ClusterTopologyListener
 {
    SimpleString getName();
 
@@ -56,4 +57,6 @@
 
    // for debug
    String description();
+
+   void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -16,7 +16,9 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -43,4 +45,10 @@
    void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
    
    void activate();
+
+   void notifyClientsNodeDown(String nodeID);
+
+   void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b);
+
+   void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.logging.Logger;
@@ -65,7 +66,7 @@
 
    // Attributes ----------------------------------------------------
 
-   protected final ServerLocator serverLocator;
+   protected final ServerLocatorInternal serverLocator;
    
    private final UUID nodeUUID;
 
@@ -109,7 +110,7 @@
 
    // Public --------------------------------------------------------
 
-   public BridgeImpl(final ServerLocator serverLocator,
+   public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final UUID nodeUUID,
                      final SimpleString name,
                      final Queue queue,
@@ -589,6 +590,8 @@
             {
                queue.deliverAsync();
             }
+            
+            log.info("stopped bridge " + name);
          }
          catch (Exception e)
          {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -63,8 +64,11 @@
    
    private final TransportConfiguration connector;
 
-   public ClusterConnectionBridge(final ServerLocator serverLocator,
+   private final String targetNodeID;
+
+   public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
                                   final UUID nodeUUID,
+                                  final String targetNodeID,
                                   final SimpleString name,
                                   final Queue queue,
                                   final Executor executor,
@@ -99,6 +103,7 @@
 
       idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
 
+      this.targetNodeID = targetNodeID;
       this.managementAddress = managementAddress;
       this.managementNotificationAddress = managementNotificationAddress;
       this.flowRecord = flowRecord;
@@ -224,4 +229,20 @@
       return serverLocator.createSessionFactory(connector);      
    }
 
+   @Override
+   public void connectionFailed(HornetQException me)
+   {
+      try
+      {
+         session.cleanUp();
+      }
+      catch (Exception e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
+      serverLocator.notifyNodeDown(targetNodeID);
+      super.connectionFailed(me);
+   }
+
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -26,7 +26,6 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -58,7 +57,7 @@
  *
  *
  */
-public class ClusterConnectionImpl implements ClusterConnection, ClusterTopologyListener
+public class ClusterConnectionImpl implements ClusterConnection
 {
    private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
 
@@ -129,7 +128,10 @@
       
       this.serverLocator = serverLocator;
 
-      this.serverLocator.setClusterConnection(true);
+      if (this.serverLocator != null)
+      {
+         this.serverLocator.setClusterConnection(true);
+      }
 
       this.connector = connector;
 
@@ -167,17 +169,20 @@
          return;
       }
 
-      serverLocator.addClusterTopologyListener(this);
-      serverLocator.start();
-      
-      // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
-      server.getExecutorFactory().getExecutor().execute(new Runnable()
+      if (serverLocator != null)
       {
-         public void run()
+         serverLocator.addClusterTopologyListener(this);
+         serverLocator.start();
+
+         // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+         server.getExecutorFactory().getExecutor().execute(new Runnable()
          {
-            serverLocator.connect();
-         }
-      });
+            public void run()
+            {
+               serverLocator.connect(server.getConfiguration().isBackup(), connector);
+            }
+         });
+      }
       
       started = true;
 
@@ -198,9 +203,12 @@
       {
          return;
       }
+      
+      if (serverLocator != null)
+      {
+         serverLocator.removeClusterTopologyListener(this);
+      }
 
-      serverLocator.removeClusterTopologyListener(this);
-
       for (MessageFlowRecord record : records.values())
       {
          try
@@ -212,6 +220,12 @@
          }
       }
 
+      if (serverLocator != null)
+      {
+         serverLocator.close();
+      }
+
+
       if (managementService != null)
       {
          TypedProperties props = new TypedProperties();
@@ -277,6 +291,13 @@
 
    public synchronized void nodeDown(final String nodeID)
    {
+      if (nodeID.equals(nodeUUID.toString()))
+      {
+         return;
+      }
+      
+      server.getClusterManager().notifyClientsNodeDown(nodeID);
+      
       //Remove the flow record for that node
       
       MessageFlowRecord record = records.remove(nodeID);
@@ -285,6 +306,7 @@
       {
          try
          {
+            record.reset();
             record.close();
          }
          catch (Exception e)
@@ -298,6 +320,13 @@
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {
+      if (nodeID.equals(nodeUUID.toString()))
+      {
+         return;
+      }
+      
+      server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false);
+      
       try
       {
          MessageFlowRecord record = records.get(nodeID);
@@ -339,6 +368,21 @@
          log.error("Failed to update topology", e);
       }
    }
+   
+   public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
+   {
+      TransportConfiguration connector = (backup) ? pair.b : pair.a;
+      if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
+      {
+         for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
+         {
+            if (connector.equals(staticConnector))
+            {
+               nodeUP(nodeID, pair, false);
+            }
+         }
+      }
+   }
 
    private void createNewRecord(final String nodeID,
                                 final TransportConfiguration connector,
@@ -350,6 +394,7 @@
 
       Bridge bridge = new ClusterConnectionBridge(serverLocator,
                                                   nodeUUID,
+                                                  nodeID,
                                                   queueName,
                                                   queue,
                                                   executorFactory.getExecutor(),

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,7 +30,6 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -69,8 +68,6 @@
 
    private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
 
-   private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
-
    private final ExecutorFactory executorFactory;
 
    private final HornetQServer server;
@@ -91,11 +88,23 @@
 
    private final boolean clustered;
 
-   // FIXME why do we distinguish between client listeners and cluster connection listeners?
-   // They are both notified at the same time...
+   // the cluster connections which links this node to other cluster nodes
+   private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
+
+   // regular client listeners to be notified of cluster topology changes.
+   // they correspond to regular clients using a HA ServerLocator
    private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+   
+   // cluster connections listeners to be notified of cluster topology changes
+   // they correspond to cluster connections on *other nodes connected to this one*
    private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
 
+   /*
+    * topology describes the other cluster nodes that this server knows about:
+    * 
+    * keys are node IDs
+    * values are a pair of live/backup transport configurations
+    */
    private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
 
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -203,6 +212,40 @@
       started = false;
    }
 
+   public void notifyClientsNodeDown(String nodeID)
+   {
+      topology.remove(nodeID);
+
+      for (ClusterTopologyListener listener : clientListeners)
+      {
+         listener.nodeDown(nodeID);
+      }
+   }
+
+   public void notifyClientsNodeUp(String nodeID,
+                                   Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                                   boolean last)
+   {
+      topology.put(nodeID, connectorPair);
+
+      for (ClusterTopologyListener listener : clientListeners)
+      {
+         listener.nodeUP(nodeID, connectorPair, last);
+      }
+   }
+   
+   public void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair)
+   {
+      /*
+      System.out.println("node " + server.getNodeID() + " announces " + nodeID + " to its cluster connections " + clusterConnections.keySet());
+      for (ClusterConnection clusterConnection : clusterConnections.values())
+      {
+         clusterConnection.announce(nodeID, pair, false);
+      }
+      */
+      
+   }
+   
    public boolean isStarted()
    {
       return started;
@@ -231,6 +274,7 @@
    public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
                                                      final boolean clusterConnection)
    {
+      System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
       if (clusterConnection)
       {
          this.clusterConnectionListeners.add(listener);
@@ -241,10 +285,10 @@
       }
 
       // We now need to send the current topology to the client
-
       int count = 0;
       for (Map.Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : topology.entrySet())
       {
+         System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " -- " + entry);
          listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
       }
    }
@@ -318,7 +362,7 @@
       {
          listener.nodeUP(nodeID, pair, false);
       }
-
+      
       for (ClusterTopologyListener listener : clusterConnectionListeners)
       {
          listener.nodeUP(nodeID, pair, false);
@@ -454,7 +498,7 @@
 
       Queue queue = (Queue)binding.getBindable();
 
-      ServerLocator serverLocator;
+      ServerLocatorInternal serverLocator;
 
       if (config.getDiscoveryGroupName() != null)
       {
@@ -470,12 +514,12 @@
 
          if (config.isHA())
          {
-            serverLocator = HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
+            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
                                                                     discoveryGroupConfiguration.getGroupPort());
          }
          else
          {
-            serverLocator = HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
+            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
                                                                        discoveryGroupConfiguration.getGroupPort());
          }
 
@@ -491,11 +535,11 @@
 
          if (config.isHA())
          {
-            serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
+            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
          }
          else
          {
-            serverLocator = HornetQClient.createServerLocatorWithoutHA(tcConfigs);
+            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
          }
 
       }
@@ -561,8 +605,9 @@
          TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
 
          serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
+         serverLocator.setNodeID(nodeUUID.toString());
       }
-      else
+      else if (config.getDiscoveryGroupName() != null)
       {
          DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
                                                        .get(config.getDiscoveryGroupName());
@@ -574,9 +619,15 @@
          }
 
          serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
+         serverLocator.setNodeID(nodeUUID.toString());
       }
+      else
+      {
+         // no connector or discovery group are defined. The cluster connection will only be a target and will
+         // no connect to other nodes in the cluster
+         serverLocator = null;
+      }
 
-      serverLocator.setNodeID(nodeUUID.toString());
       ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
                                                                       connector,
                                                                       new SimpleString(config.getName()),



More information about the hornetq-commits mailing list