[hornetq-commits] JBoss hornetq SVN: r10150 - in branches/HORNETQ-316: src/main/org/hornetq/api/core/client and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 27 09:06:21 EST 2011


Author: igarashitm
Date: 2011-01-27 09:06:20 -0500 (Thu, 27 Jan 2011)
New Revision: 10150

Added:
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
Removed:
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
   branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
   branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
   branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
   branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
   branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Log:
https://issues.jboss.org/browse/HORNETQ-316
first commit. just implemented pluggable discovery strategy and removed compile errors. not yet debugged.


Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009 Red Hat, Inc.
+ * Copyright 2010 Red Hat, Inc.
  * Red Hat licenses this file to you under the Apache License, version
  * 2.0 (the "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -14,169 +14,45 @@
 package org.hornetq.api.core;
 
 import java.io.Serializable;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.UUIDGenerator;
-
+import java.util.Map;
+import java.util.Properties;
 /**
  * A DiscoveryGroupConfiguration
  *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 18 Nov 2008 08:47:30
- *
- *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  */
 public class DiscoveryGroupConfiguration implements Serializable
 {
-   private static final long serialVersionUID = 8657206421727863400L;
-   
-   private static final Logger log = Logger.getLogger(DiscoveryGroupConfiguration.class);
+   private static final long serialVersionUID = 2877108926493109407L;
 
-
-   private String name;
+   private final String serverLocatorClassName;
    
-   private String localBindAddress;
-
-   private String groupAddress;
-
-   private int groupPort;
-
-   private long refreshTimeout;
+   private final String name; 
    
-   private long discoveryInitialWaitTimeout;
-
-   public DiscoveryGroupConfiguration(final String name,
-                                      final String localBindAddress,
-                                      final String groupAddress,
-                                      final int groupPort,
-                                      final long refreshTimeout,
-                                      final long discoveryInitialWaitTimeout)
-   {
-      this.name = name;
-      this.groupAddress = groupAddress;
-      this.localBindAddress = localBindAddress;
-      this.groupPort = groupPort;
-      this.refreshTimeout = refreshTimeout;
-      this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
-   }
-
-   public DiscoveryGroupConfiguration(final String groupAddress,
-                                      final int groupPort)
-   {
-      this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
-   }
-
-   public String getName()
-   {
-      return name;
-   }
+   private final Map<String, Object> params;
    
-   public String getLocalBindAddress()
+   public DiscoveryGroupConfiguration(final String clazz, final Map<String, Object> params, final String name) 
    {
-      return localBindAddress;
-   }
-
-   public String getGroupAddress()
-   {
-      return groupAddress;
-   }
-
-   public int getGroupPort()
-   {
-      return groupPort;
-   }
-
-   public long getRefreshTimeout()
-   {
-      return refreshTimeout;
-   }
-
-   /**
-    * @param name the name to set
-    */
-   public void setName(final String name)
-   {
+      this.serverLocatorClassName = clazz;
+      
+      this.params = params;
+      
       this.name = name;
    }
    
-   /**
-    * @param localBindAddress the localBindAddress to set
-    */
-   public void setLocalBindAdress(final String localBindAddress)
+   public String getServerLocatorClassName()
    {
-      this.localBindAddress = localBindAddress;
+      return this.serverLocatorClassName;
    }
 
-   /**
-    * @param groupAddress the groupAddress to set
-    */
-   public void setGroupAddress(final String groupAddress)
+   public Map<String, Object> getParams()
    {
-      this.groupAddress = groupAddress;
+      return this.params;
    }
 
-   /**
-    * @param groupPort the groupPort to set
-    */
-   public void setGroupPort(final int groupPort)
+   public String getName()
    {
-      this.groupPort = groupPort;
+      return this.name;
    }
-
-   /**
-    * @param refreshTimeout the refreshTimeout to set
-    */
-   public void setRefreshTimeout(final long refreshTimeout)
-   {
-      this.refreshTimeout = refreshTimeout;
-   }
-
-   /**
-    * @return the discoveryInitialWaitTimeout
-    */
-   public long getDiscoveryInitialWaitTimeout()
-   {
-      return discoveryInitialWaitTimeout;
-   }
-
-   /**
-    * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
-    */
-   public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
-   {
-      this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
-   }
-
-   @Override
-   public boolean equals(Object o)
-   {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
-
-      if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
-      if (groupPort != that.groupPort) return false;
-      if (refreshTimeout != that.refreshTimeout) return false;
-      if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
-      if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
-         return false;
-      if (name != null ? !name.equals(that.name) : that.name != null) return false;
-
-      return true;
-   }
-
-   @Override
-   public int hashCode()
-   {
-      int result = name != null ? name.hashCode() : 0;
-      result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
-      result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
-      result = 31 * result + groupPort;
-      result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
-      result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
-      return result;
-   }
+   
 }

Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -13,13 +13,15 @@
 package org.hornetq.api.core.client;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
 
-import java.util.List;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -107,21 +109,34 @@
     */
    public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
    {
-      return new ServerLocatorImpl(false, transportConfigurations);
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, transportConfigurations);
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null);
+      return createServerLocatorWithoutHA(config);
    }
    
    /**
     * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
     * 
-    * The UDP address and port are used to listen for live servers in the cluster
-    * 
-    * @param discoveryAddress The UDP group address to listen for updates
-    * @param discoveryPort the UDP port to listen for updates
+    * @param groupConfiguration The configuration for server discovery
     * @return the ServerLocator
     */
    public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
    {
-      return new ServerLocatorImpl(false, groupConfiguration);
+      ServerLocator serverLocator = null;
+      String className = groupConfiguration.getServerLocatorClassName();
+      try
+      {
+         ClassLoader loader = Thread.currentThread().getContextClassLoader();
+         Class<?> clazz = loader.loadClass(className);
+         Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class);
+         serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration);
+      }
+      catch(Exception e)
+      {
+          new HornetQException(HornetQException.INTERNAL_ERROR, "Could not instantiate ServerLocator implementation class: " + className, e);
+      }
+         return serverLocator;
    }
    
    /**
@@ -135,9 +150,12 @@
     */
    public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
    {
-      return new ServerLocatorImpl(true, initialServers);
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, initialServers);
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null);
+      return createServerLocatorWithHA(config);
    }
-   
+
    /**
     * Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
     * The discoveryAddress and discoveryPort parameters in this method are used to listen for UDP broadcasts which contain connection information for members of the cluster.
@@ -150,7 +168,20 @@
     */
    public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
    {
-      return new ServerLocatorImpl(true, groupConfiguration);
+      ServerLocator serverLocator = null;
+      String className = groupConfiguration.getServerLocatorClassName();
+      try
+      {
+         ClassLoader loader = Thread.currentThread().getContextClassLoader();
+         Class<?> clazz = loader.loadClass(className);
+         Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class);
+         serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration);
+      }
+      catch(Exception e)
+      {
+          new HornetQException(HornetQException.INTERNAL_ERROR, "Could not instantiate ServerLocator implementation class: " + className, e);
+      }
+      return serverLocator;
    }
    
 

Copied: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (from rev 10124, branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java)
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,1026 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A AbstractServerLocator
+ *
+ * @author Tim Fox
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, Serializable
+{
+   private static final long serialVersionUID = -1615857864410205260L;
+
+   private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+   private final boolean ha;
+
+   private boolean finalizeCheck = true;
+
+   private boolean clusterConnection;
+
+   private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
+   private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+
+   private TransportConfiguration[] initialConnectors;
+
+   private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+   private Topology topology = new Topology();
+
+   private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+   private boolean receivedTopology;
+
+   private boolean compressLargeMessage;
+
+   private ExecutorService threadPool;
+
+   private ScheduledExecutorService scheduledThreadPool;
+
+   private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+   private boolean readOnly;
+
+   // Settable attributes:
+
+   private boolean cacheLargeMessagesClient;
+
+   private long clientFailureCheckPeriod;
+
+   private long connectionTTL;
+
+   private long callTimeout;
+
+   private int minLargeMessageSize;
+
+   private int consumerWindowSize;
+
+   private int consumerMaxRate;
+
+   private int confirmationWindowSize;
+
+   private int producerWindowSize;
+
+   private int producerMaxRate;
+
+   private boolean blockOnAcknowledge;
+
+   private boolean blockOnDurableSend;
+
+   private boolean blockOnNonDurableSend;
+
+   private boolean autoGroup;
+
+   private boolean preAcknowledge;
+
+   private String connectionLoadBalancingPolicyClassName;
+
+   private int ackBatchSize;
+
+   private boolean useGlobalPools;
+
+   private int scheduledThreadPoolMaxSize;
+
+   private int threadPoolMaxSize;
+
+   private long retryInterval;
+
+   private double retryIntervalMultiplier;
+
+   private long maxRetryInterval;
+
+   private int reconnectAttempts;
+
+   private int initialConnectAttempts;
+
+   private boolean failoverOnInitialConnection;
+
+   private int initialMessagePacketSize;
+
+   private volatile boolean closed;
+
+   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+   private static ExecutorService globalThreadPool;
+
+   private static ScheduledExecutorService globalScheduledThreadPool;
+
+   private String groupID;
+
+   private String nodeID;
+
+   private TransportConfiguration clusterTransportConfiguration;
+
+   private boolean backup;
+
+   private final Exception e = new Exception();
+   
+   // To be called when there are ServerLocator being finalized.
+   // To be used on test assertions
+   public static Runnable finalizeCallback = null;
+
+   private static synchronized ExecutorService getGlobalThreadPool()
+   {
+      if (globalThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+         globalThreadPool = Executors.newCachedThreadPool(factory);
+      }
+
+      return globalThreadPool;
+   }
+
+   public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+   {
+      if (globalScheduledThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+                                                          true,
+                                                          getThisClassLoader());
+
+         globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+                                                                      factory);
+      }
+
+      return globalScheduledThreadPool;
+   }
+
+   protected void setThreadPools()
+   {
+      if (useGlobalPools)
+      {
+         threadPool = getGlobalThreadPool();
+
+         scheduledThreadPool = getGlobalScheduledThreadPool();
+      }
+      else
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+                                                          true,
+                                                          getThisClassLoader());
+
+         if (threadPoolMaxSize == -1)
+         {
+            threadPool = Executors.newCachedThreadPool(factory);
+         }
+         else
+         {
+            threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+         }
+
+         factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+                                            true,
+                                            getThisClassLoader());
+
+         scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+      }
+   }
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+
+   }
+
+   protected void instantiateLoadBalancingPolicy()
+   {
+      if (connectionLoadBalancingPolicyClassName == null)
+      {
+         throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+      }
+
+      AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            try
+            {
+               Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+               return null;
+            }
+            catch (Exception e)
+            {
+               throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+                                                           "\"",
+                                                  e);
+            }
+         }
+      });
+   }
+
+   public AbstractServerLocator(final boolean useHA,
+                             final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      e.fillInStackTrace();
+      this.ha = useHA;
+
+      this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+
+      this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+      initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
+      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+      clusterConnection = false;
+   }
+
+   protected TransportConfiguration selectConnector()
+   {
+      if (receivedTopology)
+      {
+         int pos = loadBalancingPolicy.select(topologyArray.length);
+
+         Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
+
+         return pair.a;
+      }
+      else
+      {
+         // Get from initialconnectors
+
+         int pos = loadBalancingPolicy.select(initialConnectors.length);
+
+         return initialConnectors[pos];
+      }
+   }
+
+   protected int getConnectorLength()
+   {
+      if(receivedTopology)
+      {
+         return topologyArray.length;
+      }
+      else if(initialConnectors == null)
+      {
+         return -1;
+      }
+      else
+      {
+         return initialConnectors.length;
+      }
+   }
+   
+   public void disableFinalizeCheck()
+   {
+      finalizeCheck = false;
+   }
+
+   public synchronized boolean isHA()
+   {
+      return ha;
+   }
+
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      return cacheLargeMessagesClient;
+   }
+
+   public synchronized void setCacheLargeMessagesClient(final boolean cached)
+   {
+      cacheLargeMessagesClient = cached;
+   }
+
+   public synchronized long getClientFailureCheckPeriod()
+   {
+      return clientFailureCheckPeriod;
+   }
+
+   public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+   {
+      checkWrite();
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+   }
+
+   public synchronized long getConnectionTTL()
+   {
+      return connectionTTL;
+   }
+
+   public synchronized void setConnectionTTL(final long connectionTTL)
+   {
+      checkWrite();
+      this.connectionTTL = connectionTTL;
+   }
+
+   public synchronized long getCallTimeout()
+   {
+      return callTimeout;
+   }
+
+   public synchronized void setCallTimeout(final long callTimeout)
+   {
+      checkWrite();
+      this.callTimeout = callTimeout;
+   }
+
+   public synchronized int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+   {
+      checkWrite();
+      this.minLargeMessageSize = minLargeMessageSize;
+   }
+
+   public synchronized int getConsumerWindowSize()
+   {
+      return consumerWindowSize;
+   }
+
+   public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+   {
+      checkWrite();
+      this.consumerWindowSize = consumerWindowSize;
+   }
+
+   public synchronized int getConsumerMaxRate()
+   {
+      return consumerMaxRate;
+   }
+
+   public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+   {
+      checkWrite();
+      this.consumerMaxRate = consumerMaxRate;
+   }
+
+   public synchronized int getConfirmationWindowSize()
+   {
+      return confirmationWindowSize;
+   }
+
+   public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+   {
+      checkWrite();
+      this.confirmationWindowSize = confirmationWindowSize;
+   }
+
+   public synchronized int getProducerWindowSize()
+   {
+      return producerWindowSize;
+   }
+
+   public synchronized void setProducerWindowSize(final int producerWindowSize)
+   {
+      checkWrite();
+      this.producerWindowSize = producerWindowSize;
+   }
+
+   public synchronized int getProducerMaxRate()
+   {
+      return producerMaxRate;
+   }
+
+   public synchronized void setProducerMaxRate(final int producerMaxRate)
+   {
+      checkWrite();
+      this.producerMaxRate = producerMaxRate;
+   }
+
+   public synchronized boolean isBlockOnAcknowledge()
+   {
+      return blockOnAcknowledge;
+   }
+
+   public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+   {
+      checkWrite();
+      this.blockOnAcknowledge = blockOnAcknowledge;
+   }
+
+   public synchronized boolean isBlockOnDurableSend()
+   {
+      return blockOnDurableSend;
+   }
+
+   public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+   {
+      checkWrite();
+      this.blockOnDurableSend = blockOnDurableSend;
+   }
+
+   public synchronized boolean isBlockOnNonDurableSend()
+   {
+      return blockOnNonDurableSend;
+   }
+
+   public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+   {
+      checkWrite();
+      this.blockOnNonDurableSend = blockOnNonDurableSend;
+   }
+
+   public synchronized boolean isAutoGroup()
+   {
+      return autoGroup;
+   }
+
+   public synchronized void setAutoGroup(final boolean autoGroup)
+   {
+      checkWrite();
+      this.autoGroup = autoGroup;
+   }
+
+   public synchronized boolean isPreAcknowledge()
+   {
+      return preAcknowledge;
+   }
+
+   public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+   {
+      checkWrite();
+      this.preAcknowledge = preAcknowledge;
+   }
+
+   public synchronized int getAckBatchSize()
+   {
+      return ackBatchSize;
+   }
+
+   public synchronized void setAckBatchSize(final int ackBatchSize)
+   {
+      checkWrite();
+      this.ackBatchSize = ackBatchSize;
+   }
+
+   public synchronized boolean isUseGlobalPools()
+   {
+      return useGlobalPools;
+   }
+
+   public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+   {
+      checkWrite();
+      this.useGlobalPools = useGlobalPools;
+   }
+
+   public synchronized int getScheduledThreadPoolMaxSize()
+   {
+      return scheduledThreadPoolMaxSize;
+   }
+
+   public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+   {
+      checkWrite();
+      this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+   }
+
+   public synchronized int getThreadPoolMaxSize()
+   {
+      return threadPoolMaxSize;
+   }
+
+   public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+   {
+      checkWrite();
+      this.threadPoolMaxSize = threadPoolMaxSize;
+   }
+
+   public synchronized long getRetryInterval()
+   {
+      return retryInterval;
+   }
+
+   public synchronized void setRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      this.retryInterval = retryInterval;
+   }
+
+   public synchronized long getMaxRetryInterval()
+   {
+      return maxRetryInterval;
+   }
+
+   public synchronized void setMaxRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      maxRetryInterval = retryInterval;
+   }
+
+   public synchronized double getRetryIntervalMultiplier()
+   {
+      return retryIntervalMultiplier;
+   }
+
+   public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+   {
+      checkWrite();
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+   }
+
+   public synchronized int getReconnectAttempts()
+   {
+      return reconnectAttempts;
+   }
+
+   public synchronized void setReconnectAttempts(final int reconnectAttempts)
+   {
+      checkWrite();
+      this.reconnectAttempts = reconnectAttempts;
+   }
+
+   public void setInitialConnectAttempts(int initialConnectAttempts)
+   {
+      checkWrite();
+      this.initialConnectAttempts = initialConnectAttempts;
+   }
+
+   public int getInitialConnectAttempts()
+   {
+      return initialConnectAttempts;
+   }
+
+   public synchronized boolean isFailoverOnInitialConnection()
+   {
+      return this.failoverOnInitialConnection;
+   }
+
+   public synchronized void setFailoverOnInitialConnection(final boolean failover)
+   {
+      checkWrite();
+      this.failoverOnInitialConnection = failover;
+   }
+
+   public synchronized String getConnectionLoadBalancingPolicyClassName()
+   {
+      return connectionLoadBalancingPolicyClassName;
+   }
+
+   public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+   {
+      checkWrite();
+      connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+   }
+
+   public TransportConfiguration[] getStaticTransportConfigurations()
+   {
+      return this.initialConnectors;
+   }
+
+   public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+   {
+      return discoveryGroupConfiguration;
+   }
+
+   public void addInterceptor(final Interceptor interceptor)
+   {
+      interceptors.add(interceptor);
+   }
+
+   public boolean removeInterceptor(final Interceptor interceptor)
+   {
+      return interceptors.remove(interceptor);
+   }
+
+   public synchronized int getInitialMessagePacketSize()
+   {
+      return initialMessagePacketSize;
+   }
+
+   public synchronized void setInitialMessagePacketSize(final int size)
+   {
+      checkWrite();
+      initialMessagePacketSize = size;
+   }
+
+   public void setGroupID(final String groupID)
+   {
+      checkWrite();
+      this.groupID = groupID;
+   }
+
+   public String getGroupID()
+   {
+      return groupID;
+   }
+
+   public boolean isCompressLargeMessage()
+   {
+      return compressLargeMessage;
+   }
+
+   public void setCompressLargeMessage(boolean compress)
+   {
+      this.compressLargeMessage = compress;
+   }
+
+   private void checkWrite()
+   {
+      if (readOnly)
+      {
+         throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+      }
+   }
+
+   public void setNodeID(String nodeID)
+   {
+      this.nodeID = nodeID;
+   }
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public void setClusterConnection(boolean clusterConnection)
+   {
+      this.clusterConnection = clusterConnection;
+   }
+
+   public boolean isClusterConnection()
+   {
+      return clusterConnection;
+   }
+
+   public TransportConfiguration getClusterTransportConfiguration()
+   {
+      return clusterTransportConfiguration;
+   }
+
+   public void setClusterTransportConfiguration(TransportConfiguration tc)
+   {
+      this.clusterTransportConfiguration = tc;
+   }
+
+   public boolean isBackup()
+   {
+      return backup;
+   }
+
+   public void setBackup(boolean backup)
+   {
+      this.backup = backup;
+   }
+
+   protected boolean isReceivedTopology()
+   {
+      return this.receivedTopology;
+   }
+   
+   protected boolean doFinalizeCheck()
+   {
+      return this.finalizeCheck;
+   }
+   
+   protected ConnectionLoadBalancingPolicy getLoadBalancingPolicy()
+   {
+      return this.loadBalancingPolicy;
+   }
+   
+   protected ExecutorService getThreadPool()
+   {
+      return threadPool;
+   }
+
+   protected ScheduledExecutorService getScheduledThreadPool()
+   {
+      return scheduledThreadPool;
+   }
+
+   protected List<Interceptor> getInterceptors()
+   {
+      return this.interceptors;
+   }
+   
+   protected TransportConfiguration[] getInitialConnectors()
+   {
+      return this.initialConnectors;
+   }
+   
+   protected void setInitialConnectors(TransportConfiguration[] initialConnectors)
+   {
+      this.initialConnectors = initialConnectors;
+   }
+   
+   protected boolean isReadOnly()
+   {
+      return this.readOnly;
+   }
+   
+   protected void setReadOnly(boolean readOnly)
+   {
+      this.readOnly = readOnly;
+   }
+   
+   protected boolean isClosed()
+   {
+      return this.closed;
+   }
+   
+   @Override
+   protected void finalize() throws Throwable
+   {
+      if (finalizeCheck)
+      {
+         close();
+      }
+
+      super.finalize();
+   }
+
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+
+      for (ClientSessionFactory factory : factories)
+      {
+         factory.close();
+      }
+
+      factories.clear();
+
+      if (!useGlobalPools)
+      {
+         if (threadPool != null)
+         {
+            threadPool.shutdown();
+
+            try
+            {
+               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+
+         if (scheduledThreadPool != null)
+         {
+            scheduledThreadPool.shutdown();
+
+            try
+            {
+               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for scheduled pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+      }
+      readOnly = false;
+
+      closed = true;
+   }
+   
+   public synchronized void notifyNodeDown(final String nodeID)
+   {
+      boolean removed = false;
+
+      if (!ha)
+      {
+         return;
+      }
+
+      removed = topology.removeMember(nodeID);
+
+      if (!topology.isEmpty())
+      {
+         updateArraysAndPairs();
+
+         if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+         {
+            receivedTopology = false;
+         }
+      }
+      else
+      {
+         topologyArray = null;
+
+         receivedTopology = false;
+      }
+
+      if (removed)
+      {
+         for (ClusterTopologyListener listener : topologyListeners)
+         {
+            listener.nodeDown(nodeID);
+         }
+      }
+   }
+
+   public synchronized void notifyNodeUp(final String nodeID,
+                                         final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                                         final boolean last)
+   {
+      if (!ha)
+      {
+         return;
+      }
+
+      topology.addMember(nodeID, new TopologyMember(connectorPair));
+
+      TopologyMember actMember = topology.getMember(nodeID);
+
+      if (actMember.getConnector().a != null && actMember.getConnector().b != null)
+      {
+         for (ClientSessionFactory factory : factories)
+         {
+            ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+                                                                       actMember.getConnector().b);
+         }
+      }
+
+      if (connectorPair.a != null)
+      {
+         updateArraysAndPairs();
+      }
+
+      if (last)
+      {
+         receivedTopology = true;
+      }
+
+      for (ClusterTopologyListener listener : topologyListeners)
+      {
+         listener.nodeUP(nodeID, connectorPair, last);
+      }
+
+      // Notify if waiting on getting topology
+      notify();
+   }
+
+   private void updateArraysAndPairs()
+   {
+      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+                                                                                                topology.members());
+
+      int count = 0;
+      for (TopologyMember pair : topology.getMembers())
+      {
+         topologyArray[count++] = pair.getConnector();
+      }
+   }
+
+   public synchronized void factoryClosed(final ClientSessionFactory factory)
+   {
+      factories.remove(factory);
+
+      if (factories.isEmpty())
+      {
+         // Go back to using the broadcast or static list
+
+         receivedTopology = false;
+
+         topology = null;
+
+      }
+   }
+
+   public Topology getTopology()
+   {
+      return topology;
+   }
+
+   public void addClusterTopologyListener(final ClusterTopologyListener listener)
+   {
+      topologyListeners.add(listener);
+      if(topology.members() > 0)
+      {
+         System.out.println("ServerLocatorImpl.addClusterTopologyListener");
+      }
+   }
+
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+   {
+      topologyListeners.remove(listener);
+   }
+
+   public synchronized void addFactory(ClientSessionFactoryInternal factory)
+   {
+      if (factory != null)
+      {
+         TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+         factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+         factories.add(factory);
+      }
+   }
+
+   public static void shutdown()
+   {
+      if (globalScheduledThreadPool != null)
+      {
+         globalScheduledThreadPool.shutdown();
+         globalScheduledThreadPool = null;
+      }
+      if (globalThreadPool != null)
+      {
+         globalThreadPool.shutdown();
+         globalThreadPool = null;
+      }
+   }
+   
+   public boolean isStaticDirectConnection(TransportConfiguration conf)
+   {
+      return false;
+   }
+}

Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+   // for static discovery
+   public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = "static-connector-ref-list";
+   public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+   
+   // for simple UDP discovery
+   public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+   public static final String GROUP_ADDRESS_NAME = "group-address";
+   public static final String GROUP_PORT_NAME = "group-port";
+   public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+   public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Deleted: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,1456 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
-   private static final long serialVersionUID = -1615857864410205260L;
-
-   private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
-   private final boolean ha;
-
-   private boolean finalizeCheck = true;
-
-   private boolean clusterConnection;
-
-   private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
-   private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
-
-   private TransportConfiguration[] initialConnectors;
-
-   private DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
-   private StaticConnector staticConnector = new StaticConnector();
-
-   private Topology topology = new Topology();
-
-   private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
-   private boolean receivedTopology;
-
-   private boolean compressLargeMessage;
-
-   private ExecutorService threadPool;
-
-   private ScheduledExecutorService scheduledThreadPool;
-
-   private DiscoveryGroup discoveryGroup;
-
-   private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
-   private boolean readOnly;
-
-   // Settable attributes:
-
-   private boolean cacheLargeMessagesClient;
-
-   private long clientFailureCheckPeriod;
-
-   private long connectionTTL;
-
-   private long callTimeout;
-
-   private int minLargeMessageSize;
-
-   private int consumerWindowSize;
-
-   private int consumerMaxRate;
-
-   private int confirmationWindowSize;
-
-   private int producerWindowSize;
-
-   private int producerMaxRate;
-
-   private boolean blockOnAcknowledge;
-
-   private boolean blockOnDurableSend;
-
-   private boolean blockOnNonDurableSend;
-
-   private boolean autoGroup;
-
-   private boolean preAcknowledge;
-
-   private String connectionLoadBalancingPolicyClassName;
-
-   private int ackBatchSize;
-
-   private boolean useGlobalPools;
-
-   private int scheduledThreadPoolMaxSize;
-
-   private int threadPoolMaxSize;
-
-   private long retryInterval;
-
-   private double retryIntervalMultiplier;
-
-   private long maxRetryInterval;
-
-   private int reconnectAttempts;
-
-   private int initialConnectAttempts;
-
-   private boolean failoverOnInitialConnection;
-
-   private int initialMessagePacketSize;
-
-   private volatile boolean closed;
-
-   private volatile boolean closing;
-
-   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
-   private static ExecutorService globalThreadPool;
-
-   private static ScheduledExecutorService globalScheduledThreadPool;
-
-   private String groupID;
-
-   private String nodeID;
-
-   private TransportConfiguration clusterTransportConfiguration;
-
-   private boolean backup;
-
-   private final Exception e = new Exception();
-   
-   // To be called when there are ServerLocator being finalized.
-   // To be used on test assertions
-   public static Runnable finalizeCallback = null;
-
-   private static synchronized ExecutorService getGlobalThreadPool()
-   {
-      if (globalThreadPool == null)
-      {
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
-         globalThreadPool = Executors.newCachedThreadPool(factory);
-      }
-
-      return globalThreadPool;
-   }
-
-   public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
-   {
-      if (globalScheduledThreadPool == null)
-      {
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
-                                                          true,
-                                                          getThisClassLoader());
-
-         globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
-                                                                      factory);
-      }
-
-      return globalScheduledThreadPool;
-   }
-
-   private void setThreadPools()
-   {
-      if (useGlobalPools)
-      {
-         threadPool = getGlobalThreadPool();
-
-         scheduledThreadPool = getGlobalScheduledThreadPool();
-      }
-      else
-      {
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
-                                                          true,
-                                                          getThisClassLoader());
-
-         if (threadPoolMaxSize == -1)
-         {
-            threadPool = Executors.newCachedThreadPool(factory);
-         }
-         else
-         {
-            threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
-         }
-
-         factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
-                                            true,
-                                            getThisClassLoader());
-
-         scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
-      }
-   }
-
-   private static ClassLoader getThisClassLoader()
-   {
-      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-      {
-         public ClassLoader run()
-         {
-            return ClientSessionFactoryImpl.class.getClassLoader();
-         }
-      });
-
-   }
-
-   private void instantiateLoadBalancingPolicy()
-   {
-      if (connectionLoadBalancingPolicyClassName == null)
-      {
-         throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
-      }
-
-      AccessController.doPrivileged(new PrivilegedAction<Object>()
-      {
-         public Object run()
-         {
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            try
-            {
-               Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
-               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
-               return null;
-            }
-            catch (Exception e)
-            {
-               throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
-                                                           "\"",
-                                                  e);
-            }
-         }
-      });
-   }
-
-   private synchronized void initialise() throws Exception
-   {
-      if (!readOnly)
-      {
-         setThreadPools();
-
-         instantiateLoadBalancingPolicy();
-
-         if (discoveryGroupConfiguration != null)
-         {
-            InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
-            InetAddress lbAddress;
-
-            if (discoveryGroupConfiguration.getLocalBindAddress() != null)
-            {
-               lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
-            }
-            else
-            {
-               lbAddress = null;
-            }
-
-            discoveryGroup = new DiscoveryGroupImpl(nodeID,
-                                                    discoveryGroupConfiguration.getName(),
-                                                    lbAddress,
-                                                    groupAddress,
-                                                    discoveryGroupConfiguration.getGroupPort(),
-                                                    discoveryGroupConfiguration.getRefreshTimeout());
-
-            discoveryGroup.registerListener(this);
-
-            discoveryGroup.start();
-         }
-
-         readOnly = true;
-      }
-   }
-
-   private ServerLocatorImpl(final boolean useHA,
-                             final DiscoveryGroupConfiguration discoveryGroupConfiguration,
-                             final TransportConfiguration[] transportConfigs)
-   {
-      e.fillInStackTrace();
-      this.ha = useHA;
-
-      this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
-      this.initialConnectors = transportConfigs;
-
-      this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
-      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
-      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
-      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
-      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
-      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
-      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
-      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
-      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
-      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
-      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
-      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
-      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
-      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
-      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
-      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
-      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
-      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
-      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
-      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
-      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
-      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
-      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
-      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
-      initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
-      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
-      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
-      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
-      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
-      compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
-      clusterConnection = false;
-   }
-
-   /**
-    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
-    *
-    * @param discoveryAddress
-    * @param discoveryPort
-    */
-   public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
-   {
-      this(useHA, groupConfiguration, null);
-   }
-
-   /**
-    * Create a ServerLocatorImpl using a static list of live servers
-    *
-    * @param transportConfigs
-    */
-   public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
-   {
-      this(useHA, null, transportConfigs);
-   }
-
-   private TransportConfiguration selectConnector()
-   {
-      if (receivedTopology)
-      {
-         int pos = loadBalancingPolicy.select(topologyArray.length);
-
-         Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
-         return pair.a;
-      }
-      else
-      {
-         // Get from initialconnectors
-
-         int pos = loadBalancingPolicy.select(initialConnectors.length);
-
-         return initialConnectors[pos];
-      }
-   }
-
-   public void start(Executor executor) throws Exception
-   {
-      initialise();
-
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               connect();
-            }
-            catch (Exception e)
-            {
-               if (!closing)
-               {
-                  log.warn("did not connect the cluster connection to other nodes", e);
-               }
-            }
-         }
-      });
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
-    */
-   public void disableFinalizeCheck()
-   {
-      finalizeCheck = false;
-   }
-
-   public ClientSessionFactory connect() throws Exception
-   {
-      ClientSessionFactoryInternal sf;
-      // static list of initial connectors
-      if (initialConnectors != null && discoveryGroup == null)
-      {
-         sf = (ClientSessionFactoryInternal)staticConnector.connect();
-      }
-      // wait for discovery group to get the list of initial connectors
-      else
-      {
-         sf = (ClientSessionFactoryInternal)createSessionFactory();
-      }
-      addFactory(sf);
-      return sf;
-   }
-
-   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-      }
-
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
-
-      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
-                                                                          transportConfiguration,
-                                                                          callTimeout,
-                                                                          clientFailureCheckPeriod,
-                                                                          connectionTTL,
-                                                                          retryInterval,
-                                                                          retryIntervalMultiplier,
-                                                                          maxRetryInterval,
-                                                                          reconnectAttempts,
-                                                                          threadPool,
-                                                                          scheduledThreadPool,
-                                                                          interceptors);
-
-      factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
-      addFactory(factory);
-
-      return factory;
-   }
-
-   public ClientSessionFactory createSessionFactory() throws Exception
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-      }
-
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
-
-      if (initialConnectors == null && discoveryGroup != null)
-      {
-         // Wait for an initial broadcast to give us at least one node in the cluster
-         long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
-         boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
-         if (!ok)
-         {
-            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                       "Timed out waiting to receive initial broadcast from cluster");
-         }
-      }
-
-      ClientSessionFactoryInternal factory = null;
-
-      synchronized (this)
-      {
-         boolean retry;
-         int attempts = 0;
-         do
-         {
-            retry = false;
-
-            TransportConfiguration tc = selectConnector();
-
-            // try each factory in the list until we find one which works
-
-            try
-            {
-               factory = new ClientSessionFactoryImpl(this,
-                                                      tc,
-                                                      callTimeout,
-                                                      clientFailureCheckPeriod,
-                                                      connectionTTL,
-                                                      retryInterval,
-                                                      retryIntervalMultiplier,
-                                                      maxRetryInterval,
-                                                      reconnectAttempts,
-                                                      threadPool,
-                                                      scheduledThreadPool,
-                                                      interceptors);
-               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
-            }
-            catch (HornetQException e)
-            {
-               factory.close();
-               factory = null;
-               if (e.getCode() == HornetQException.NOT_CONNECTED)
-               {
-                  attempts++;
-
-                  if (topologyArray != null && attempts == topologyArray.length)
-                  {
-                     throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                                "Cannot connect to server(s). Tried with all available servers.");
-                  }
-                  if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
-                  {
-                     throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                                "Cannot connect to server(s). Tried with all available servers.");
-                  }
-                  retry = true;
-               }
-               else
-               {
-                  throw e;
-               }
-            }
-         }
-         while (retry);
-
-         if (ha)
-         {
-            long toWait = 30000;
-            long start = System.currentTimeMillis();
-            while (!receivedTopology && toWait > 0)
-            {
-               // Now wait for the topology
-
-               try
-               {
-                  wait(toWait);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-
-               long now = System.currentTimeMillis();
-
-               toWait -= now - start;
-
-               start = now;
-            }
-
-            if (toWait <= 0)
-            {
-               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                          "Timed out waiting to receive cluster topology");
-            }
-         }
-
-         addFactory(factory);
-
-         return factory;
-      }
-   }
-
-   public synchronized boolean isHA()
-   {
-      return ha;
-   }
-
-   public synchronized boolean isCacheLargeMessagesClient()
-   {
-      return cacheLargeMessagesClient;
-   }
-
-   public synchronized void setCacheLargeMessagesClient(final boolean cached)
-   {
-      cacheLargeMessagesClient = cached;
-   }
-
-   public synchronized long getClientFailureCheckPeriod()
-   {
-      return clientFailureCheckPeriod;
-   }
-
-   public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
-   {
-      checkWrite();
-      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-   }
-
-   public synchronized long getConnectionTTL()
-   {
-      return connectionTTL;
-   }
-
-   public synchronized void setConnectionTTL(final long connectionTTL)
-   {
-      checkWrite();
-      this.connectionTTL = connectionTTL;
-   }
-
-   public synchronized long getCallTimeout()
-   {
-      return callTimeout;
-   }
-
-   public synchronized void setCallTimeout(final long callTimeout)
-   {
-      checkWrite();
-      this.callTimeout = callTimeout;
-   }
-
-   public synchronized int getMinLargeMessageSize()
-   {
-      return minLargeMessageSize;
-   }
-
-   public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
-   {
-      checkWrite();
-      this.minLargeMessageSize = minLargeMessageSize;
-   }
-
-   public synchronized int getConsumerWindowSize()
-   {
-      return consumerWindowSize;
-   }
-
-   public synchronized void setConsumerWindowSize(final int consumerWindowSize)
-   {
-      checkWrite();
-      this.consumerWindowSize = consumerWindowSize;
-   }
-
-   public synchronized int getConsumerMaxRate()
-   {
-      return consumerMaxRate;
-   }
-
-   public synchronized void setConsumerMaxRate(final int consumerMaxRate)
-   {
-      checkWrite();
-      this.consumerMaxRate = consumerMaxRate;
-   }
-
-   public synchronized int getConfirmationWindowSize()
-   {
-      return confirmationWindowSize;
-   }
-
-   public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
-   {
-      checkWrite();
-      this.confirmationWindowSize = confirmationWindowSize;
-   }
-
-   public synchronized int getProducerWindowSize()
-   {
-      return producerWindowSize;
-   }
-
-   public synchronized void setProducerWindowSize(final int producerWindowSize)
-   {
-      checkWrite();
-      this.producerWindowSize = producerWindowSize;
-   }
-
-   public synchronized int getProducerMaxRate()
-   {
-      return producerMaxRate;
-   }
-
-   public synchronized void setProducerMaxRate(final int producerMaxRate)
-   {
-      checkWrite();
-      this.producerMaxRate = producerMaxRate;
-   }
-
-   public synchronized boolean isBlockOnAcknowledge()
-   {
-      return blockOnAcknowledge;
-   }
-
-   public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
-   {
-      checkWrite();
-      this.blockOnAcknowledge = blockOnAcknowledge;
-   }
-
-   public synchronized boolean isBlockOnDurableSend()
-   {
-      return blockOnDurableSend;
-   }
-
-   public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
-   {
-      checkWrite();
-      this.blockOnDurableSend = blockOnDurableSend;
-   }
-
-   public synchronized boolean isBlockOnNonDurableSend()
-   {
-      return blockOnNonDurableSend;
-   }
-
-   public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
-   {
-      checkWrite();
-      this.blockOnNonDurableSend = blockOnNonDurableSend;
-   }
-
-   public synchronized boolean isAutoGroup()
-   {
-      return autoGroup;
-   }
-
-   public synchronized void setAutoGroup(final boolean autoGroup)
-   {
-      checkWrite();
-      this.autoGroup = autoGroup;
-   }
-
-   public synchronized boolean isPreAcknowledge()
-   {
-      return preAcknowledge;
-   }
-
-   public synchronized void setPreAcknowledge(final boolean preAcknowledge)
-   {
-      checkWrite();
-      this.preAcknowledge = preAcknowledge;
-   }
-
-   public synchronized int getAckBatchSize()
-   {
-      return ackBatchSize;
-   }
-
-   public synchronized void setAckBatchSize(final int ackBatchSize)
-   {
-      checkWrite();
-      this.ackBatchSize = ackBatchSize;
-   }
-
-   public synchronized boolean isUseGlobalPools()
-   {
-      return useGlobalPools;
-   }
-
-   public synchronized void setUseGlobalPools(final boolean useGlobalPools)
-   {
-      checkWrite();
-      this.useGlobalPools = useGlobalPools;
-   }
-
-   public synchronized int getScheduledThreadPoolMaxSize()
-   {
-      return scheduledThreadPoolMaxSize;
-   }
-
-   public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
-   {
-      checkWrite();
-      this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
-   }
-
-   public synchronized int getThreadPoolMaxSize()
-   {
-      return threadPoolMaxSize;
-   }
-
-   public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
-   {
-      checkWrite();
-      this.threadPoolMaxSize = threadPoolMaxSize;
-   }
-
-   public synchronized long getRetryInterval()
-   {
-      return retryInterval;
-   }
-
-   public synchronized void setRetryInterval(final long retryInterval)
-   {
-      checkWrite();
-      this.retryInterval = retryInterval;
-   }
-
-   public synchronized long getMaxRetryInterval()
-   {
-      return maxRetryInterval;
-   }
-
-   public synchronized void setMaxRetryInterval(final long retryInterval)
-   {
-      checkWrite();
-      maxRetryInterval = retryInterval;
-   }
-
-   public synchronized double getRetryIntervalMultiplier()
-   {
-      return retryIntervalMultiplier;
-   }
-
-   public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
-   {
-      checkWrite();
-      this.retryIntervalMultiplier = retryIntervalMultiplier;
-   }
-
-   public synchronized int getReconnectAttempts()
-   {
-      return reconnectAttempts;
-   }
-
-   public synchronized void setReconnectAttempts(final int reconnectAttempts)
-   {
-      checkWrite();
-      this.reconnectAttempts = reconnectAttempts;
-   }
-
-   public void setInitialConnectAttempts(int initialConnectAttempts)
-   {
-      checkWrite();
-      this.initialConnectAttempts = initialConnectAttempts;
-   }
-
-   public int getInitialConnectAttempts()
-   {
-      return initialConnectAttempts;
-   }
-
-   public synchronized boolean isFailoverOnInitialConnection()
-   {
-      return this.failoverOnInitialConnection;
-   }
-
-   public synchronized void setFailoverOnInitialConnection(final boolean failover)
-   {
-      checkWrite();
-      this.failoverOnInitialConnection = failover;
-   }
-
-   public synchronized String getConnectionLoadBalancingPolicyClassName()
-   {
-      return connectionLoadBalancingPolicyClassName;
-   }
-
-   public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
-   {
-      checkWrite();
-      connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
-   }
-
-   public TransportConfiguration[] getStaticTransportConfigurations()
-   {
-      return this.initialConnectors;
-   }
-
-   public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
-   {
-      return discoveryGroupConfiguration;
-   }
-
-   public void addInterceptor(final Interceptor interceptor)
-   {
-      interceptors.add(interceptor);
-   }
-
-   public boolean removeInterceptor(final Interceptor interceptor)
-   {
-      return interceptors.remove(interceptor);
-   }
-
-   public synchronized int getInitialMessagePacketSize()
-   {
-      return initialMessagePacketSize;
-   }
-
-   public synchronized void setInitialMessagePacketSize(final int size)
-   {
-      checkWrite();
-      initialMessagePacketSize = size;
-   }
-
-   public void setGroupID(final String groupID)
-   {
-      checkWrite();
-      this.groupID = groupID;
-   }
-
-   public String getGroupID()
-   {
-      return groupID;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
-    */
-   public boolean isCompressLargeMessage()
-   {
-      return compressLargeMessage;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
-    */
-   public void setCompressLargeMessage(boolean compress)
-   {
-      this.compressLargeMessage = compress;
-   }
-
-   private void checkWrite()
-   {
-      if (readOnly)
-      {
-         throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
-      }
-   }
-
-   public void setNodeID(String nodeID)
-   {
-      this.nodeID = nodeID;
-   }
-
-   public String getNodeID()
-   {
-      return nodeID;
-   }
-
-   public void setClusterConnection(boolean clusterConnection)
-   {
-      this.clusterConnection = clusterConnection;
-   }
-
-   public boolean isClusterConnection()
-   {
-      return clusterConnection;
-   }
-
-   public TransportConfiguration getClusterTransportConfiguration()
-   {
-      return clusterTransportConfiguration;
-   }
-
-   public void setClusterTransportConfiguration(TransportConfiguration tc)
-   {
-      this.clusterTransportConfiguration = tc;
-   }
-
-   public boolean isBackup()
-   {
-      return backup;
-   }
-
-   public void setBackup(boolean backup)
-   {
-      this.backup = backup;
-   }
-
-   @Override
-   protected void finalize() throws Throwable
-   {
-      if (finalizeCheck)
-      {
-         close();
-      }
-
-      super.finalize();
-   }
-
-   public void close()
-   {
-      if (closed)
-      {
-         return;
-      }
-
-      closing = true;
-
-      if (discoveryGroup != null)
-      {
-         try
-         {
-            discoveryGroup.stop();
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to stop discovery group", e);
-         }
-      }
-      else
-      {
-         staticConnector.disconnect();
-      }
-
-      for (ClientSessionFactory factory : factories)
-      {
-         factory.close();
-      }
-
-      factories.clear();
-
-      if (!useGlobalPools)
-      {
-         if (threadPool != null)
-         {
-            threadPool.shutdown();
-
-            try
-            {
-               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
-               {
-                  log.warn("Timed out waiting for pool to terminate");
-               }
-            }
-            catch (InterruptedException ignore)
-            {
-            }
-         }
-
-         if (scheduledThreadPool != null)
-         {
-            scheduledThreadPool.shutdown();
-
-            try
-            {
-               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
-               {
-                  log.warn("Timed out waiting for scheduled pool to terminate");
-               }
-            }
-            catch (InterruptedException ignore)
-            {
-            }
-         }
-      }
-      readOnly = false;
-
-      closed = true;
-   }
-
-   public synchronized void notifyNodeDown(final String nodeID)
-   {
-      boolean removed = false;
-
-      if (!ha)
-      {
-         return;
-      }
-
-      removed = topology.removeMember(nodeID);
-
-      if (!topology.isEmpty())
-      {
-         updateArraysAndPairs();
-
-         if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
-         {
-            receivedTopology = false;
-         }
-      }
-      else
-      {
-         topologyArray = null;
-
-         receivedTopology = false;
-      }
-
-      if (removed)
-      {
-         for (ClusterTopologyListener listener : topologyListeners)
-         {
-            listener.nodeDown(nodeID);
-         }
-      }
-   }
-
-   public synchronized void notifyNodeUp(final String nodeID,
-                                         final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                                         final boolean last)
-   {
-      if (!ha)
-      {
-         return;
-      }
-
-      topology.addMember(nodeID, new TopologyMember(connectorPair));
-
-      TopologyMember actMember = topology.getMember(nodeID);
-
-      if (actMember.getConnector().a != null && actMember.getConnector().b != null)
-      {
-         for (ClientSessionFactory factory : factories)
-         {
-            ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
-                                                                       actMember.getConnector().b);
-         }
-      }
-
-      if (connectorPair.a != null)
-      {
-         updateArraysAndPairs();
-      }
-
-      if (last)
-      {
-         receivedTopology = true;
-      }
-
-      for (ClusterTopologyListener listener : topologyListeners)
-      {
-         listener.nodeUP(nodeID, connectorPair, last);
-      }
-
-      // Notify if waiting on getting topology
-      notify();
-   }
-
-   private void updateArraysAndPairs()
-   {
-      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
-                                                                                                topology.members());
-
-      int count = 0;
-      for (TopologyMember pair : topology.getMembers())
-      {
-         topologyArray[count++] = pair.getConnector();
-      }
-   }
-
-   public synchronized void connectorsChanged()
-   {
-      List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
-      this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
-                                                                           newConnectors.size());
-
-      int count = 0;
-      for (DiscoveryEntry entry : newConnectors)
-      {
-         this.initialConnectors[count++] = entry.getConnector();
-      }
-
-      if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
-      {
-         // FIXME the node is alone in the cluster. We create a connection to the new node
-         // to trigger the node notification to form the cluster.
-         try
-         {
-            connect();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
-         }
-      }
-   }
-
-   public synchronized void factoryClosed(final ClientSessionFactory factory)
-   {
-      factories.remove(factory);
-
-      if (factories.isEmpty())
-      {
-         // Go back to using the broadcast or static list
-
-         receivedTopology = false;
-
-         topology = null;
-
-      }
-   }
-
-   public Topology getTopology()
-   {
-      return topology;
-   }
-
-   public void addClusterTopologyListener(final ClusterTopologyListener listener)
-   {
-      topologyListeners.add(listener);
-      if(topology.members() > 0)
-      {
-         System.out.println("ServerLocatorImpl.addClusterTopologyListener");
-      }
-   }
-
-   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
-   {
-      topologyListeners.remove(listener);
-   }
-
-   public synchronized void addFactory(ClientSessionFactoryInternal factory)
-   {
-      if (factory != null)
-      {
-         TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
-         factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
-         factories.add(factory);
-      }
-   }
-
-   public static void shutdown()
-   {
-      if (globalScheduledThreadPool != null)
-      {
-         globalScheduledThreadPool.shutdown();
-         globalScheduledThreadPool = null;
-      }
-      if (globalThreadPool != null)
-      {
-         globalThreadPool.shutdown();
-         globalThreadPool = null;
-      }
-   }
-
-   class StaticConnector implements Serializable
-   {
-      private List<Connector> connectors;
-
-      public ClientSessionFactory connect() throws HornetQException
-      {
-         if (closed)
-         {
-            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-         }
-
-         try
-         {
-            initialise();
-         }
-         catch (Exception e)
-         {
-            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-         }
-
-         ClientSessionFactory csf = null;
-
-         createConnectors();
-
-         try
-         {
-            List<Future<ClientSessionFactory>> futures = threadPool.invokeAll(connectors);
-            for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
-            {
-               Future<ClientSessionFactory> future = futures.get(i);
-               try
-               {
-                  csf = future.get();
-                  if (csf != null)
-                     break;
-               }
-               catch (Exception e)
-               {
-                  log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
-               }
-            }
-            if (csf == null && !closed)
-            {
-               throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
-            }
-         }
-         catch (InterruptedException e)
-         {
-            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
-         }
-
-         if (csf == null && !closed)
-         {
-            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
-         }
-         return csf;
-      }
-
-      private synchronized void createConnectors()
-      {
-         connectors = new ArrayList<Connector>();
-         for (TransportConfiguration initialConnector : initialConnectors)
-         {
-            ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
-                                                                                initialConnector,
-                                                                                callTimeout,
-                                                                                clientFailureCheckPeriod,
-                                                                                connectionTTL,
-                                                                                retryInterval,
-                                                                                retryIntervalMultiplier,
-                                                                                maxRetryInterval,
-                                                                                reconnectAttempts,
-                                                                                threadPool,
-                                                                                scheduledThreadPool,
-                                                                                interceptors);
-            connectors.add(new Connector(initialConnector, factory));
-         }
-      }
-
-      public synchronized void disconnect()
-      {
-         if (connectors != null)
-         {
-            for (Connector connector : connectors)
-            {
-               connector.disconnect();
-            }
-         }
-      }
-
-      public void finalize() throws Throwable
-      {
-         if (!closed && finalizeCheck)
-         {
-            log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
-                     System.identityHashCode(this));
-
-            log.warn("The ServerLocator you didn't close was created here:", e);
-            
-            if (ServerLocatorImpl.finalizeCallback != null)
-            {
-               ServerLocatorImpl.finalizeCallback.run();
-            }
-
-            close();
-         }
-
-         super.finalize();
-      }
-
-      class Connector implements Callable<ClientSessionFactory>
-      {
-         private TransportConfiguration initialConnector;
-
-         private volatile ClientSessionFactoryInternal factory;
-
-         private boolean isConnected = false;
-
-         private boolean interrupted = false;
-
-         private Exception e;
-
-         public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
-         {
-            this.initialConnector = initialConnector;
-            this.factory = factory;
-         }
-
-         public ClientSessionFactory call() throws HornetQException
-         {
-            try
-            {
-               factory.connect(reconnectAttempts, failoverOnInitialConnection);
-            }
-            catch (HornetQException e)
-            {
-               if (!interrupted)
-               {
-                  this.e = e;
-                  throw e;
-               }
-               /*if(factory != null)
-               {
-                  factory.close();
-                  factory = null;
-               }*/
-               return null;
-            }
-            isConnected = true;
-            for (Connector connector : connectors)
-            {
-               if (!connector.isConnected())
-               {
-                  connector.disconnect();
-               }
-            }
-            return factory;
-         }
-
-         public boolean isConnected()
-         {
-            return isConnected;
-         }
-
-         public void disconnect()
-         {
-            interrupted = true;
-
-            if (factory != null)
-            {
-               factory.causeExit();
-               factory.close();
-               factory = null;
-            }
-         }
-      }
-   }
-}

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -57,4 +57,6 @@
    void setBackup(boolean backup);
 
    Topology getTopology();
+   
+   boolean isStaticDirectConnection(TransportConfiguration con);
 }

Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl
+ * 
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class SimpleUDPServerLocatorImpl extends AbstractServerLocator implements DiscoveryListener
+{
+   private static final long serialVersionUID = -1615857864410205260L;
+
+   private static final Logger log = Logger.getLogger(SimpleUDPServerLocatorImpl.class);
+
+   private String discoveryGroupName;
+   
+   private InetAddress localBindAddress;
+   
+   private InetAddress groupAddress;
+   
+   private int groupPort;
+   
+   private long refreshTimeout;
+   
+   private long initialWaitTimeout;
+   
+   private DiscoveryGroup discoveryGroup;
+
+   private volatile boolean closing;
+   
+   // To be called when there are ServerLocator being finalized.
+   // To be used on test assertions
+   public static Runnable finalizeCallback = null;
+
+   private synchronized void initialise() throws Exception
+   {
+      if (!isReadOnly())
+      {
+         setThreadPools();
+
+         instantiateLoadBalancingPolicy();
+
+         this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+   
+         Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+         String lbStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+
+         if (lbStr != null)
+         {
+            this.localBindAddress = InetAddress.getByName(lbStr);
+         }
+         else
+         {
+            this.localBindAddress = null;
+         }
+
+         this.groupAddress = InetAddress.getByName(ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params));
+         this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
+         this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+         this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+         
+         discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+                                                 this.discoveryGroupName,
+                                                 this.localBindAddress,
+                                                 this.groupAddress,
+                                                 this.groupPort,
+                                                 this.refreshTimeout);
+
+         discoveryGroup.registerListener(this);
+
+         discoveryGroup.start();
+
+         setReadOnly(true);
+      }
+   }
+
+   public SimpleUDPServerLocatorImpl(final boolean useHA,
+                             final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      super(useHA, discoveryGroupConfiguration);
+   }
+
+   public void start(Executor executor) throws Exception
+   {
+      initialise();
+
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               connect();
+            }
+            catch (Exception e)
+            {
+               if (!closing)
+               {
+                  log.warn("did not connect the cluster connection to other nodes", e);
+               }
+            }
+         }
+      });
+   }
+
+   public ClientSessionFactory connect() throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+
+      // wait for discovery group to get the list of initial connectors
+      sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+      addFactory(sf);
+      return sf;
+   }
+
+   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+                                                                          transportConfiguration,
+                                                                          getCallTimeout(),
+                                                                          getClientFailureCheckPeriod(),
+                                                                          getConnectionTTL(),
+                                                                          getRetryInterval(),
+                                                                          getRetryIntervalMultiplier(),
+                                                                          getMaxRetryInterval(),
+                                                                          getReconnectAttempts(),
+                                                                          getThreadPool(),
+                                                                          getScheduledThreadPool(),
+                                                                          getInterceptors());
+
+      factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+      addFactory(factory);
+
+      return factory;
+   }
+
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      if (getInitialConnectors() == null)
+      {
+         // Wait for an initial broadcast to give us at least one node in the cluster
+         long timeout = isClusterConnection() ? 0 : this.initialWaitTimeout;
+         boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+         if (!ok)
+         {
+            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                       "Timed out waiting to receive initial broadcast from cluster");
+         }
+      }
+
+      ClientSessionFactoryInternal factory = null;
+
+      synchronized (this)
+      {
+         boolean retry;
+         int attempts = 0;
+         do
+         {
+            retry = false;
+
+            TransportConfiguration tc = selectConnector();
+
+            // try each factory in the list until we find one which works
+
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      tc,
+                                                      getCallTimeout(),
+                                                      getClientFailureCheckPeriod(),
+                                                      getConnectionTTL(),
+                                                      getRetryInterval(),
+                                                      getRetryIntervalMultiplier(),
+                                                      getMaxRetryInterval(),
+                                                      getReconnectAttempts(),
+                                                      getThreadPool(),
+                                                      getScheduledThreadPool(),
+                                                      getInterceptors());
+               factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+            }
+            catch (HornetQException e)
+            {
+               factory.close();
+               factory = null;
+               if (e.getCode() == HornetQException.NOT_CONNECTED)
+               {
+                  attempts++;
+
+                  if (attempts == getConnectorLength())
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                                "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  retry = true;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+         }
+         while (retry);
+
+         if (isHA())
+         {
+            long toWait = 30000;
+            long start = System.currentTimeMillis();
+            while (!isReceivedTopology() && toWait > 0)
+            {
+               // Now wait for the topology
+
+               try
+               {
+                  wait(toWait);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (toWait <= 0)
+            {
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+         }
+
+         addFactory(factory);
+
+         return factory;
+      }
+   }
+
+   public void close()
+   {
+      if (isClosed())
+      {
+         return;
+      }
+
+      closing = true;
+
+      try
+      {
+         discoveryGroup.stop();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to stop discovery group", e);
+      }
+
+      super.close();
+   }
+
+   public synchronized void connectorsChanged()
+   {
+      List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+      TransportConfiguration[] initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                                               newConnectors.size());
+      int count = 0;
+      for (DiscoveryEntry entry : newConnectors)
+      {
+         initialConnectors[count++] = entry.getConnector();
+      }
+
+      if (isHA() && isClusterConnection() && !isReceivedTopology() && initialConnectors.length > 0)
+      {
+         // FIXME the node is alone in the cluster. We create a connection to the new node
+         // to trigger the node notification to form the cluster.
+         try
+         {
+            connect();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+         }
+      }
+
+      setInitialConnectors(initialConnectors);
+   }
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,450 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+   private static final long serialVersionUID = -1615857864410205260L;
+
+   private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+   private StaticConnector staticConnector = new StaticConnector();
+
+   private volatile boolean closing;
+   
+   private final Exception e = new Exception();
+   
+   // To be called when there are ServerLocator being finalized.
+   // To be used on test assertions
+   public static Runnable finalizeCallback = null;
+
+   private synchronized void initialise() throws Exception
+   {
+      if (!isReadOnly())
+      {
+         setThreadPools();
+
+         instantiateLoadBalancingPolicy();
+
+         Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+         TransportConfiguration[] initialConnectors = (TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+         setInitialConnectors(initialConnectors);
+         
+         setReadOnly(true);
+      }
+   }
+
+   public StaticServerLocatorImpl(final boolean useHA,
+                             final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      super(useHA, discoveryGroupConfiguration);
+      
+      e.fillInStackTrace();
+   }
+
+   public void start(Executor executor) throws Exception
+   {
+      initialise();
+
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               connect();
+            }
+            catch (Exception e)
+            {
+               if (!closing)
+               {
+                  log.warn("did not connect the cluster connection to other nodes", e);
+               }
+            }
+         }
+      });
+   }
+
+   public ClientSessionFactory connect() throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+
+      sf = (ClientSessionFactoryInternal)staticConnector.connect();
+
+      addFactory(sf);
+      return sf;
+   }
+
+   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+                                                                          transportConfiguration,
+                                                                          getCallTimeout(),
+                                                                          getClientFailureCheckPeriod(),
+                                                                          getConnectionTTL(),
+                                                                          getRetryInterval(),
+                                                                          getRetryIntervalMultiplier(),
+                                                                          getMaxRetryInterval(),
+                                                                          getReconnectAttempts(),
+                                                                          getThreadPool(),
+                                                                          getScheduledThreadPool(),
+                                                                          getInterceptors());
+
+      factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+      addFactory(factory);
+
+      return factory;
+   }
+
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      ClientSessionFactoryInternal factory = null;
+
+      synchronized (this)
+      {
+         boolean retry;
+         int attempts = 0;
+         do
+         {
+            retry = false;
+
+            TransportConfiguration tc = selectConnector();
+
+            // try each factory in the list until we find one which works
+
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      tc,
+                                                      getCallTimeout(),
+                                                      getClientFailureCheckPeriod(),
+                                                      getConnectionTTL(),
+                                                      getRetryInterval(),
+                                                      getRetryIntervalMultiplier(),
+                                                      getMaxRetryInterval(),
+                                                      getReconnectAttempts(),
+                                                      getThreadPool(),
+                                                      getScheduledThreadPool(),
+                                                      getInterceptors());
+               factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+            }
+            catch (HornetQException e)
+            {
+               factory.close();
+               factory = null;
+               if (e.getCode() == HornetQException.NOT_CONNECTED)
+               {
+                  attempts++;
+
+                  if (attempts == getConnectorLength())
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                                "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  retry = true;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+         }
+         while (retry);
+
+         if (isHA())
+         {
+            long toWait = 30000;
+            long start = System.currentTimeMillis();
+            while (!isReceivedTopology() && toWait > 0)
+            {
+               // Now wait for the topology
+
+               try
+               {
+                  wait(toWait);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (toWait <= 0)
+            {
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+         }
+
+         addFactory(factory);
+
+         return factory;
+      }
+   }
+
+   public void close()
+   {
+      if (isClosed())
+      {
+         return;
+      }
+
+      closing = true;
+
+      staticConnector.disconnect();
+
+      super.close();
+   }
+
+   public boolean isStaticDirectConnection(TransportConfiguration con)
+   {
+      for(TransportConfiguration connector : getInitialConnectors())
+      {
+         if(connector.equals(con))
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+   
+   class StaticConnector implements Serializable
+   {
+      private List<Connector> connectors;
+
+      public ClientSessionFactory connect() throws HornetQException
+      {
+         if (isClosed())
+         {
+            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+         }
+
+         try
+         {
+            initialise();
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+         }
+
+         ClientSessionFactory csf = null;
+
+         createConnectors();
+
+         try
+         {
+            List<Future<ClientSessionFactory>> futures = getThreadPool().invokeAll(connectors);
+            for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+            {
+               Future<ClientSessionFactory> future = futures.get(i);
+               try
+               {
+                  csf = future.get();
+                  if (csf != null)
+                     break;
+               }
+               catch (Exception e)
+               {
+                  log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
+               }
+            }
+            if (csf == null && !isClosed())
+            {
+               throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+            }
+         }
+         catch (InterruptedException e)
+         {
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+         }
+
+         if (csf == null && !isClosed())
+         {
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+         }
+         return csf;
+      }
+
+      private synchronized void createConnectors()
+      {
+         connectors = new ArrayList<Connector>();
+         for (TransportConfiguration initialConnector : getInitialConnectors())
+         {
+            ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+                                                                                initialConnector,
+                                                                                getCallTimeout(),
+                                                                                getClientFailureCheckPeriod(),
+                                                                                getConnectionTTL(),
+                                                                                getRetryInterval(),
+                                                                                getRetryIntervalMultiplier(),
+                                                                                getMaxRetryInterval(),
+                                                                                getReconnectAttempts(),
+                                                                                getThreadPool(),
+                                                                                getScheduledThreadPool(),
+                                                                                getInterceptors());
+            connectors.add(new Connector(initialConnector, factory));
+         }
+      }
+
+      public synchronized void disconnect()
+      {
+         if (connectors != null)
+         {
+            for (Connector connector : connectors)
+            {
+               connector.disconnect();
+            }
+         }
+      }
+
+      public void finalize() throws Throwable
+      {
+         if (!isClosed() && doFinalizeCheck())
+         {
+            log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+                     System.identityHashCode(this));
+
+            log.warn("The ServerLocator you didn't close was created here:", e);
+            
+            if (StaticServerLocatorImpl.finalizeCallback != null)
+            {
+               StaticServerLocatorImpl.finalizeCallback.run();
+            }
+
+            close();
+         }
+
+         super.finalize();
+      }
+
+      class Connector implements Callable<ClientSessionFactory>
+      {
+         private TransportConfiguration initialConnector;
+
+         private volatile ClientSessionFactoryInternal factory;
+
+         private boolean isConnected = false;
+
+         private boolean interrupted = false;
+
+         private Exception e;
+
+         public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+         {
+            this.initialConnector = initialConnector;
+            this.factory = factory;
+         }
+
+         public ClientSessionFactory call() throws HornetQException
+         {
+            try
+            {
+               factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+            }
+            catch (HornetQException e)
+            {
+               if (!interrupted)
+               {
+                  this.e = e;
+                  throw e;
+               }
+               /*if(factory != null)
+               {
+                  factory.close();
+                  factory = null;
+               }*/
+               return null;
+            }
+            isConnected = true;
+            for (Connector connector : connectors)
+            {
+               if (!connector.isConnected())
+               {
+                  connector.disconnect();
+               }
+            }
+            return factory;
+         }
+
+         public boolean isConnected()
+         {
+            return isConnected;
+         }
+
+         public void disconnect()
+         {
+            interrupted = true;
+
+            if (factory != null)
+            {
+               factory.causeExit();
+               factory.close();
+               factory = null;
+            }
+         }
+      }
+   }
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -42,6 +42,8 @@
    private final boolean forwardWhenNoConsumers;
 
    private final List<String> staticConnectors;
+   
+   private final List<String> allowableConnectors;
 
    private final String discoveryGroupName;
 
@@ -49,7 +51,7 @@
 
    private final int confirmationWindowSize;
 
-   private final boolean allowDirectConnectionsOnly;
+   private final boolean allowableConnectionsOnly;
 
    public ClusterConnectionConfiguration(final String name,
                                          final String address,
@@ -60,42 +62,22 @@
                                          final int maxHops,
                                          final int confirmationWindowSize,
                                          final List<String> staticConnectors,
-                                         final boolean allowDirectConnectionsOnly)
+                                         final String discoveryGroupName,
+                                         final boolean allowableConnectionsOnly,
+                                         final List<String> allowableConnectorNames)
    {
       this.name = name;
       this.address = address;
       this.connectorName = connectorName;
       this.retryInterval = retryInterval;
-      this.staticConnectors = staticConnectors;
       this.duplicateDetection = duplicateDetection;
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
-      discoveryGroupName = null;
-      this.maxHops = maxHops;
-      this.confirmationWindowSize = confirmationWindowSize;
-      this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-   }
-
-   public ClusterConnectionConfiguration(final String name,
-                                         final String address,
-                                         final String connectorName,
-                                         final long retryInterval,
-                                         final boolean duplicateDetection,
-                                         final boolean forwardWhenNoConsumers,
-                                         final int maxHops,
-                                         final int confirmationWindowSize,
-                                         final String discoveryGroupName)
-   {
-      this.name = name;
-      this.address = address;
-      this.connectorName = connectorName;
-      this.retryInterval = retryInterval;
-      this.duplicateDetection = duplicateDetection;
-      this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       this.discoveryGroupName = discoveryGroupName;
-      this.staticConnectors = null;
       this.maxHops = maxHops;
       this.confirmationWindowSize = confirmationWindowSize;
-      allowDirectConnectionsOnly = false;
+      this.staticConnectors = staticConnectors;
+      this.allowableConnectors = allowableConnectorNames;
+      this.allowableConnectionsOnly = allowableConnectionsOnly;
    }
 
    public String getName()
@@ -137,6 +119,11 @@
    {
       return staticConnectors;
    }
+   
+   public List<String> getAllowableConnectors()
+   {
+      return allowableConnectors;
+   }
 
    public String getDiscoveryGroupName()
    {
@@ -148,8 +135,8 @@
       return retryInterval;
    }
 
-   public boolean isAllowDirectConnectionsOnly()
+   public boolean isAllowableConnectionsOnly()
    {
-      return allowDirectConnectionsOnly;
+      return allowableConnectionsOnly;
    }
 }

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -22,12 +22,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
 import org.hornetq.core.config.*;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.config.impl.FileConfiguration;
@@ -939,29 +941,42 @@
    {
       String name = e.getAttribute("name");
 
-      String localBindAddress = XMLConfigurationUtil.getString(e, "local-bind-address", null, Validators.NO_CHECK);
+      String clazz = XMLConfigurationUtil.getString(e, "server-locator-class", null, Validators.NOT_NULL_OR_EMPTY);
 
-      String groupAddress = XMLConfigurationUtil.getString(e, "group-address", null, Validators.NOT_NULL_OR_EMPTY);
+      Map<String, Object> params = new HashMap<String, Object>();
 
-      int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1, Validators.MINUS_ONE_OR_GT_ZERO);
+      NodeList paramsNodes = e.getElementsByTagName("param");
 
-      long discoveryInitialWaitTimeout = XMLConfigurationUtil.getLong(e,
-                                                                      "initial-wait-timeout",
-                                                                      HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
-                                                                      Validators.GT_ZERO);
+      for (int i = 0; i < paramsNodes.getLength(); i++)
+      {
+         Node paramNode = paramsNodes.item(i);
 
-      long refreshTimeout = XMLConfigurationUtil.getLong(e,
-                                                         "refresh-timeout",
-                                                         ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
-                                                         Validators.GT_ZERO);
+         NamedNodeMap attributes = paramNode.getAttributes();
 
-      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(name,
-                                                                           localBindAddress,
-                                                                           groupAddress,
-                                                                           groupPort,
-                                                                           refreshTimeout,
-                                                                           discoveryInitialWaitTimeout);
+         Node nkey = attributes.getNamedItem("key");
 
+         String key = nkey.getTextContent();
+
+         Node nValue = attributes.getNamedItem("value");
+
+         params.put(key, nValue.getTextContent());
+      }
+      
+      // discovery-group configuration contains static connector list
+      String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+      if(connectorList != null)
+      {
+         List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+         StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+         while(token.hasMoreElements())
+         {
+            connectors.add(mainConfig.getConnectorConfigurations().get(token.nextElement()));
+         }
+         params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, connectors.toArray(new TransportConfiguration[0]));
+      }
+
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(clazz, params, name);
+
       if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
       {
          FileConfigurationParser.log.warn("There is already a discovery group with name " + name +
@@ -1008,8 +1023,6 @@
 
       String discoveryGroupName = null;
 
-      List<String> staticConnectorNames = new ArrayList<String>();
-
       boolean allowDirectConnectionsOnly = false;
 
       NodeList children = e.getChildNodes();
@@ -1021,46 +1034,58 @@
          if (child.getNodeName().equals("discovery-group-ref"))
          {
             discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
-         }
-         else if (child.getNodeName().equals("static-connectors"))
-         {
+
             Node attr = child.getAttributes().getNamedItem("allow-direct-connections-only");
             if (attr != null)
             {
                allowDirectConnectionsOnly = "true".equalsIgnoreCase(attr.getNodeValue()) || allowDirectConnectionsOnly;
             }
-            getStaticConnectors(staticConnectorNames, child);
          }
       }
 
-      ClusterConnectionConfiguration config;
-
-      if (discoveryGroupName == null)
+      List<String> staticConnectors = new ArrayList<String>();
+      DiscoveryGroupConfiguration discovery = mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
+      Map<String,Object> params = discovery.getParams();
+      String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+      if(connectorList != null)
       {
-         config = new ClusterConnectionConfiguration(name,
-                                                     address,
-                                                     connectorName,
-                                                     retryInterval,
-                                                     duplicateDetection,
-                                                     forwardWhenNoConsumers,
-                                                     maxHops,
-                                                     confirmationWindowSize,
-                                                     staticConnectorNames,
-                                                     allowDirectConnectionsOnly);
+         StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+         while(token.hasMoreElements())
+         {
+            staticConnectors.add(token.nextToken());
+         }
       }
-      else
+      
+      List<String> allowableConnectionNames = null;
+      if(allowDirectConnectionsOnly)
       {
-         config = new ClusterConnectionConfiguration(name,
-                                                     address,
-                                                     connectorName,
-                                                     retryInterval,
-                                                     duplicateDetection,
-                                                     forwardWhenNoConsumers,
-                                                     maxHops,
-                                                     confirmationWindowSize,
-                                                     discoveryGroupName);
+         if(connectorList == null)
+         {
+            log.warn("allow-direct-connections-only was found, but "
+                     + DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME
+                     + " was not found in discovery-group. ignore.");
+         }
+         else
+         {
+            allowableConnectionNames = staticConnectors;
+         }
       }
+      
+      ClusterConnectionConfiguration config;
 
+      config = new ClusterConnectionConfiguration(name,
+                                                  address,
+                                                  connectorName,
+                                                  retryInterval,
+                                                  duplicateDetection,
+                                                  forwardWhenNoConsumers,
+                                                  maxHops,
+                                                  confirmationWindowSize,
+                                                  staticConnectors,
+                                                  discoveryGroupName,
+                                                  allowDirectConnectionsOnly,
+                                                  allowableConnectionNames);
+
       mainConfig.getClusterConfigurations().add(config);
    }
 

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -100,90 +100,16 @@
 
    private final String clusterPassword;
 
-   private final ClusterConnector clusterConnector;
+   private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
 
    private ServerLocatorInternal serverLocator;
    
    private final TransportConfiguration connector;
 
-   private final boolean allowDirectConnectionsOnly;
+   private final boolean allowableConnectionsOnly;
 
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
    
-   public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
-                                final TransportConfiguration connector,
-                                final SimpleString name,
-                                final SimpleString address,
-                                final long retryInterval,
-                                final boolean useDuplicateDetection,
-                                final boolean routeWhenNoConsumers,
-                                final int confirmationWindowSize,
-                                final ExecutorFactory executorFactory,
-                                final HornetQServer server,
-                                final PostOffice postOffice,
-                                final ManagementService managementService,
-                                final ScheduledExecutorService scheduledExecutor,
-                                final int maxHops,
-                                final UUID nodeUUID,
-                                final boolean backup,
-                                final String clusterUser,
-                                final String clusterPassword,
-                                final boolean allowDirectConnectionsOnly) throws Exception
-   {
-
-      if (nodeUUID == null)
-      {
-         throw new IllegalArgumentException("node id is null");
-      }
-
-      this.nodeUUID = nodeUUID;
-
-      this.connector = connector;
-
-      this.name = name;
-
-      this.address = address;
-
-      this.retryInterval = retryInterval;
-
-      this.useDuplicateDetection = useDuplicateDetection;
-
-      this.routeWhenNoConsumers = routeWhenNoConsumers;
-
-      this.executorFactory = executorFactory;
-
-      this.server = server;
-
-      this.postOffice = postOffice;
-
-      this.managementService = managementService;
-
-      this.scheduledExecutor = scheduledExecutor;
-
-      this.maxHops = maxHops;
-
-      this.backup = backup;
-
-      this.clusterUser = clusterUser;
-
-      this.clusterPassword = clusterPassword;
-
-      this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-
-      clusterConnector = new StaticClusterConnector(tcConfigs);
-
-      if (tcConfigs != null && tcConfigs.length > 0)
-      {
-         // a cluster connection will connect to other nodes only if they are directly connected
-         // through a static list of connectors or broadcasting using UDP.
-         if(allowDirectConnectionsOnly)
-         {
-            allowableConnections.addAll(Arrays.asList(tcConfigs));
-         }
-      }
-
-   }
-
    public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
                                 final TransportConfiguration connector,
                                 final SimpleString name,
@@ -202,7 +128,8 @@
                                 final boolean backup,
                                 final String clusterUser,
                                 final String clusterPassword,
-                                final boolean allowDirectConnectionsOnly) throws Exception
+                                final boolean allowableConnectionsOnly,
+                                final TransportConfiguration[] allowableConnections) throws Exception
    {
 
       if (nodeUUID == null)
@@ -242,9 +169,9 @@
 
       this.clusterPassword = clusterPassword;
 
-      this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+      this.allowableConnectionsOnly = allowableConnectionsOnly;
 
-      clusterConnector = new DiscoveryClusterConnector(dg);
+      this.discoveryGroupConfiguration = dg;
    }
 
    public synchronized void start() throws Exception
@@ -346,7 +273,7 @@
 
       backup = false;
 
-      serverLocator = clusterConnector.createServerLocator();
+      serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(this.discoveryGroupConfiguration);
 
 
       if (serverLocator != null)
@@ -433,7 +360,7 @@
       server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
 
       // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
-      if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+      if (allowableConnectionsOnly && !allowableConnections.contains(connectorPair.a))
       {
          return;
       }
@@ -992,46 +919,4 @@
       
       return out;
    }
-
-   interface ClusterConnector
-   {
-      ServerLocatorInternal createServerLocator();
-   }
-
-   private class StaticClusterConnector implements ClusterConnector
-   {
-      private final TransportConfiguration[] tcConfigs;
-
-      public StaticClusterConnector(TransportConfiguration[] tcConfigs)
-      {
-         this.tcConfigs = tcConfigs;
-      }
-
-      public ServerLocatorInternal createServerLocator()
-      {
-         if(tcConfigs != null && tcConfigs.length > 0)
-         {
-            return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
-         }
-         else
-         {
-            return null;
-         }
-      }
-   }
-
-   private class DiscoveryClusterConnector implements ClusterConnector
-   {
-      private final DiscoveryGroupConfiguration dg;
-
-      public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
-      {
-         this.dg = dg;
-      }
-
-      public ServerLocatorInternal createServerLocator()
-      {
-         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
-      }
-   }
 }

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -30,7 +30,7 @@
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.client.impl.TopologyMember;
@@ -621,46 +621,23 @@
 
       ServerLocatorInternal serverLocator;
 
-      if (config.getDiscoveryGroupName() != null)
-      {
-         DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+      DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
                                                                                 .get(config.getDiscoveryGroupName());
-         if (discoveryGroupConfiguration == null)
-         {
-            ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
-                                        "'. The bridge will not be deployed.");
+      if (discoveryGroupConfiguration == null)
+      {
+         ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+                                       "'. The bridge will not be deployed.");
 
-            return;
-         }
+         return;
+      }
 
-         if (config.isHA())
-         {
-            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
-         }
-         else
-         {
-            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
-         }
-
+      if (config.isHA())
+      {
+         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
       }
       else
       {
-         TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
-         if (tcConfigs == null)
-         {
-            return;
-         }
-
-         if (config.isHA())
-         {
-            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
-         }
-         else
-         {
-            serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
-         }
-
+         serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
       }
 
       serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
@@ -739,61 +716,41 @@
 
       ClusterConnectionImpl clusterConnection;
 
-      if (config.getDiscoveryGroupName() != null)
-      {
-         DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
-                                                       .get(config.getDiscoveryGroupName());
+      DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+                                                      .get(config.getDiscoveryGroupName());
 
-         if (dg == null)
-         {
-            ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+      if (dg == null)
+      {
+         ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
                                         "'. The cluster connection will not be deployed.");
-         }
-
-         clusterConnection = new ClusterConnectionImpl(dg,
-                                                       connector,
-                                                       new SimpleString(config.getName()),
-                                                       new SimpleString(config.getAddress()),
-                                                       config.getRetryInterval(),
-                                                       config.isDuplicateDetection(),
-                                                       config.isForwardWhenNoConsumers(),
-                                                       config.getConfirmationWindowSize(),
-                                                       executorFactory,
-                                                       server,
-                                                       postOffice,
-                                                       managementService,
-                                                       scheduledExecutor,
-                                                       config.getMaxHops(),
-                                                       nodeUUID,
-                                                       backup,
-                                                       server.getConfiguration().getClusterUser(),
-                                                       server.getConfiguration().getClusterPassword(),
-                                                       config.isAllowDirectConnectionsOnly());
       }
-      else
-      {
-         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
 
-         clusterConnection = new ClusterConnectionImpl(tcConfigs,
-                                                       connector,
-                                                       new SimpleString(config.getName()),
-                                                       new SimpleString(config.getAddress()),
-                                                       config.getRetryInterval(),
-                                                       config.isDuplicateDetection(),
-                                                       config.isForwardWhenNoConsumers(),
-                                                       config.getConfirmationWindowSize(),
-                                                       executorFactory,
-                                                       server,
-                                                       postOffice,
-                                                       managementService,
-                                                       scheduledExecutor,
-                                                       config.getMaxHops(),
-                                                       nodeUUID,
-                                                       backup,
-                                                       server.getConfiguration().getClusterUser(),
-                                                       server.getConfiguration().getClusterPassword(),
-                                                       config.isAllowDirectConnectionsOnly());
+      List<String> connectorNames = config.getAllowableConnectors();
+      TransportConfiguration[] allowableConnections = null;
+      if(connectorNames != null)
+      {
+         allowableConnections = connectorNameListToArray(connectorNames);
       }
+      clusterConnection = new ClusterConnectionImpl(dg,
+                                                    connector,
+                                                    new SimpleString(config.getName()),
+                                                    new SimpleString(config.getAddress()),
+                                                    config.getRetryInterval(),
+                                                    config.isDuplicateDetection(),
+                                                    config.isForwardWhenNoConsumers(),
+                                                    config.getConfirmationWindowSize(),
+                                                    executorFactory,
+                                                    server,
+                                                    postOffice,
+                                                    managementService,
+                                                    scheduledExecutor,
+                                                    config.getMaxHops(),
+                                                    nodeUUID,
+                                                    backup,
+                                                    server.getConfiguration().getClusterUser(),
+                                                    server.getConfiguration().getClusterPassword(),
+                                                    config.isAllowableConnectionsOnly(),
+                                                    allowableConnections);
 
       managementService.registerCluster(clusterConnection, config);
 
@@ -809,31 +766,18 @@
 
    private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
    {
-      if (config.getStaticConnectors() != null)
-      {
-         TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
-         backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
-         backupServerLocator.setReconnectAttempts(-1);
-      }
-      else if (config.getDiscoveryGroupName() != null)
-      {
-         DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+      DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
                                                        .get(config.getDiscoveryGroupName());
 
-         if (dg == null)
-         {
-            ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
-                                        "'. The cluster connection will not be deployed.");
-         }
-
-         backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
-         backupServerLocator.setReconnectAttempts(-1);
-      }
-      else
+      if (dg == null)
       {
-         return;
+         ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+                                       "'. The cluster connection will not be deployed.");
       }
+
+      backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
+      backupServerLocator.setReconnectAttempts(-1);
+
       log.info("announcing backup");
       this.executorFactory.getExecutor().execute(new Runnable()
       {

Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -37,6 +37,8 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.ra.inflow.HornetQActivation;
@@ -1403,20 +1405,24 @@
       }
       else if (discoveryAddress != null)
       {
+         // FIXME make discovery stategy pluggable with configuration
+         Map<String,Object> params = new HashMap<String,Object>();
+         
          Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
                                                                               : getDiscoveryPort();
 
-         DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
-
          long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
                                                                     : raProperties.getDiscoveryRefreshTimeout();
 
          long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
                                                                         : raProperties.getDiscoveryInitialWaitTimeout();
 
-         groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+         params.put(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, discoveryAddress);
+         params.put(DiscoveryGroupConstants.GROUP_PORT_NAME, discoveryPort);
+         params.put(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, initialTimeout);
+         params.put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, initialTimeout);
 
-         groupConfiguration.setRefreshTimeout(refreshTimeout);
+         DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(SimpleUDPServerLocatorImpl.class.getName(), params, null);
 
          if (ha)
          {

Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -32,7 +32,7 @@
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -189,7 +189,7 @@
       {
          configs[i] = createTransportConfiguration(isNetty(), false, generateParams(nodes[i], isNetty()));
       }
-      return new ServerLocatorImpl(true, configs);
+      return new AbstractServerLocator(true, configs);
    }
    
    // Private -------------------------------------------------------

Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java	2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java	2011-01-27 14:06:20 UTC (rev 10150)
@@ -27,7 +27,7 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.jms.HornetQJMSClient;
 import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.tests.util.JMSTestBase;
@@ -61,7 +61,7 @@
 
       final AtomicInteger valueGC = new AtomicInteger(0);
 
-      ServerLocatorImpl.finalizeCallback = new Runnable()
+      AbstractServerLocator.finalizeCallback = new Runnable()
       {
          public void run()
          {
@@ -85,7 +85,7 @@
       }
       finally
       {
-         ServerLocatorImpl.finalizeCallback = null;
+         AbstractServerLocator.finalizeCallback = null;
       }
 
       assertEquals("The code is throwing exceptions", 0, valueGC.get());



More information about the hornetq-commits mailing list