[hornetq-commits] JBoss hornetq SVN: r9455 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/client/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jul 22 09:01:26 EDT 2010


Author: jmesnil
Date: 2010-07-22 09:01:25 -0400 (Thu, 22 Jul 2010)
New Revision: 9455

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring

* fix receiving list of initial connectors when using static connectors
* clean up ServerLocator interface and move topology-related methods to ServerLocatorInternal

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -15,7 +15,6 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
 
@@ -656,14 +655,6 @@
     * Closes this factory and release all its resources
     */
    void close();
-   
-   void registerTopologyListener(ClusterTopologyListener listener);
-   
-   void unregisterTopologyListener(ClusterTopologyListener listener);
-   
-   void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
-   
-   void notifyNodeDown(String nodeID);
 
    boolean isHA();
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -1160,7 +1160,7 @@
             }
             else
             {
-               serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+               serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
             }
          }
       }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -297,16 +297,6 @@
 
             discoveryGroup.start();
          }
-         
-         if (initialConnectors != null)
-         {
-            System.out.println(">>>>>>>> Static initial connectors = " + Arrays.asList(initialConnectors));
-            for (int i = 0; i < initialConnectors.length; i++)
-            {
-            	// FIXME and now what do I do?
-               TransportConfiguration connector = initialConnectors[i];
-            }
-         }
 
          readOnly = true;
       }
@@ -432,6 +422,36 @@
       initialise();
    }
    
+   public void connect()
+   {
+      if (initialConnectors != null)
+      {
+         for (TransportConfiguration connector : initialConnectors)
+         {
+            ClientSessionFactory sf = null;
+            do
+            {
+               try
+               {
+                  sf = createSessionFactory(connector);
+               }
+               catch (HornetQException e)
+               {
+                  if (e.getCode() == HornetQException.NOT_CONNECTED)
+                  {
+                     continue;
+                  }
+               }
+               catch (Exception e)
+               {
+                  break;
+               }
+            }
+            while (sf == null);
+         }
+      }
+   }
+   
    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
    {
       if (closed)
@@ -1069,7 +1089,7 @@
       }
    }
 
-   public synchronized void notifyNodeUP(final String nodeID,
+   public synchronized void notifyNodeUp(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {
@@ -1125,7 +1145,7 @@
       {
          this.initialConnectors[count++] = entry.getConnector();
 
-         notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
+         notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
       }
       
       System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
@@ -1146,12 +1166,12 @@
       }
    }
 
-   public void registerTopologyListener(final ClusterTopologyListener listener)
+   public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       topologyListeners.add(listener);
    }
 
-   public void unregisterTopologyListener(final ClusterTopologyListener listener)
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
    {
       topologyListeners.remove(listener);
    }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.client.impl;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.ServerLocator;
 
 /**
@@ -34,4 +36,13 @@
 
    void setNodeID(String nodeID);
 
+   void connect();
+   
+   void addClusterTopologyListener(ClusterTopologyListener listener);
+   
+   void removeClusterTopologyListener(ClusterTopologyListener listener);
+   
+   void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   
+   void notifyNodeDown(String nodeID);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -30,7 +30,6 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.hornetq.core.remoting.CloseListener;
@@ -123,13 +122,13 @@
                
                final boolean isCC = msg.isClusterConnection();
                
-               server.getClusterManager().registerTopologyListener(listener, isCC);
+               server.getClusterManager().addClusterTopologyListener(listener, isCC);
                
                rc.addCloseListener(new CloseListener()
                {
                   public void connectionClosed()
                   {
-                     server.getClusterManager().unregisterTopologyListener(listener, isCC);
+                     server.getClusterManager().removeClusterTopologyListener(listener, isCC);
                   }
                });
             }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -53,4 +53,7 @@
    Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
    
    TransportConfiguration getConnector();
+
+   // for debug
+   String description();
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -16,9 +16,7 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -40,13 +38,9 @@
 
    Set<BroadcastGroup> getBroadcastGroups();
 
-   void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
    
-   void notifyNodeDown(String nodeID);
-
-   void registerTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+   void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
    
-   void unregisterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-   
    void activate();
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -485,7 +485,7 @@
 
       do
       {
-         BridgeImpl.log.info("Connecting bridge " + name + " to its destination");
+         BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "]");
 
          try
          {      
@@ -510,7 +510,7 @@
             queue.addConsumer(BridgeImpl.this);
             queue.deliverAsync();
 
-            BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
+            BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " +  name +"]");
 
             return true;
          }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -164,9 +164,18 @@
          return;
       }
 
-      serverLocator.registerTopologyListener(this);
+      serverLocator.addClusterTopologyListener(this);
       serverLocator.start();
       
+      // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+      server.getExecutorFactory().getExecutor().execute(new Runnable()
+      {
+         public void run()
+         {
+            serverLocator.connect();
+         }
+      });
+      
       started = true;
 
       if (managementService != null)
@@ -187,7 +196,7 @@
          return;
       }
 
-      serverLocator.unregisterTopologyListener(this);
+      serverLocator.removeClusterTopologyListener(this);
 
       for (MessageFlowRecord record : records.values())
       {
@@ -334,7 +343,6 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
-      System.out.println("ClusterConnectionImpl.createNewRecord() " + connector);
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
 
       Bridge bridge = new ClusterConnectionBridge(serverLocator,
@@ -801,4 +809,18 @@
    {
       return records;
    }
+   
+   public String description()
+   {
+      String out = name + " connected to\n";
+      for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
+      {
+         String nodeID = messageFlow.getKey();
+         Bridge bridge = messageFlow.getValue().getBridge();
+         
+         out += "\t" + nodeID + " -- " + bridge.isStarted() + "\n";
+      }
+      
+      return out;
+   }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -91,6 +91,13 @@
 
    private final boolean clustered;
 
+   // FIXME why do we distinguish between client listeners and cluster connection listeners?
+   // They are both notified at the same time...
+   private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+   private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
+   private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
+
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
                              final HornetQServer server,
                              final PostOffice postOffice,
@@ -221,13 +228,7 @@
       return clusterConnections.get(name.toString());
    }
 
-   private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
-   private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
-   private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
-
-   public synchronized void registerTopologyListener(final ClusterTopologyListener listener,
+   public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
                                                      final boolean clusterConnection)
    {
       if (clusterConnection)
@@ -248,7 +249,7 @@
       }
    }
 
-   public synchronized void unregisterTopologyListener(final ClusterTopologyListener listener,
+   public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
                                                        final boolean clusterConnection)
    {
       if (clusterConnection)
@@ -325,28 +326,6 @@
 
    }
 
-   public synchronized void notifyNodeDown(final String nodeID)
-   {
-      topology.remove(nodeID);
-
-      for (ClusterTopologyListener listener : clientListeners)
-      {
-         listener.nodeDown(nodeID);
-      }
-   }
-
-   public synchronized void notifyNodeUP(final String nodeID,
-                                   final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                   final boolean last)
-   {
-      topology.put(nodeID, connectorPair);
-
-      for (ClusterTopologyListener listener : clientListeners)
-      {
-         listener.nodeUP(nodeID, connectorPair, false);
-      }
-   }
-
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
    {
       if (broadcastGroups.containsKey(config.getName()))

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-07-22 13:01:25 UTC (rev 9455)
@@ -146,7 +146,7 @@
    private volatile SimpleString nodeID;
 
    private volatile UUID uuid;
-
+   
    private final Version version;
 
    private final HornetQSecurityManager securityManager;
@@ -736,7 +736,7 @@
 
       started = true;
 
-      HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
+      HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]Êstarted");
 
       if (configuration.isBackup())
       {
@@ -909,7 +909,7 @@
             backupActivationThread.join();
          }
 
-         HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
+         HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]Êstopped");
 
          Logger.reset();
       }
@@ -1586,12 +1586,11 @@
       // when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
       // to connect to this server. If the remoting service is not started before, the connection will fail
       // and the cluster will not be formed...
+      initialised = true;
+
       remotingService.start();
 
       clusterManager.start();
-
-      initialised = true;
-
    }
 
    /**



More information about the hornetq-commits mailing list