[hornetq-commits] JBoss hornetq SVN: r10031 - in trunk: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 10 09:56:36 EST 2010


Author: ataylor
Date: 2010-12-10 09:56:36 -0500 (Fri, 10 Dec 2010)
New Revision: 10031

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Log:
moved creation of server locator into cluster connection

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-10 14:56:36 UTC (rev 10031)
@@ -139,6 +139,8 @@
 
    private volatile boolean closed;
 
+   private volatile boolean closing;
+
    private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    private static ExecutorService globalThreadPool;
@@ -408,9 +410,27 @@
       }
    }
 
-   public void start() throws Exception
+   public void start(Executor executor) throws Exception
    {
       initialise();
+
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               connect();
+            }
+            catch (Exception e)
+            {
+               if(!closing)
+               {
+                  log.warn("did not connect the cluster connection to other nodes", e);
+               }
+            }
+         }
+      });
    }
 
    public ClientSessionFactory connect() throws Exception
@@ -1000,6 +1020,8 @@
          return;
       }
 
+      closing = true;
+
       if (discoveryGroup != null)
       {
          try
@@ -1057,6 +1079,7 @@
             }
          }
       }
+      readOnly = false;
 
       closed = true;
    }

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-12-10 14:56:36 UTC (rev 10031)
@@ -19,6 +19,8 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
 
+import java.util.concurrent.Executor;
+
 /**
  * A ServerLocatorInternal
  *
@@ -28,7 +30,7 @@
  */
 public interface ServerLocatorInternal extends ServerLocator
 {
-   void start() throws Exception;
+   void start(Executor executor) throws Exception;
    
    void factoryClosed(final ClientSessionFactory factory);
 

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-12-10 14:56:36 UTC (rev 10031)
@@ -16,17 +16,16 @@
 import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
 import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -35,6 +34,7 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -75,6 +75,8 @@
 
    private final SimpleString address;
 
+   private final long retryInterval;
+
    private final boolean useDuplicateDetection;
 
    private final boolean routeWhenNoConsumers;
@@ -95,7 +97,9 @@
 
    private final String clusterPassword;
 
-   private final ServerLocatorInternal serverLocator;
+   private final ClusterConnector clusterConnector;
+
+   private ServerLocatorInternal serverLocator;
    
    private final TransportConfiguration connector;
 
@@ -103,7 +107,7 @@
 
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
    
-   public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
+   public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
                                 final SimpleString address,
@@ -130,39 +134,89 @@
       }
 
       this.nodeUUID = nodeUUID;
-      
-      this.serverLocator = serverLocator;
 
+      this.connector = connector;
+
+      this.name = name;
+
+      this.address = address;
+
+      this.retryInterval = retryInterval;
+
+      this.useDuplicateDetection = useDuplicateDetection;
+
+      this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+      this.executorFactory = executorFactory;
+
+      this.server = server;
+
+      this.postOffice = postOffice;
+
+      this.managementService = managementService;
+
+      this.scheduledExecutor = scheduledExecutor;
+
+      this.maxHops = maxHops;
+
+      this.backup = backup;
+
+      this.clusterUser = clusterUser;
+
+      this.clusterPassword = clusterPassword;
+
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-      if (this.serverLocator != null)
+
+      clusterConnector = new StaticClusterConnector(tcConfigs);
+
+      if (tcConfigs != null && tcConfigs.length > 0)
       {
-         this.serverLocator.setClusterConnection(true);
-         this.serverLocator.setClusterTransportConfiguration(connector);
-         this.serverLocator.setBackup(server.getConfiguration().isBackup());
-         this.serverLocator.setInitialConnectAttempts(-1);
-         if(retryInterval > 0)
-         {
-            this.serverLocator.setRetryInterval(retryInterval);
-         }
-         
          // a cluster connection will connect to other nodes only if they are directly connected
          // through a static list of connectors or broadcasting using UDP.
-         TransportConfiguration[] transportConfigurations = serverLocator.getStaticTransportConfigurations();
-         if(this.allowDirectConnectionsOnly)
+         if(allowDirectConnectionsOnly)
          {
-            for (TransportConfiguration transportConfiguration : transportConfigurations)
-            {
-               allowableConnections.add(transportConfiguration);
-            }
+            allowableConnections.addAll(Arrays.asList(tcConfigs));
          }
       }
 
+   }
+
+   public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+                                final TransportConfiguration connector,
+                                final SimpleString name,
+                                final SimpleString address,
+                                final long retryInterval,
+                                final boolean useDuplicateDetection,
+                                final boolean routeWhenNoConsumers,
+                                final int confirmationWindowSize,
+                                final ExecutorFactory executorFactory,
+                                final HornetQServer server,
+                                final PostOffice postOffice,
+                                final ManagementService managementService,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final int maxHops,
+                                final UUID nodeUUID,
+                                final boolean backup,
+                                final String clusterUser,
+                                final String clusterPassword,
+                                final boolean allowDirectConnectionsOnly) throws Exception
+   {
+
+      if (nodeUUID == null)
+      {
+         throw new IllegalArgumentException("node id is null");
+      }
+
+      this.nodeUUID = nodeUUID;
+
       this.connector = connector;
 
       this.name = name;
 
       this.address = address;
 
+      this.retryInterval = retryInterval;
+
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -184,6 +238,10 @@
       this.clusterUser = clusterUser;
 
       this.clusterPassword = clusterPassword;
+
+      this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+
+      clusterConnector = new DiscoveryClusterConnector(dg);
    }
 
    public synchronized void start() throws Exception
@@ -238,6 +296,12 @@
             managementService.sendNotification(notification);
          }
 
+         if(serverLocator != null)
+         {
+            serverLocator.close();
+            serverLocator = null;
+         }
+
          started = false;
       }
    }
@@ -279,29 +343,28 @@
 
       backup = false;
 
+      serverLocator = clusterConnector.createServerLocator();
 
+
       if (serverLocator != null)
       {
+         serverLocator.setNodeID(nodeUUID.toString());
+
+         serverLocator.setReconnectAttempts(-1);
+
+         serverLocator.setClusterConnection(true);
+         serverLocator.setClusterTransportConfiguration(connector);
+         serverLocator.setBackup(server.getConfiguration().isBackup());
+         serverLocator.setInitialConnectAttempts(-1);
+
+         if(retryInterval > 0)
+         {
+            this.serverLocator.setRetryInterval(retryInterval);
+         }
+
          serverLocator.addClusterTopologyListener(this);
-         serverLocator.start();
-         // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
-         server.getExecutorFactory().getExecutor().execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  serverLocator.connect();
-               }
-               catch (Exception e)
-               {
-                  if(started)
-                  {
-                     log.warn("did not connect the cluster connection to other nodes", e);
-                  }
-               }
-            }
-         });
+
+         serverLocator.start(server.getExecutorFactory().getExecutor());
       }
 
       if (managementService != null)
@@ -909,4 +972,46 @@
       
       return out;
    }
+
+   interface ClusterConnector
+   {
+      ServerLocatorInternal createServerLocator();
+   }
+
+   private class StaticClusterConnector implements ClusterConnector
+   {
+      private final TransportConfiguration[] tcConfigs;
+
+      public StaticClusterConnector(TransportConfiguration[] tcConfigs)
+      {
+         this.tcConfigs = tcConfigs;
+      }
+
+      public ServerLocatorInternal createServerLocator()
+      {
+         if(tcConfigs != null && tcConfigs.length > 0)
+         {
+            return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
+         }
+         else
+         {
+            return null;
+         }
+      }
+   }
+
+   private class DiscoveryClusterConnector implements ClusterConnector
+   {
+      private final DiscoveryGroupConfiguration dg;
+
+      public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
+      {
+         this.dg = dg;
+      }
+
+      public ServerLocatorInternal createServerLocator()
+      {
+         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-10 14:56:36 UTC (rev 10031)
@@ -732,18 +732,16 @@
          return;
       }
 
-      ServerLocatorInternal serverLocator;
 
-      if (config.getStaticConnectors() != null && config.getStaticConnectors().size() > 0)
+      if(clusterConnections.containsKey(config.getName()))
       {
-         TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
+         log.warn("Clustwer Configuration  '" + config.getConnectorName() + "' already exists. The cluster connection will not be deployed.");
+         return;
+      }
 
-         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
-         serverLocator.setNodeID(nodeUUID.toString());
-         serverLocator.setReconnectAttempts(-1);
-         clusterLocators.add(serverLocator);
-      }
-      else if (config.getDiscoveryGroupName() != null)
+      ClusterConnectionImpl clusterConnection;
+
+      if (config.getDiscoveryGroupName() != null)
       {
          DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
                                                        .get(config.getDiscoveryGroupName());
@@ -754,38 +752,51 @@
                                         "'. The cluster connection will not be deployed.");
          }
 
-         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg);
-         serverLocator.setNodeID(nodeUUID.toString());
-         serverLocator.setReconnectAttempts(-1);
-         clusterLocators.add(serverLocator);
+         clusterConnection = new ClusterConnectionImpl(dg,
+                                                       connector,
+                                                       new SimpleString(config.getName()),
+                                                       new SimpleString(config.getAddress()),
+                                                       config.getRetryInterval(),
+                                                       config.isDuplicateDetection(),
+                                                       config.isForwardWhenNoConsumers(),
+                                                       config.getConfirmationWindowSize(),
+                                                       executorFactory,
+                                                       server,
+                                                       postOffice,
+                                                       managementService,
+                                                       scheduledExecutor,
+                                                       config.getMaxHops(),
+                                                       nodeUUID,
+                                                       backup,
+                                                       server.getConfiguration().getClusterUser(),
+                                                       server.getConfiguration().getClusterPassword(),
+                                                       config.isAllowDirectConnectionsOnly());
       }
       else
       {
-         // no connector or discovery group are defined. The cluster connection will only be a target and will
-         // no connect to other nodes in the cluster
-         serverLocator = null;
+         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
+
+         clusterConnection = new ClusterConnectionImpl(tcConfigs,
+                                                       connector,
+                                                       new SimpleString(config.getName()),
+                                                       new SimpleString(config.getAddress()),
+                                                       config.getRetryInterval(),
+                                                       config.isDuplicateDetection(),
+                                                       config.isForwardWhenNoConsumers(),
+                                                       config.getConfirmationWindowSize(),
+                                                       executorFactory,
+                                                       server,
+                                                       postOffice,
+                                                       managementService,
+                                                       scheduledExecutor,
+                                                       config.getMaxHops(),
+                                                       nodeUUID,
+                                                       backup,
+                                                       server.getConfiguration().getClusterUser(),
+                                                       server.getConfiguration().getClusterPassword(),
+                                                       config.isAllowDirectConnectionsOnly());
       }
 
-      ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
-                                                                      connector,
-                                                                      new SimpleString(config.getName()),
-                                                                      new SimpleString(config.getAddress()),
-                                                                      config.getRetryInterval(),
-                                                                      config.isDuplicateDetection(),
-                                                                      config.isForwardWhenNoConsumers(),
-                                                                      config.getConfirmationWindowSize(),
-                                                                      executorFactory,
-                                                                      server,
-                                                                      postOffice,
-                                                                      managementService,
-                                                                      scheduledExecutor,
-                                                                      config.getMaxHops(),
-                                                                      nodeUUID,
-                                                                      backup,
-                                                                      server.getConfiguration().getClusterUser(),
-                                                                      server.getConfiguration().getClusterPassword(),
-                                                                      config.isAllowDirectConnectionsOnly());
-
       managementService.registerCluster(clusterConnection, config);
 
       clusterConnections.put(config.getName(), clusterConnection);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2010-12-09 22:24:10 UTC (rev 10030)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java	2010-12-10 14:56:36 UTC (rev 10031)
@@ -774,11 +774,6 @@
 
    public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
    {
-      // server #0 is connected to server #1
-      setupClusterConnection("cluster1", 0, 1, "queues", true, 1,  isNetty(), true);
-      // server #1 is connected to nobody
-      setupClusterConnection("clusterX", 1, -1, "queues", false, 1,  isNetty(), true);
-   
       startServers(1, 0);
 
       setupSessionFactory(0,  isNetty(), true);



More information about the hornetq-commits mailing list