[hornetq-commits] JBoss hornetq SVN: r9444 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/client/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 21 06:17:34 EDT 2010


Author: jmesnil
Date: 2010-07-21 06:17:33 -0400 (Wed, 21 Jul 2010)
New Revision: 9444

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring

* fix cluster code using discovery group (example/jms/clustered-queue passes again)
* ServerLocator and ClusterManager no longer implement ClusterTopologyListener as they /notify/ the listeners
* DiscoveryEntry keeps track of the nodeID to be able to notify listeners of the node which has joined the cluster
* in ServerLocatorImpl.connectorsChanged(), notify the listeners that a node is UP
  /!\ we must also keep track of previous entries to notify when a node is DOWN
* in HornetQServerImpl.initialisePart2(), starts the remoting service *before* the cluster manager. Otherwise the
  bridges created when the cluster is formed will not be able to connect to the server and the cluster will not be formed
  /!\ this is a modification with lots of implications... not sure about this one

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,6 +15,7 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
 
@@ -660,5 +661,9 @@
    
    void unregisterTopologyListener(ClusterTopologyListener listener);
    
+   void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   
+   void notifyNodeDown(String nodeID);
+
    boolean isHA();
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -1156,11 +1156,11 @@
             
             if (topMessage.isExit())
             {
-               serverLocator.nodeDown(topMessage.getNodeID());
+               serverLocator.notifyNodeDown(topMessage.getNodeID());
             }
             else
             {
-               serverLocator.nodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+               serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
             }
          }
       }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -297,6 +297,16 @@
 
             discoveryGroup.start();
          }
+         
+         if (initialConnectors != null)
+         {
+            System.out.println(">>>>>>>> Static initial connectors = " + Arrays.asList(initialConnectors));
+            for (int i = 0; i < initialConnectors.length; i++)
+            {
+            	// FIXME and now what do I do?
+               TransportConfiguration connector = initialConnectors[i];
+            }
+         }
 
          readOnly = true;
       }
@@ -1031,7 +1041,7 @@
       closed = true;
    }
 
-   public synchronized void nodeDown(final String nodeID)
+   public synchronized void notifyNodeDown(final String nodeID)
    {
       if (!ha)
       {
@@ -1059,7 +1069,7 @@
       }
    }
 
-   public synchronized void nodeUP(final String nodeID,
+   public synchronized void notifyNodeUP(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {
@@ -1114,9 +1124,11 @@
       for (DiscoveryEntry entry : newConnectors)
       {
          this.initialConnectors[count++] = entry.getConnector();
+
+         notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
       }
       
-      System.out.println(">>>>>>>> Initial connectors = " + Arrays.asList(initialConnectors));
+      System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
    }
 
    public synchronized void factoryClosed(final ClientSessionFactory factory)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,7 +15,6 @@
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.ServerLocator;
 
 /**
@@ -25,7 +24,7 @@
  *
  *
  */
-public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
+public interface ServerLocatorInternal extends ServerLocator
 {
    void start() throws Exception;
    

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -24,17 +24,23 @@
  */
 public class DiscoveryEntry
 {
+   private final String nodeID;
    private final TransportConfiguration connector;
-
    private final long lastUpdate;
 
-   public DiscoveryEntry(final TransportConfiguration connector, final long lastUpdate)
+
+   public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate)
    {
+      this.nodeID = nodeID;
       this.connector = connector;
-      
       this.lastUpdate = lastUpdate;
    }
 
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
    public TransportConfiguration getConnector()
    {
       return connector;

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -330,7 +330,7 @@
 
                   connector.decode(buffer);
                  
-                  DiscoveryEntry entry = new DiscoveryEntry(connector, System.currentTimeMillis());
+                  DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
 
                   DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -16,7 +16,9 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -28,7 +30,7 @@
  * Created 18 Nov 2008 09:23:26
  *
  */
-public interface ClusterManager extends HornetQComponent, ClusterTopologyListener
+public interface ClusterManager extends HornetQComponent
 {
    Map<String, Bridge> getBridges();
 
@@ -38,6 +40,10 @@
 
    Set<BroadcastGroup> getBroadcastGroups();
 
+   void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+   
+   void notifyNodeDown(String nodeID);
+
    void registerTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
    
    void unregisterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -490,7 +490,6 @@
          try
          {      
             csf = createSessionFactory();
-
             // Session is pre-acknowledge
             session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
 
@@ -501,9 +500,7 @@
             }
 
             producer = session.createProducer();
-
             session.addFailureListener(BridgeImpl.this);
-
             session.setSendAcknowledgementHandler(BridgeImpl.this);
 
             afterConnect();
@@ -511,7 +508,6 @@
             active = true;
 
             queue.addConsumer(BridgeImpl.this);
-
             queue.deliverAsync();
 
             BridgeImpl.log.info("Bridge " + name + " is connected to its destination");

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -265,8 +265,6 @@
 
    public synchronized void nodeDown(final String nodeID)
    {
-      server.getClusterManager().nodeDown(nodeID);
-
       //Remove the flow record for that node
       
       MessageFlowRecord record = records.remove(nodeID);
@@ -290,8 +288,6 @@
    {
       try
       {
-         server.getClusterManager().nodeUP(nodeID, connectorPair, false);
-
          MessageFlowRecord record = records.get(nodeID);
 
          if (record == null)
@@ -307,25 +303,23 @@
             if (queueBinding != null)
             {
                queue = (Queue)queueBinding.getBindable();
-
-               createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
             }
             else
             {
                // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                // actually routed to at that address though
-
                queue = server.createQueue(queueName, queueName, null, true, false);
-
-               createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
             }
+            
+            createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
          }
          else
          {
-            if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
-            {
-               // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
-            }
+            // FIXME apple and orange comparison. I don't understand it...
+            //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+            // {
+            //   // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+            //}
          }
       }
       catch (Exception e)
@@ -340,6 +334,7 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
+      System.out.println("ClusterConnectionImpl.createNewRecord() " + connector);
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
 
       Bridge bridge = new ClusterConnectionBridge(serverLocator,

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -325,7 +325,7 @@
 
    }
 
-   public synchronized void nodeDown(final String nodeID)
+   public synchronized void notifyNodeDown(final String nodeID)
    {
       topology.remove(nodeID);
 
@@ -335,7 +335,7 @@
       }
    }
 
-   public synchronized void nodeUP(final String nodeID,
+   public synchronized void notifyNodeUP(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-07-21 10:17:33 UTC (rev 9444)
@@ -1579,14 +1579,19 @@
          deploymentManager.start();
       }
 
+      // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
+      // it is activated
+
+      // FIXME -- I inverted the order to start the remoting service before the cluster manager.
+      // when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
+      // to connect to this server. If the remoting service is not started before, the connection will fail
+      // and the cluster will not be formed...
+      remotingService.start();
+
       clusterManager.start();
 
       initialised = true;
 
-      // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
-      // it is activated
-
-      remotingService.start();
    }
 
    /**



More information about the hornetq-commits mailing list