[hornetq-commits] JBoss hornetq SVN: r11397 - in branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq: api/core/client and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 22 15:24:22 EDT 2011


Author: igarashitm
Date: 2011-09-22 15:24:22 -0400 (Thu, 22 Sep 2011)
New Revision: 11397

Added:
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
Removed:
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
   branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
Log:
Devided the ServerLocatorImpl into 3 classes, AbstractServerLocator, StaticServerLocatorImpl and SimpleUDPServerLocatorImpl


Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -14,6 +14,7 @@
 package org.hornetq.api.core;
 
 import java.io.Serializable;
+import java.util.Map;
 
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.logging.Logger;
@@ -35,120 +36,36 @@
    private static final Logger log = Logger.getLogger(DiscoveryGroupConfiguration.class);
 
 
-   private String name;
+   private final String name;
    
-   private String localBindAddress;
+   private final String serverLocatorClassName;
 
-   private String groupAddress;
+   private final Map<String, Object> params;
 
-   private int groupPort;
-
-   private long refreshTimeout;
-   
-   private long discoveryInitialWaitTimeout;
-
-   public DiscoveryGroupConfiguration(final String name,
-                                      final String localBindAddress,
-                                      final String groupAddress,
-                                      final int groupPort,
-                                      final long refreshTimeout,
-                                      final long discoveryInitialWaitTimeout)
+   public DiscoveryGroupConfiguration(final String clazz, final Map<String, Object> params, final String name)
    {
+      this.serverLocatorClassName = clazz;
+      
+      this.params = params;
+      
       this.name = name;
-      this.groupAddress = groupAddress;
-      this.localBindAddress = localBindAddress;
-      this.groupPort = groupPort;
-      this.refreshTimeout = refreshTimeout;
-      this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
    }
 
-   public DiscoveryGroupConfiguration(final String groupAddress,
-                                      final int groupPort)
+   public String getServerLocatorClassName()
    {
-      this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
+      return this.serverLocatorClassName;
    }
-
-   public String getName()
-   {
-      return name;
-   }
    
-   public String getLocalBindAddress()
+   public Map<String, Object> getParams()
    {
-      return localBindAddress;
+      return this.params;
    }
-
-   public String getGroupAddress()
+  
+   public String getName()
    {
-      return groupAddress;
+      return this.name;
    }
-
-   public int getGroupPort()
-   {
-      return groupPort;
-   }
-
-   public long getRefreshTimeout()
-   {
-      return refreshTimeout;
-   }
-
-   /**
-    * @param name the name to set
-    */
-   public void setName(final String name)
-   {
-      this.name = name;
-   }
    
-   /**
-    * @param localBindAddress the localBindAddress to set
-    */
-   public void setLocalBindAdress(final String localBindAddress)
-   {
-      this.localBindAddress = localBindAddress;
-   }
-
-   /**
-    * @param groupAddress the groupAddress to set
-    */
-   public void setGroupAddress(final String groupAddress)
-   {
-      this.groupAddress = groupAddress;
-   }
-
-   /**
-    * @param groupPort the groupPort to set
-    */
-   public void setGroupPort(final int groupPort)
-   {
-      this.groupPort = groupPort;
-   }
-
-   /**
-    * @param refreshTimeout the refreshTimeout to set
-    */
-   public void setRefreshTimeout(final long refreshTimeout)
-   {
-      this.refreshTimeout = refreshTimeout;
-   }
-
-   /**
-    * @return the discoveryInitialWaitTimeout
-    */
-   public long getDiscoveryInitialWaitTimeout()
-   {
-      return discoveryInitialWaitTimeout;
-   }
-
-   /**
-    * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
-    */
-   public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
-   {
-      this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
-   }
-
    @Override
    public boolean equals(Object o)
    {
@@ -157,11 +74,18 @@
 
       DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
 
-      if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
-      if (groupPort != that.groupPort) return false;
-      if (refreshTimeout != that.refreshTimeout) return false;
-      if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
-      if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
+      if (this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)
+               != that.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)) return false;
+      if (this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)
+               != that.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)) return false;
+      if (this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)
+               != that.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)) return false;
+      if (this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null
+               ? !this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME).equals(that.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME))
+               : that.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null) return false;
+      if (this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null
+               ? !this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME).equals(that.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME))
+               : that.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null)
          return false;
       if (name != null ? !name.equals(that.name) : that.name != null) return false;
 
@@ -171,9 +95,16 @@
    @Override
    public int hashCode()
    {
+      int groupPort = this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME) != null
+               ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME)) : 0;
+      int refreshTimeout = this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME) != null
+               ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME)) : 0;
+      int discoveryInitialWaitTimeout = this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME) != null
+               ? Integer.parseInt((String)this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME)) : 0;
+               
       int result = name != null ? name.hashCode() : 0;
-      result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
-      result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
+      result = 31 * result + (this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) != null ? this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME).hashCode() : 0);
+      result = 31 * result + (this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) != null ? this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME).hashCode() : 0);
       result = 31 * result + groupPort;
       result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
       result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
@@ -186,17 +117,18 @@
    @Override
    public String toString()
    {
-      return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout +
+      return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" +
+             this.params.get(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME) +
              ", groupAddress=" +
-             groupAddress +
+             this.params.get(DiscoveryGroupConstants.GROUP_ADDRESS_NAME) +
              ", groupPort=" +
-             groupPort +
+             this.params.get(DiscoveryGroupConstants.GROUP_PORT_NAME) +
              ", localBindAddress=" +
-             localBindAddress +
+             this.params.get(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME) +
              ", name=" +
              name +
              ", refreshTimeout=" +
-             refreshTimeout +
+             this.params.get(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME) +
              "]";
    }
    

Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java	                        (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+   // for static discovery
+   public static final String STATIC_CONNECTORS_CONNECTOR_NAMES_NAME = "static-connector-names";
+   public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+   
+   // for simple UDP discovery
+   public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+   public static final String GROUP_ADDRESS_NAME = "group-address";
+   public static final String GROUP_PORT_NAME = "group-port";
+   public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+   public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}

Modified: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -12,10 +12,18 @@
  */
 package org.hornetq.api.core.client;
 
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.UUIDGenerator;
 
 /**
  * Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -28,6 +36,8 @@
  */
 public class HornetQClient
 {
+   private static final Logger log = Logger.getLogger(HornetQClient.class);
+   
    public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
 
    public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
@@ -109,7 +119,10 @@
     */
    public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
    {
-      return new ServerLocatorImpl(false, transportConfigurations);
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, Arrays.asList(transportConfigurations));
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, UUIDGenerator.getInstance().generateStringUUID());
+      return createServerLocatorWithoutHA(config);
    }
    
    /**
@@ -117,13 +130,27 @@
     * 
     * The UDP address and port are used to listen for live servers in the cluster
     * 
-    * @param discoveryAddress The UDP group address to listen for updates
-    * @param discoveryPort the UDP port to listen for updates
+    * @param groupConfiguration The configuration for server discovery
     * @return the ServerLocator
     */
    public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
    {
-      return new ServerLocatorImpl(false, groupConfiguration);
+      ServerLocator serverLocator = null;
+      String className = groupConfiguration.getServerLocatorClassName();
+      try
+      {
+         ClassLoader loader = Thread.currentThread().getContextClassLoader();
+         Class<?> clazz = loader.loadClass(className);
+         Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+         serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration);
+      }
+      catch(Exception e)
+      {
+         log.fatal("Could not instantiate ServerLocator implementation class: ", e);
+         return null;
+      }
+      
+      return serverLocator;
    }
    
    /**
@@ -137,7 +164,10 @@
     */
    public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
    {
-      return new ServerLocatorImpl(true, initialServers);
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, Arrays.asList(initialServers));
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, UUIDGenerator.getInstance().generateStringUUID());
+      return createServerLocatorWithHA(config);
    }
    
    /**
@@ -152,7 +182,22 @@
     */
    public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
    {
-      return new ServerLocatorImpl(true, groupConfiguration);
+      ServerLocator serverLocator = null;
+      String className = groupConfiguration.getServerLocatorClassName();
+         
+      try{
+         ClassLoader loader = Thread.currentThread().getContextClassLoader();
+         Class<?> clazz = loader.loadClass(className);
+         Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+         serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration);
+      }
+      catch(Exception e)
+      {
+         log.fatal("Could not instantiate ServerLocator implementation class", e);
+         return null;
+      }
+      
+      return serverLocator;
    }
    
 

Copied: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (from rev 11395, branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java)
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java	                        (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,1442 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A AbstractServerLocator, was derived from ServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, DiscoveryListener, Serializable
+{
+   private static final long serialVersionUID = -1615857864410205260L;
+
+   private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+   private final boolean ha;
+
+   private boolean finalizeCheck = true;
+
+   private boolean clusterConnection;
+
+   private transient String identity;
+
+   private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+
+   private TransportConfiguration[] initialConnectors;
+
+   private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+   private final Topology topology;
+
+   private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+   private boolean receivedTopology;
+
+   private boolean compressLargeMessage;
+
+   // if the system should shutdown the pool when shutting down
+   private transient boolean shutdownPool;
+
+   private ExecutorService threadPool;
+
+   private ScheduledExecutorService scheduledThreadPool;
+
+   private DiscoveryGroup discoveryGroup;
+
+   private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+   private boolean readOnly;
+
+   // Settable attributes:
+
+   private boolean cacheLargeMessagesClient;
+
+   private long clientFailureCheckPeriod;
+
+   private long connectionTTL;
+
+   private long callTimeout;
+
+   private int minLargeMessageSize;
+
+   private int consumerWindowSize;
+
+   private int consumerMaxRate;
+
+   private int confirmationWindowSize;
+
+   private int producerWindowSize;
+
+   private int producerMaxRate;
+
+   private boolean blockOnAcknowledge;
+
+   private boolean blockOnDurableSend;
+
+   private boolean blockOnNonDurableSend;
+
+   private boolean autoGroup;
+
+   private boolean preAcknowledge;
+
+   private String connectionLoadBalancingPolicyClassName;
+
+   private int ackBatchSize;
+
+   private boolean useGlobalPools;
+
+   private int scheduledThreadPoolMaxSize;
+
+   private int threadPoolMaxSize;
+
+   private long retryInterval;
+
+   private double retryIntervalMultiplier;
+
+   private long maxRetryInterval;
+
+   private int reconnectAttempts;
+
+   private int initialConnectAttempts;
+
+   private boolean failoverOnInitialConnection;
+
+   private int initialMessagePacketSize;
+
+   private volatile boolean closed;
+
+   private volatile boolean closing;
+
+   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+   private static ExecutorService globalThreadPool;
+
+   private Executor startExecutor;
+
+   private static ScheduledExecutorService globalScheduledThreadPool;
+
+   private AfterConnectInternalListener afterConnectListener;
+
+   private String groupID;
+
+   private String nodeID;
+
+   private TransportConfiguration clusterTransportConfiguration;
+
+   private boolean backup;
+
+   private final Exception e = new Exception();
+
+   // To be called when there are ServerLocator being finalized.
+   // To be used on test assertions
+   public static Runnable finalizeCallback = null;
+
+   public static synchronized void clearThreadPools()
+   {
+
+      if (globalThreadPool != null)
+      {
+         globalThreadPool.shutdown();
+         try
+         {
+            if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS))
+            {
+               throw new IllegalStateException("Couldn't finish the globalThreadPool");
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+         finally
+         {
+            globalThreadPool = null;
+         }
+      }
+
+      if (globalScheduledThreadPool != null)
+      {
+         globalScheduledThreadPool.shutdown();
+         try
+         {
+            if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS))
+            {
+               throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+         finally
+         {
+            globalScheduledThreadPool = null;
+         }
+      }
+   }
+
+   private static synchronized ExecutorService getGlobalThreadPool()
+   {
+      if (globalThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+         globalThreadPool = Executors.newCachedThreadPool(factory);
+      }
+
+      return globalThreadPool;
+   }
+
+   public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+   {
+      if (globalScheduledThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+                                                          true,
+                                                          getThisClassLoader());
+
+         globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+                                                                      factory);
+      }
+
+      return globalScheduledThreadPool;
+   }
+
+   private void setThreadPools()
+   {
+      if (threadPool != null)
+      {
+         return;
+      }
+      else if (useGlobalPools)
+      {
+         threadPool = getGlobalThreadPool();
+
+         scheduledThreadPool = getGlobalScheduledThreadPool();
+      }
+      else
+      {
+         this.shutdownPool = true;
+
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+                                                          true,
+                                                          getThisClassLoader());
+
+         if (threadPoolMaxSize == -1)
+         {
+            threadPool = Executors.newCachedThreadPool(factory);
+         }
+         else
+         {
+            threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+         }
+
+         factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+                                            true,
+                                            getThisClassLoader());
+
+         scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+      }
+   }
+
+   protected ExecutorService getThreadPool()
+   {
+      return this.threadPool;
+   }
+   
+   protected ScheduledExecutorService getScheduledThreadPool()
+   {
+      return this.scheduledThreadPool;
+   }
+   
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+
+   }
+
+   private void instantiateLoadBalancingPolicy()
+   {
+      if (connectionLoadBalancingPolicyClassName == null)
+      {
+         throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+      }
+
+      AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            try
+            {
+               Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+               return null;
+            }
+            catch (Exception e)
+            {
+               throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+                                                           "\"",
+                                                  e);
+            }
+         }
+      });
+   }
+
+   private AbstractServerLocator(final Topology topology,
+                                 final boolean useHA,
+                                 final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+                                 final TransportConfiguration[] transportConfigs)
+   {
+      e.fillInStackTrace();
+
+      this.topology = topology;
+
+      this.ha = useHA;
+
+      this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+
+      this.initialConnectors = transportConfigs;
+
+      this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+      initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
+      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+      clusterConnection = false;
+   }
+
+   /**
+    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
+    *
+    * @param discoveryAddress
+    * @param discoveryPort
+    */
+   public AbstractServerLocator(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+      if (useHA)
+      {
+         // We only set the owner at where the Topology was created.
+         // For that reason we can't set it at the main constructor
+         topology.setOwner(this);
+      }
+   }
+
+   /**
+    * Create a ServerLocatorImpl using a static list of live servers
+    *
+    * @param transportConfigs
+    */
+   public AbstractServerLocator(final boolean useHA, final TransportConfiguration... transportConfigs)
+   {
+      this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+      if (useHA)
+      {
+         // We only set the owner at where the Topology was created.
+         // For that reason we can't set it at the main constructor
+         topology.setOwner(this);
+      }
+   }
+
+   /**
+    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
+    *
+    * @param discoveryAddress
+    * @param discoveryPort
+    */
+   public AbstractServerLocator(final Topology topology,
+                                final boolean useHA,
+                                final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      this(topology, useHA, groupConfiguration, null);
+
+   }
+
+   /**
+    * Create a ServerLocatorImpl using a static list of live servers
+    *
+    * @param transportConfigs
+    */
+   public AbstractServerLocator(final Topology topology,
+                                final boolean useHA,
+                                final TransportConfiguration... transportConfigs)
+   {
+      this(topology, useHA, null, transportConfigs);
+   }
+
+   private TransportConfiguration selectConnector()
+   {
+      if (receivedTopology)
+      {
+         int pos = loadBalancingPolicy.select(topologyArray.length);
+
+         Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
+
+         return pair.a;
+      }
+      else
+      {
+         // Get from initialconnectors
+
+         int pos = loadBalancingPolicy.select(initialConnectors.length);
+
+         return initialConnectors[pos];
+      }
+   }
+
+   protected abstract void initialiseInternal() throws Exception;
+   
+   private void initialise() throws Exception
+   {
+      if (!readOnly)
+      {
+         setThreadPools();
+         instantiateLoadBalancingPolicy();
+         
+         initialiseInternal();
+         
+         readOnly = true;
+      }
+   }
+   
+   public void start(Executor executor) throws Exception
+   {
+      initialise();
+      
+      this.startExecutor = executor;
+
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               connect();
+            }
+            catch (Exception e)
+            {
+               if (!closing)
+               {
+                  log.warn("did not connect the cluster connection to other nodes", e);
+               }
+            }
+         }
+      });
+   }
+
+   public Executor getExecutor()
+   {
+      return startExecutor;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+    */
+   public void disableFinalizeCheck()
+   {
+      finalizeCheck = false;
+   }
+
+   public boolean doFinalizeCheck()
+   {
+      return finalizeCheck;
+   }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+    */
+   public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+   {
+      this.afterConnectListener = listener;
+   }
+
+   public AfterConnectInternalListener getAfterConnectInternalListener()
+   {
+      return afterConnectListener;
+   }
+
+   public boolean isClosed()
+   {
+      return closed || closing;
+   }
+
+   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+                                                                          transportConfiguration,
+                                                                          callTimeout,
+                                                                          clientFailureCheckPeriod,
+                                                                          connectionTTL,
+                                                                          retryInterval,
+                                                                          retryIntervalMultiplier,
+                                                                          maxRetryInterval,
+                                                                          reconnectAttempts,
+                                                                          threadPool,
+                                                                          scheduledThreadPool,
+                                                                          interceptors);
+
+      factory.connect(reconnectAttempts, failoverOnInitialConnection);
+
+      addFactory(factory);
+
+      return factory;
+   }
+
+   protected abstract void waitInitialDiscovery() throws Exception;
+   
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (closed || closing)
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      waitInitialDiscovery();
+      
+      ClientSessionFactoryInternal factory = null;
+
+      synchronized (this)
+      {
+         boolean retry;
+         int attempts = 0;
+         do
+         {
+            retry = false;
+
+            TransportConfiguration tc = selectConnector();
+
+            // try each factory in the list until we find one which works
+
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      tc,
+                                                      callTimeout,
+                                                      clientFailureCheckPeriod,
+                                                      connectionTTL,
+                                                      retryInterval,
+                                                      retryIntervalMultiplier,
+                                                      maxRetryInterval,
+                                                      reconnectAttempts,
+                                                      threadPool,
+                                                      scheduledThreadPool,
+                                                      interceptors);
+               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+            }
+            catch (HornetQException e)
+            {
+               factory.close();
+               factory = null;
+               if (e.getCode() == HornetQException.NOT_CONNECTED)
+               {
+                  attempts++;
+
+                  if (topologyArray != null && attempts == topologyArray.length)
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                                "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                                "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  retry = true;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+         }
+         while (retry);
+
+         if (ha || clusterConnection)
+         {
+            long timeout = System.currentTimeMillis() + 30000;
+            while (!AbstractServerLocator.this.closed && !AbstractServerLocator.this.closing &&
+                   !receivedTopology &&
+                   timeout > System.currentTimeMillis())
+            {
+               // Now wait for the topology
+
+               try
+               {
+                  wait(1000);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+
+            }
+
+            if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+            {
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+
+         }
+
+         addFactory(factory);
+
+         return factory;
+      }
+
+   }
+
+   public boolean isHA()
+   {
+      return ha;
+   }
+
+   public boolean isCacheLargeMessagesClient()
+   {
+      return cacheLargeMessagesClient;
+   }
+
+   public void setCacheLargeMessagesClient(final boolean cached)
+   {
+      cacheLargeMessagesClient = cached;
+   }
+
+   public long getClientFailureCheckPeriod()
+   {
+      return clientFailureCheckPeriod;
+   }
+
+   public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+   {
+      checkWrite();
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+   }
+
+   public long getConnectionTTL()
+   {
+      return connectionTTL;
+   }
+
+   public void setConnectionTTL(final long connectionTTL)
+   {
+      checkWrite();
+      this.connectionTTL = connectionTTL;
+   }
+
+   public long getCallTimeout()
+   {
+      return callTimeout;
+   }
+
+   public void setCallTimeout(final long callTimeout)
+   {
+      checkWrite();
+      this.callTimeout = callTimeout;
+   }
+
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   public void setMinLargeMessageSize(final int minLargeMessageSize)
+   {
+      checkWrite();
+      this.minLargeMessageSize = minLargeMessageSize;
+   }
+
+   public int getConsumerWindowSize()
+   {
+      return consumerWindowSize;
+   }
+
+   public void setConsumerWindowSize(final int consumerWindowSize)
+   {
+      checkWrite();
+      this.consumerWindowSize = consumerWindowSize;
+   }
+
+   public int getConsumerMaxRate()
+   {
+      return consumerMaxRate;
+   }
+
+   public void setConsumerMaxRate(final int consumerMaxRate)
+   {
+      checkWrite();
+      this.consumerMaxRate = consumerMaxRate;
+   }
+
+   public int getConfirmationWindowSize()
+   {
+      return confirmationWindowSize;
+   }
+
+   public void setConfirmationWindowSize(final int confirmationWindowSize)
+   {
+      checkWrite();
+      this.confirmationWindowSize = confirmationWindowSize;
+   }
+
+   public int getProducerWindowSize()
+   {
+      return producerWindowSize;
+   }
+
+   public void setProducerWindowSize(final int producerWindowSize)
+   {
+      checkWrite();
+      this.producerWindowSize = producerWindowSize;
+   }
+
+   public int getProducerMaxRate()
+   {
+      return producerMaxRate;
+   }
+
+   public void setProducerMaxRate(final int producerMaxRate)
+   {
+      checkWrite();
+      this.producerMaxRate = producerMaxRate;
+   }
+
+   public boolean isBlockOnAcknowledge()
+   {
+      return blockOnAcknowledge;
+   }
+
+   public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+   {
+      checkWrite();
+      this.blockOnAcknowledge = blockOnAcknowledge;
+   }
+
+   public boolean isBlockOnDurableSend()
+   {
+      return blockOnDurableSend;
+   }
+
+   public void setBlockOnDurableSend(final boolean blockOnDurableSend)
+   {
+      checkWrite();
+      this.blockOnDurableSend = blockOnDurableSend;
+   }
+
+   public boolean isBlockOnNonDurableSend()
+   {
+      return blockOnNonDurableSend;
+   }
+
+   public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+   {
+      checkWrite();
+      this.blockOnNonDurableSend = blockOnNonDurableSend;
+   }
+
+   public boolean isAutoGroup()
+   {
+      return autoGroup;
+   }
+
+   public void setAutoGroup(final boolean autoGroup)
+   {
+      checkWrite();
+      this.autoGroup = autoGroup;
+   }
+
+   public boolean isPreAcknowledge()
+   {
+      return preAcknowledge;
+   }
+
+   public void setPreAcknowledge(final boolean preAcknowledge)
+   {
+      checkWrite();
+      this.preAcknowledge = preAcknowledge;
+   }
+
+   public int getAckBatchSize()
+   {
+      return ackBatchSize;
+   }
+
+   public void setAckBatchSize(final int ackBatchSize)
+   {
+      checkWrite();
+      this.ackBatchSize = ackBatchSize;
+   }
+
+   public boolean isUseGlobalPools()
+   {
+      return useGlobalPools;
+   }
+
+   public void setUseGlobalPools(final boolean useGlobalPools)
+   {
+      checkWrite();
+      this.useGlobalPools = useGlobalPools;
+   }
+
+   public int getScheduledThreadPoolMaxSize()
+   {
+      return scheduledThreadPoolMaxSize;
+   }
+
+   public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+   {
+      checkWrite();
+      this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+   }
+
+   public int getThreadPoolMaxSize()
+   {
+      return threadPoolMaxSize;
+   }
+
+   public void setThreadPoolMaxSize(final int threadPoolMaxSize)
+   {
+      checkWrite();
+      this.threadPoolMaxSize = threadPoolMaxSize;
+   }
+
+   public long getRetryInterval()
+   {
+      return retryInterval;
+   }
+
+   public void setRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      this.retryInterval = retryInterval;
+   }
+
+   public long getMaxRetryInterval()
+   {
+      return maxRetryInterval;
+   }
+
+   public void setMaxRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      maxRetryInterval = retryInterval;
+   }
+
+   public double getRetryIntervalMultiplier()
+   {
+      return retryIntervalMultiplier;
+   }
+
+   public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+   {
+      checkWrite();
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+   }
+
+   public int getReconnectAttempts()
+   {
+      return reconnectAttempts;
+   }
+
+   public void setReconnectAttempts(final int reconnectAttempts)
+   {
+      checkWrite();
+      this.reconnectAttempts = reconnectAttempts;
+   }
+
+   public void setInitialConnectAttempts(int initialConnectAttempts)
+   {
+      checkWrite();
+      this.initialConnectAttempts = initialConnectAttempts;
+   }
+
+   public int getInitialConnectAttempts()
+   {
+      return initialConnectAttempts;
+   }
+
+   public boolean isFailoverOnInitialConnection()
+   {
+      return this.failoverOnInitialConnection;
+   }
+
+   public void setFailoverOnInitialConnection(final boolean failover)
+   {
+      checkWrite();
+      this.failoverOnInitialConnection = failover;
+   }
+
+   public String getConnectionLoadBalancingPolicyClassName()
+   {
+      return connectionLoadBalancingPolicyClassName;
+   }
+
+   public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+   {
+      checkWrite();
+      connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+   }
+
+   public TransportConfiguration[] getStaticTransportConfigurations()
+   {
+      return this.initialConnectors;
+   }
+
+   public void setInitialConnectors(TransportConfiguration[] connectors)
+   {
+      this.initialConnectors = connectors; 
+   }
+   
+   public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+   {
+      return discoveryGroupConfiguration;
+   }
+
+   public List<Interceptor> getInterceptors()
+   {
+      return this.interceptors;
+   }
+   
+   public void addInterceptor(final Interceptor interceptor)
+   {
+      interceptors.add(interceptor);
+   }
+
+   public boolean removeInterceptor(final Interceptor interceptor)
+   {
+      return interceptors.remove(interceptor);
+   }
+
+   public int getInitialMessagePacketSize()
+   {
+      return initialMessagePacketSize;
+   }
+
+   public void setInitialMessagePacketSize(final int size)
+   {
+      checkWrite();
+      initialMessagePacketSize = size;
+   }
+
+   public void setGroupID(final String groupID)
+   {
+      checkWrite();
+      this.groupID = groupID;
+   }
+
+   public String getGroupID()
+   {
+      return groupID;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
+    */
+   public boolean isCompressLargeMessage()
+   {
+      return compressLargeMessage;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
+    */
+   public void setCompressLargeMessage(boolean compress)
+   {
+      this.compressLargeMessage = compress;
+   }
+
+   private void checkWrite()
+   {
+      if (readOnly)
+      {
+         throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+      }
+   }
+
+   public String getIdentity()
+   {
+      return identity;
+   }
+
+   public void setIdentity(String identity)
+   {
+      this.identity = identity;
+   }
+
+   public void setNodeID(String nodeID)
+   {
+      this.nodeID = nodeID;
+   }
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public void setClusterConnection(boolean clusterConnection)
+   {
+      this.clusterConnection = clusterConnection;
+   }
+
+   public boolean isClusterConnection()
+   {
+      return clusterConnection;
+   }
+
+   public TransportConfiguration getClusterTransportConfiguration()
+   {
+      return clusterTransportConfiguration;
+   }
+
+   public void setClusterTransportConfiguration(TransportConfiguration tc)
+   {
+      this.clusterTransportConfiguration = tc;
+   }
+
+   public boolean isBackup()
+   {
+      return backup;
+   }
+
+   public void setBackup(boolean backup)
+   {
+      this.backup = backup;
+   }
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      if (finalizeCheck)
+      {
+         close();
+      }
+
+      super.finalize();
+   }
+
+   public void cleanup()
+   {
+      doClose(false);
+   }
+
+   public void close()
+   {
+      doClose(true);
+   }
+
+   protected abstract void doCloseInternal();
+   
+   protected void doClose(final boolean sendClose)
+   {
+      if (closed)
+      {
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + " is already closed when calling closed");
+         }
+         return;
+      }
+
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + " is calling close", new Exception("trace"));
+      }
+
+      closing = true;
+
+      doCloseInternal();
+      
+      Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+
+      for (ClientSessionFactory factory : clonedFactory)
+      {
+         if (sendClose)
+         {
+            factory.close();
+         }
+         else
+         {
+            factory.cleanup();
+         }
+      }
+
+      factories.clear();
+
+      if (shutdownPool)
+      {
+         if (threadPool != null)
+         {
+            threadPool.shutdown();
+
+            try
+            {
+               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+
+         if (scheduledThreadPool != null)
+         {
+            scheduledThreadPool.shutdown();
+
+            try
+            {
+               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for scheduled pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+      }
+      readOnly = false;
+
+      closed = true;
+   }
+
+   /** This is directly called when the connection to the node is gone,
+    *  or when the node sends a disconnection.
+    *  Look for callers of this method! */
+   public void notifyNodeDown(final long eventTime, final String nodeID)
+   {
+
+      if (topology == null)
+      {
+         // there's no topology here
+         return;
+      }
+
+      if (log.isDebugEnabled())
+      {
+         log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+      }
+
+      if (topology.removeMember(eventTime, nodeID))
+      {
+         if (topology.isEmpty())
+         {
+            // Resetting the topology to its original condition as it was brand new
+            synchronized (this)
+            {
+               topologyArray = null;
+               receivedTopology = false;
+            }
+         }
+         else
+         {
+            updateArraysAndPairs();
+
+            if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+            {
+               // Resetting the topology to its original condition as it was brand new
+               receivedTopology = false;
+            }
+         }
+      }
+
+   }
+
+   public void notifyNodeUp(long uniqueEventID,
+                            final String nodeID,
+                            final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            final boolean last)
+   {
+      if (topology == null)
+      {
+         // there's no topology
+         return;
+      }
+
+      if (log.isDebugEnabled())
+      {
+         log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
+      }
+
+      TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
+
+      if (topology.updateMember(uniqueEventID, nodeID, member))
+      {
+
+         TopologyMember actMember = topology.getMember(nodeID);
+
+         if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
+         {
+            for (ClientSessionFactory factory : factories)
+            {
+               ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+                                                                          actMember.getConnector().b);
+            }
+         }
+
+         updateArraysAndPairs();
+      }
+
+      if (last)
+      {
+         synchronized (this)
+         {
+            receivedTopology = true;
+            // Notify if waiting on getting topology
+            notifyAll();
+         }
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      if (identity != null)
+      {
+         return "ServerLocatorImpl (identity=" + identity +
+                ") [initialConnectors=" +
+                Arrays.toString(initialConnectors) +
+                ", discoveryGroupConfiguration=" +
+                discoveryGroupConfiguration +
+                "]";
+      }
+      else
+      {
+         return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+                ", discoveryGroupConfiguration=" +
+                discoveryGroupConfiguration +
+                "]";
+      }
+   }
+
+   private synchronized void updateArraysAndPairs()
+   {
+      Collection<TopologyMember> membersCopy = topology.getMembers();
+
+      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+                                                                                                membersCopy.size());
+
+      int count = 0;
+      for (TopologyMember pair : membersCopy)
+      {
+         topologyArray[count++] = pair.getConnector();
+      }
+   }
+
+   public synchronized void connectorsChanged()
+   {
+      List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+      this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                           newConnectors.size());
+
+      int count = 0;
+      for (DiscoveryEntry entry : newConnectors)
+      {
+         this.initialConnectors[count++] = entry.getConnector();
+
+         if (topology != null && topology.getMember(entry.getNodeID()) == null)
+         {
+            TopologyMember member = new TopologyMember(entry.getConnector(), null);
+            // on this case we set it as zero as any update coming from server should be accepted
+            topology.updateMember(0, entry.getNodeID(), member);
+         }
+      }
+
+      if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
+      {
+         // FIXME the node is alone in the cluster. We create a connection to the new node
+         // to trigger the node notification to form the cluster.
+         try
+         {
+            connect();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+         }
+      }
+   }
+
+   public synchronized void factoryClosed(final ClientSessionFactory factory)
+   {
+      factories.remove(factory);
+
+      if (!clusterConnection && factories.isEmpty())
+      {
+         // Go back to using the broadcast or static list
+
+         receivedTopology = false;
+
+         topologyArray = null;
+      }
+   }
+
+   public Topology getTopology()
+   {
+      return topology;
+   }
+
+   public void addClusterTopologyListener(final ClusterTopologyListener listener)
+   {
+      topology.addClusterTopologyListener(listener);
+   }
+
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+   {
+      topology.removeClusterTopologyListener(listener);
+   }
+
+   public synchronized void addFactory(ClientSessionFactoryInternal factory)
+   {
+      if (factory != null)
+      {
+         TransportConfiguration backup = null;
+
+         if (topology != null)
+         {
+            backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+         }
+
+         factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+         factories.add(factory);
+      }
+   }
+
+}

Deleted: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-22 15:30:57 UTC (rev 11396)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -1,1714 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
-   private static final long serialVersionUID = -1615857864410205260L;
-
-   private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
-   private final boolean ha;
-
-   private boolean finalizeCheck = true;
-
-   private boolean clusterConnection;
-
-   private transient String identity;
-
-   private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
-
-   private TransportConfiguration[] initialConnectors;
-
-   private DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
-   private StaticConnector staticConnector = new StaticConnector();
-
-   private final Topology topology;
-
-   private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
-   private boolean receivedTopology;
-
-   private boolean compressLargeMessage;
-
-   // if the system should shutdown the pool when shutting down
-   private transient boolean shutdownPool;
-
-   private ExecutorService threadPool;
-
-   private ScheduledExecutorService scheduledThreadPool;
-
-   private DiscoveryGroup discoveryGroup;
-
-   private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
-   private boolean readOnly;
-
-   // Settable attributes:
-
-   private boolean cacheLargeMessagesClient;
-
-   private long clientFailureCheckPeriod;
-
-   private long connectionTTL;
-
-   private long callTimeout;
-
-   private int minLargeMessageSize;
-
-   private int consumerWindowSize;
-
-   private int consumerMaxRate;
-
-   private int confirmationWindowSize;
-
-   private int producerWindowSize;
-
-   private int producerMaxRate;
-
-   private boolean blockOnAcknowledge;
-
-   private boolean blockOnDurableSend;
-
-   private boolean blockOnNonDurableSend;
-
-   private boolean autoGroup;
-
-   private boolean preAcknowledge;
-
-   private String connectionLoadBalancingPolicyClassName;
-
-   private int ackBatchSize;
-
-   private boolean useGlobalPools;
-
-   private int scheduledThreadPoolMaxSize;
-
-   private int threadPoolMaxSize;
-
-   private long retryInterval;
-
-   private double retryIntervalMultiplier;
-
-   private long maxRetryInterval;
-
-   private int reconnectAttempts;
-
-   private int initialConnectAttempts;
-
-   private boolean failoverOnInitialConnection;
-
-   private int initialMessagePacketSize;
-
-   private volatile boolean closed;
-
-   private volatile boolean closing;
-
-   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
-   private static ExecutorService globalThreadPool;
-
-   private Executor startExecutor;
-
-   private static ScheduledExecutorService globalScheduledThreadPool;
-
-   private AfterConnectInternalListener afterConnectListener;
-
-   private String groupID;
-
-   private String nodeID;
-
-   private TransportConfiguration clusterTransportConfiguration;
-
-   private boolean backup;
-
-   private final Exception e = new Exception();
-
-   // To be called when there are ServerLocator being finalized.
-   // To be used on test assertions
-   public static Runnable finalizeCallback = null;
-
-   public static synchronized void clearThreadPools()
-   {
-
-      if (globalThreadPool != null)
-      {
-         globalThreadPool.shutdown();
-         try
-         {
-            if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS))
-            {
-               throw new IllegalStateException("Couldn't finish the globalThreadPool");
-            }
-         }
-         catch (InterruptedException e)
-         {
-         }
-         finally
-         {
-            globalThreadPool = null;
-         }
-      }
-
-      if (globalScheduledThreadPool != null)
-      {
-         globalScheduledThreadPool.shutdown();
-         try
-         {
-            if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS))
-            {
-               throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
-            }
-         }
-         catch (InterruptedException e)
-         {
-         }
-         finally
-         {
-            globalScheduledThreadPool = null;
-         }
-      }
-   }
-
-   private static synchronized ExecutorService getGlobalThreadPool()
-   {
-      if (globalThreadPool == null)
-      {
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
-         globalThreadPool = Executors.newCachedThreadPool(factory);
-      }
-
-      return globalThreadPool;
-   }
-
-   public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
-   {
-      if (globalScheduledThreadPool == null)
-      {
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
-                                                          true,
-                                                          getThisClassLoader());
-
-         globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
-                                                                      factory);
-      }
-
-      return globalScheduledThreadPool;
-   }
-
-   private void setThreadPools()
-   {
-      if (threadPool != null)
-      {
-         return;
-      }
-      else if (useGlobalPools)
-      {
-         threadPool = getGlobalThreadPool();
-
-         scheduledThreadPool = getGlobalScheduledThreadPool();
-      }
-      else
-      {
-         this.shutdownPool = true;
-
-         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
-                                                          true,
-                                                          getThisClassLoader());
-
-         if (threadPoolMaxSize == -1)
-         {
-            threadPool = Executors.newCachedThreadPool(factory);
-         }
-         else
-         {
-            threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
-         }
-
-         factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
-                                            true,
-                                            getThisClassLoader());
-
-         scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
-      }
-   }
-
-   private static ClassLoader getThisClassLoader()
-   {
-      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-      {
-         public ClassLoader run()
-         {
-            return ClientSessionFactoryImpl.class.getClassLoader();
-         }
-      });
-
-   }
-
-   private void instantiateLoadBalancingPolicy()
-   {
-      if (connectionLoadBalancingPolicyClassName == null)
-      {
-         throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
-      }
-
-      AccessController.doPrivileged(new PrivilegedAction<Object>()
-      {
-         public Object run()
-         {
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            try
-            {
-               Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
-               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
-               return null;
-            }
-            catch (Exception e)
-            {
-               throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
-                                                           "\"",
-                                                  e);
-            }
-         }
-      });
-   }
-
-   private synchronized void initialise() throws Exception
-   {
-      if (!readOnly)
-      {
-         setThreadPools();
-
-         instantiateLoadBalancingPolicy();
-
-         if (discoveryGroupConfiguration != null)
-         {
-            InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
-            InetAddress lbAddress;
-
-            if (discoveryGroupConfiguration.getLocalBindAddress() != null)
-            {
-               lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
-            }
-            else
-            {
-               lbAddress = null;
-            }
-
-            discoveryGroup = new DiscoveryGroupImpl(nodeID,
-                                                    discoveryGroupConfiguration.getName(),
-                                                    lbAddress,
-                                                    groupAddress,
-                                                    discoveryGroupConfiguration.getGroupPort(),
-                                                    discoveryGroupConfiguration.getRefreshTimeout());
-
-            discoveryGroup.registerListener(this);
-
-            discoveryGroup.start();
-         }
-
-         readOnly = true;
-      }
-   }
-
-   private ServerLocatorImpl(final Topology topology,
-                             final boolean useHA,
-                             final DiscoveryGroupConfiguration discoveryGroupConfiguration,
-                             final TransportConfiguration[] transportConfigs)
-   {
-      e.fillInStackTrace();
-
-      this.topology = topology;
-
-      this.ha = useHA;
-
-      this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
-      this.initialConnectors = transportConfigs;
-
-      this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
-      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
-      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
-      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
-      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
-      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
-      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
-      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
-      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
-      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
-      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
-      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
-      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
-      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
-      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
-      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
-      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
-      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
-      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
-      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
-      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
-      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
-      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
-      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
-      initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
-      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
-      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
-      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
-      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
-      compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
-      clusterConnection = false;
-   }
-
-   /**
-    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
-    *
-    * @param discoveryAddress
-    * @param discoveryPort
-    */
-   public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
-   {
-      this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
-      if (useHA)
-      {
-         // We only set the owner at where the Topology was created.
-         // For that reason we can't set it at the main constructor
-         topology.setOwner(this);
-      }
-   }
-
-   /**
-    * Create a ServerLocatorImpl using a static list of live servers
-    *
-    * @param transportConfigs
-    */
-   public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
-   {
-      this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
-      if (useHA)
-      {
-         // We only set the owner at where the Topology was created.
-         // For that reason we can't set it at the main constructor
-         topology.setOwner(this);
-      }
-   }
-
-   /**
-    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
-    *
-    * @param discoveryAddress
-    * @param discoveryPort
-    */
-   public ServerLocatorImpl(final Topology topology,
-                            final boolean useHA,
-                            final DiscoveryGroupConfiguration groupConfiguration)
-   {
-      this(topology, useHA, groupConfiguration, null);
-
-   }
-
-   /**
-    * Create a ServerLocatorImpl using a static list of live servers
-    *
-    * @param transportConfigs
-    */
-   public ServerLocatorImpl(final Topology topology,
-                            final boolean useHA,
-                            final TransportConfiguration... transportConfigs)
-   {
-      this(topology, useHA, null, transportConfigs);
-   }
-
-   private TransportConfiguration selectConnector()
-   {
-      if (receivedTopology)
-      {
-         int pos = loadBalancingPolicy.select(topologyArray.length);
-
-         Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
-         return pair.a;
-      }
-      else
-      {
-         // Get from initialconnectors
-
-         int pos = loadBalancingPolicy.select(initialConnectors.length);
-
-         return initialConnectors[pos];
-      }
-   }
-
-   public void start(Executor executor) throws Exception
-   {
-      initialise();
-
-      this.startExecutor = executor;
-
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               connect();
-            }
-            catch (Exception e)
-            {
-               if (!closing)
-               {
-                  log.warn("did not connect the cluster connection to other nodes", e);
-               }
-            }
-         }
-      });
-   }
-
-   public Executor getExecutor()
-   {
-      return startExecutor;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
-    */
-   public void disableFinalizeCheck()
-   {
-      finalizeCheck = false;
-   }
-
-   public ClientSessionFactoryInternal connect() throws Exception
-   {
-      ClientSessionFactoryInternal sf;
-      // static list of initial connectors
-      if (initialConnectors != null && discoveryGroup == null)
-      {
-         sf = (ClientSessionFactoryInternal)staticConnector.connect();
-      }
-      // wait for discovery group to get the list of initial connectors
-      else
-      {
-         sf = (ClientSessionFactoryInternal)createSessionFactory();
-      }
-      addFactory(sf);
-      return sf;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
-    */
-   public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
-   {
-      this.afterConnectListener = listener;
-   }
-
-   public AfterConnectInternalListener getAfterConnectInternalListener()
-   {
-      return afterConnectListener;
-   }
-
-   public boolean isClosed()
-   {
-      return closed || closing;
-   }
-
-   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-      }
-
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
-
-      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
-                                                                          transportConfiguration,
-                                                                          callTimeout,
-                                                                          clientFailureCheckPeriod,
-                                                                          connectionTTL,
-                                                                          retryInterval,
-                                                                          retryIntervalMultiplier,
-                                                                          maxRetryInterval,
-                                                                          reconnectAttempts,
-                                                                          threadPool,
-                                                                          scheduledThreadPool,
-                                                                          interceptors);
-
-      factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
-      addFactory(factory);
-
-      return factory;
-   }
-
-   public ClientSessionFactory createSessionFactory() throws Exception
-   {
-      if (closed || closing)
-      {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-      }
-
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
-
-      if (initialConnectors == null && discoveryGroup != null)
-      {
-         // Wait for an initial broadcast to give us at least one node in the cluster
-         long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
-         boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
-         if (!ok)
-         {
-            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                       "Timed out waiting to receive initial broadcast from cluster");
-         }
-      }
-
-      ClientSessionFactoryInternal factory = null;
-
-      synchronized (this)
-      {
-         boolean retry;
-         int attempts = 0;
-         do
-         {
-            retry = false;
-
-            TransportConfiguration tc = selectConnector();
-
-            // try each factory in the list until we find one which works
-
-            try
-            {
-               factory = new ClientSessionFactoryImpl(this,
-                                                      tc,
-                                                      callTimeout,
-                                                      clientFailureCheckPeriod,
-                                                      connectionTTL,
-                                                      retryInterval,
-                                                      retryIntervalMultiplier,
-                                                      maxRetryInterval,
-                                                      reconnectAttempts,
-                                                      threadPool,
-                                                      scheduledThreadPool,
-                                                      interceptors);
-               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
-            }
-            catch (HornetQException e)
-            {
-               factory.close();
-               factory = null;
-               if (e.getCode() == HornetQException.NOT_CONNECTED)
-               {
-                  attempts++;
-
-                  if (topologyArray != null && attempts == topologyArray.length)
-                  {
-                     throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                                "Cannot connect to server(s). Tried with all available servers.");
-                  }
-                  if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
-                  {
-                     throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                                "Cannot connect to server(s). Tried with all available servers.");
-                  }
-                  retry = true;
-               }
-               else
-               {
-                  throw e;
-               }
-            }
-         }
-         while (retry);
-
-         if (ha || clusterConnection)
-         {
-            long timeout = System.currentTimeMillis() + 30000;
-            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
-                   !receivedTopology &&
-                   timeout > System.currentTimeMillis())
-            {
-               // Now wait for the topology
-
-               try
-               {
-                  wait(1000);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-
-            }
-
-            if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
-            {
-               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                          "Timed out waiting to receive cluster topology");
-            }
-
-         }
-
-         addFactory(factory);
-
-         return factory;
-      }
-
-   }
-
-   public boolean isHA()
-   {
-      return ha;
-   }
-
-   public boolean isCacheLargeMessagesClient()
-   {
-      return cacheLargeMessagesClient;
-   }
-
-   public void setCacheLargeMessagesClient(final boolean cached)
-   {
-      cacheLargeMessagesClient = cached;
-   }
-
-   public long getClientFailureCheckPeriod()
-   {
-      return clientFailureCheckPeriod;
-   }
-
-   public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
-   {
-      checkWrite();
-      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-   }
-
-   public long getConnectionTTL()
-   {
-      return connectionTTL;
-   }
-
-   public void setConnectionTTL(final long connectionTTL)
-   {
-      checkWrite();
-      this.connectionTTL = connectionTTL;
-   }
-
-   public long getCallTimeout()
-   {
-      return callTimeout;
-   }
-
-   public void setCallTimeout(final long callTimeout)
-   {
-      checkWrite();
-      this.callTimeout = callTimeout;
-   }
-
-   public int getMinLargeMessageSize()
-   {
-      return minLargeMessageSize;
-   }
-
-   public void setMinLargeMessageSize(final int minLargeMessageSize)
-   {
-      checkWrite();
-      this.minLargeMessageSize = minLargeMessageSize;
-   }
-
-   public int getConsumerWindowSize()
-   {
-      return consumerWindowSize;
-   }
-
-   public void setConsumerWindowSize(final int consumerWindowSize)
-   {
-      checkWrite();
-      this.consumerWindowSize = consumerWindowSize;
-   }
-
-   public int getConsumerMaxRate()
-   {
-      return consumerMaxRate;
-   }
-
-   public void setConsumerMaxRate(final int consumerMaxRate)
-   {
-      checkWrite();
-      this.consumerMaxRate = consumerMaxRate;
-   }
-
-   public int getConfirmationWindowSize()
-   {
-      return confirmationWindowSize;
-   }
-
-   public void setConfirmationWindowSize(final int confirmationWindowSize)
-   {
-      checkWrite();
-      this.confirmationWindowSize = confirmationWindowSize;
-   }
-
-   public int getProducerWindowSize()
-   {
-      return producerWindowSize;
-   }
-
-   public void setProducerWindowSize(final int producerWindowSize)
-   {
-      checkWrite();
-      this.producerWindowSize = producerWindowSize;
-   }
-
-   public int getProducerMaxRate()
-   {
-      return producerMaxRate;
-   }
-
-   public void setProducerMaxRate(final int producerMaxRate)
-   {
-      checkWrite();
-      this.producerMaxRate = producerMaxRate;
-   }
-
-   public boolean isBlockOnAcknowledge()
-   {
-      return blockOnAcknowledge;
-   }
-
-   public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
-   {
-      checkWrite();
-      this.blockOnAcknowledge = blockOnAcknowledge;
-   }
-
-   public boolean isBlockOnDurableSend()
-   {
-      return blockOnDurableSend;
-   }
-
-   public void setBlockOnDurableSend(final boolean blockOnDurableSend)
-   {
-      checkWrite();
-      this.blockOnDurableSend = blockOnDurableSend;
-   }
-
-   public boolean isBlockOnNonDurableSend()
-   {
-      return blockOnNonDurableSend;
-   }
-
-   public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
-   {
-      checkWrite();
-      this.blockOnNonDurableSend = blockOnNonDurableSend;
-   }
-
-   public boolean isAutoGroup()
-   {
-      return autoGroup;
-   }
-
-   public void setAutoGroup(final boolean autoGroup)
-   {
-      checkWrite();
-      this.autoGroup = autoGroup;
-   }
-
-   public boolean isPreAcknowledge()
-   {
-      return preAcknowledge;
-   }
-
-   public void setPreAcknowledge(final boolean preAcknowledge)
-   {
-      checkWrite();
-      this.preAcknowledge = preAcknowledge;
-   }
-
-   public int getAckBatchSize()
-   {
-      return ackBatchSize;
-   }
-
-   public void setAckBatchSize(final int ackBatchSize)
-   {
-      checkWrite();
-      this.ackBatchSize = ackBatchSize;
-   }
-
-   public boolean isUseGlobalPools()
-   {
-      return useGlobalPools;
-   }
-
-   public void setUseGlobalPools(final boolean useGlobalPools)
-   {
-      checkWrite();
-      this.useGlobalPools = useGlobalPools;
-   }
-
-   public int getScheduledThreadPoolMaxSize()
-   {
-      return scheduledThreadPoolMaxSize;
-   }
-
-   public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
-   {
-      checkWrite();
-      this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
-   }
-
-   public int getThreadPoolMaxSize()
-   {
-      return threadPoolMaxSize;
-   }
-
-   public void setThreadPoolMaxSize(final int threadPoolMaxSize)
-   {
-      checkWrite();
-      this.threadPoolMaxSize = threadPoolMaxSize;
-   }
-
-   public long getRetryInterval()
-   {
-      return retryInterval;
-   }
-
-   public void setRetryInterval(final long retryInterval)
-   {
-      checkWrite();
-      this.retryInterval = retryInterval;
-   }
-
-   public long getMaxRetryInterval()
-   {
-      return maxRetryInterval;
-   }
-
-   public void setMaxRetryInterval(final long retryInterval)
-   {
-      checkWrite();
-      maxRetryInterval = retryInterval;
-   }
-
-   public double getRetryIntervalMultiplier()
-   {
-      return retryIntervalMultiplier;
-   }
-
-   public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
-   {
-      checkWrite();
-      this.retryIntervalMultiplier = retryIntervalMultiplier;
-   }
-
-   public int getReconnectAttempts()
-   {
-      return reconnectAttempts;
-   }
-
-   public void setReconnectAttempts(final int reconnectAttempts)
-   {
-      checkWrite();
-      this.reconnectAttempts = reconnectAttempts;
-   }
-
-   public void setInitialConnectAttempts(int initialConnectAttempts)
-   {
-      checkWrite();
-      this.initialConnectAttempts = initialConnectAttempts;
-   }
-
-   public int getInitialConnectAttempts()
-   {
-      return initialConnectAttempts;
-   }
-
-   public boolean isFailoverOnInitialConnection()
-   {
-      return this.failoverOnInitialConnection;
-   }
-
-   public void setFailoverOnInitialConnection(final boolean failover)
-   {
-      checkWrite();
-      this.failoverOnInitialConnection = failover;
-   }
-
-   public String getConnectionLoadBalancingPolicyClassName()
-   {
-      return connectionLoadBalancingPolicyClassName;
-   }
-
-   public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
-   {
-      checkWrite();
-      connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
-   }
-
-   public TransportConfiguration[] getStaticTransportConfigurations()
-   {
-      return this.initialConnectors;
-   }
-
-   public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
-   {
-      return discoveryGroupConfiguration;
-   }
-
-   public void addInterceptor(final Interceptor interceptor)
-   {
-      interceptors.add(interceptor);
-   }
-
-   public boolean removeInterceptor(final Interceptor interceptor)
-   {
-      return interceptors.remove(interceptor);
-   }
-
-   public int getInitialMessagePacketSize()
-   {
-      return initialMessagePacketSize;
-   }
-
-   public void setInitialMessagePacketSize(final int size)
-   {
-      checkWrite();
-      initialMessagePacketSize = size;
-   }
-
-   public void setGroupID(final String groupID)
-   {
-      checkWrite();
-      this.groupID = groupID;
-   }
-
-   public String getGroupID()
-   {
-      return groupID;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
-    */
-   public boolean isCompressLargeMessage()
-   {
-      return compressLargeMessage;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
-    */
-   public void setCompressLargeMessage(boolean compress)
-   {
-      this.compressLargeMessage = compress;
-   }
-
-   private void checkWrite()
-   {
-      if (readOnly)
-      {
-         throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
-      }
-   }
-
-   public String getIdentity()
-   {
-      return identity;
-   }
-
-   public void setIdentity(String identity)
-   {
-      this.identity = identity;
-   }
-
-   public void setNodeID(String nodeID)
-   {
-      this.nodeID = nodeID;
-   }
-
-   public String getNodeID()
-   {
-      return nodeID;
-   }
-
-   public void setClusterConnection(boolean clusterConnection)
-   {
-      this.clusterConnection = clusterConnection;
-   }
-
-   public boolean isClusterConnection()
-   {
-      return clusterConnection;
-   }
-
-   public TransportConfiguration getClusterTransportConfiguration()
-   {
-      return clusterTransportConfiguration;
-   }
-
-   public void setClusterTransportConfiguration(TransportConfiguration tc)
-   {
-      this.clusterTransportConfiguration = tc;
-   }
-
-   public boolean isBackup()
-   {
-      return backup;
-   }
-
-   public void setBackup(boolean backup)
-   {
-      this.backup = backup;
-   }
-
-   @Override
-   protected void finalize() throws Throwable
-   {
-      if (finalizeCheck)
-      {
-         close();
-      }
-
-      super.finalize();
-   }
-
-   public void cleanup()
-   {
-      doClose(false);
-   }
-
-   public void close()
-   {
-      doClose(true);
-   }
-
-   protected void doClose(final boolean sendClose)
-   {
-      if (closed)
-      {
-         if (log.isDebugEnabled())
-         {
-            log.debug(this + " is already closed when calling closed");
-         }
-         return;
-      }
-
-      if (log.isDebugEnabled())
-      {
-         log.debug(this + " is calling close", new Exception("trace"));
-      }
-
-      closing = true;
-
-      if (discoveryGroup != null)
-      {
-         try
-         {
-            discoveryGroup.stop();
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to stop discovery group", e);
-         }
-      }
-      else
-      {
-         staticConnector.disconnect();
-      }
-
-      Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
-      for (ClientSessionFactory factory : clonedFactory)
-      {
-         if (sendClose)
-         {
-            factory.close();
-         }
-         else
-         {
-            factory.cleanup();
-         }
-      }
-
-      factories.clear();
-
-      if (shutdownPool)
-      {
-         if (threadPool != null)
-         {
-            threadPool.shutdown();
-
-            try
-            {
-               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
-               {
-                  log.warn("Timed out waiting for pool to terminate");
-               }
-            }
-            catch (InterruptedException ignore)
-            {
-            }
-         }
-
-         if (scheduledThreadPool != null)
-         {
-            scheduledThreadPool.shutdown();
-
-            try
-            {
-               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
-               {
-                  log.warn("Timed out waiting for scheduled pool to terminate");
-               }
-            }
-            catch (InterruptedException ignore)
-            {
-            }
-         }
-      }
-      readOnly = false;
-
-      closed = true;
-   }
-
-   /** This is directly called when the connection to the node is gone,
-    *  or when the node sends a disconnection.
-    *  Look for callers of this method! */
-   public void notifyNodeDown(final long eventTime, final String nodeID)
-   {
-
-      if (topology == null)
-      {
-         // there's no topology here
-         return;
-      }
-
-      if (log.isDebugEnabled())
-      {
-         log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
-      }
-
-      if (topology.removeMember(eventTime, nodeID))
-      {
-         if (topology.isEmpty())
-         {
-            // Resetting the topology to its original condition as it was brand new
-            synchronized (this)
-            {
-               topologyArray = null;
-               receivedTopology = false;
-            }
-         }
-         else
-         {
-            updateArraysAndPairs();
-
-            if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
-            {
-               // Resetting the topology to its original condition as it was brand new
-               receivedTopology = false;
-            }
-         }
-      }
-
-   }
-
-   public void notifyNodeUp(long uniqueEventID,
-                            final String nodeID,
-                            final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            final boolean last)
-   {
-      if (topology == null)
-      {
-         // there's no topology
-         return;
-      }
-
-      if (log.isDebugEnabled())
-      {
-         log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
-      }
-
-      TopologyMember member = new TopologyMember(connectorPair.a, connectorPair.b);
-
-      if (topology.updateMember(uniqueEventID, nodeID, member))
-      {
-
-         TopologyMember actMember = topology.getMember(nodeID);
-
-         if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
-         {
-            for (ClientSessionFactory factory : factories)
-            {
-               ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
-                                                                          actMember.getConnector().b);
-            }
-         }
-
-         updateArraysAndPairs();
-      }
-
-      if (last)
-      {
-         synchronized (this)
-         {
-            receivedTopology = true;
-            // Notify if waiting on getting topology
-            notifyAll();
-         }
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
-   @Override
-   public String toString()
-   {
-      if (identity != null)
-      {
-         return "ServerLocatorImpl (identity=" + identity +
-                ") [initialConnectors=" +
-                Arrays.toString(initialConnectors) +
-                ", discoveryGroupConfiguration=" +
-                discoveryGroupConfiguration +
-                "]";
-      }
-      else
-      {
-         return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
-                ", discoveryGroupConfiguration=" +
-                discoveryGroupConfiguration +
-                "]";
-      }
-   }
-
-   private synchronized void updateArraysAndPairs()
-   {
-      Collection<TopologyMember> membersCopy = topology.getMembers();
-
-      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
-                                                                                                membersCopy.size());
-
-      int count = 0;
-      for (TopologyMember pair : membersCopy)
-      {
-         topologyArray[count++] = pair.getConnector();
-      }
-   }
-
-   public synchronized void connectorsChanged()
-   {
-      List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
-      this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
-                                                                           newConnectors.size());
-
-      int count = 0;
-      for (DiscoveryEntry entry : newConnectors)
-      {
-         this.initialConnectors[count++] = entry.getConnector();
-
-         if (topology != null && topology.getMember(entry.getNodeID()) == null)
-         {
-            TopologyMember member = new TopologyMember(entry.getConnector(), null);
-            // on this case we set it as zero as any update coming from server should be accepted
-            topology.updateMember(0, entry.getNodeID(), member);
-         }
-      }
-
-      if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
-      {
-         // FIXME the node is alone in the cluster. We create a connection to the new node
-         // to trigger the node notification to form the cluster.
-         try
-         {
-            connect();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
-         }
-      }
-   }
-
-   public synchronized void factoryClosed(final ClientSessionFactory factory)
-   {
-      factories.remove(factory);
-
-      if (!clusterConnection && factories.isEmpty())
-      {
-         // Go back to using the broadcast or static list
-
-         receivedTopology = false;
-
-         topologyArray = null;
-      }
-   }
-
-   public Topology getTopology()
-   {
-      return topology;
-   }
-
-   public void addClusterTopologyListener(final ClusterTopologyListener listener)
-   {
-      topology.addClusterTopologyListener(listener);
-   }
-
-   public void removeClusterTopologyListener(final ClusterTopologyListener listener)
-   {
-      topology.removeClusterTopologyListener(listener);
-   }
-
-   public synchronized void addFactory(ClientSessionFactoryInternal factory)
-   {
-      if (factory != null)
-      {
-         TransportConfiguration backup = null;
-
-         if (topology != null)
-         {
-            backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
-         }
-
-         factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
-         factories.add(factory);
-      }
-   }
-
-   class StaticConnector implements Serializable
-   {
-      private static final long serialVersionUID = 6772279632415242634l;
-
-      private List<Connector> connectors;
-
-      public ClientSessionFactory connect() throws HornetQException
-      {
-         if (closed)
-         {
-            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-         }
-
-         try
-         {
-            initialise();
-         }
-         catch (Exception e)
-         {
-            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-         }
-
-         ClientSessionFactory csf = null;
-
-         createConnectors();
-
-         try
-         {
-
-            int retryNumber = 0;
-            while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
-            {
-               retryNumber++;
-               for (Connector conn : connectors)
-               {
-                  if (log.isDebugEnabled())
-                  {
-                     log.debug(this + "::Submitting connect towards " + conn);
-                  }
-
-                  csf = conn.tryConnect();
-
-                  if (csf != null)
-                  {
-                     csf.getConnection().addFailureListener(new FailureListener()
-                     {
-                        // Case the node where the cluster connection was connected is gone, we need to restart the
-                        // connection
-                        public void connectionFailed(HornetQException exception, boolean failedOver)
-                        {
-                           if (clusterConnection && exception.getCode() == HornetQException.DISCONNECTED)
-                           {
-                              try
-                              {
-                                 ServerLocatorImpl.this.start(startExecutor);
-                              }
-                              catch (Exception e)
-                              {
-                                 // There isn't much to be done if this happens here
-                                 log.warn(e.getMessage());
-                              }
-                           }
-                        }
-                     });
-
-                     if (log.isDebugEnabled())
-                     {
-                        log.debug("Returning " + csf +
-                                  " after " +
-                                  retryNumber +
-                                  " retries on StaticConnector " +
-                                  ServerLocatorImpl.this);
-                     }
-
-                     return csf;
-                  }
-               }
-
-               if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts)
-               {
-                  break;
-               }
-
-               if (!closed && !closing)
-               {
-                  Thread.sleep(retryInterval);
-               }
-            }
-
-         }
-         catch (Exception e)
-         {
-            log.warn(e.getMessage(), e);
-            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
-         }
-
-         if (csf == null && !closed)
-         {
-            log.warn("Failed to connecto to any static connector, throwing exception now");
-            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
-         }
-         if (log.isDebugEnabled())
-         {
-            log.debug("Returning " + csf + " on " + ServerLocatorImpl.this);
-         }
-         return csf;
-      }
-
-      private synchronized void createConnectors()
-      {
-         if (connectors != null)
-         {
-            for (Connector conn : connectors)
-            {
-               if (conn != null)
-               {
-                  conn.disconnect();
-               }
-            }
-         }
-         connectors = new ArrayList<Connector>();
-         for (TransportConfiguration initialConnector : initialConnectors)
-         {
-            ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
-                                                                                initialConnector,
-                                                                                callTimeout,
-                                                                                clientFailureCheckPeriod,
-                                                                                connectionTTL,
-                                                                                retryInterval,
-                                                                                retryIntervalMultiplier,
-                                                                                maxRetryInterval,
-                                                                                reconnectAttempts,
-                                                                                threadPool,
-                                                                                scheduledThreadPool,
-                                                                                interceptors);
-
-            factory.disableFinalizeCheck();
-
-            connectors.add(new Connector(initialConnector, factory));
-         }
-      }
-
-      public synchronized void disconnect()
-      {
-         if (connectors != null)
-         {
-            for (Connector connector : connectors)
-            {
-               connector.disconnect();
-            }
-         }
-      }
-
-      public void finalize() throws Throwable
-      {
-         if (!closed && finalizeCheck)
-         {
-            log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
-                     System.identityHashCode(this));
-
-            log.warn("The ServerLocator you didn't close was created here:", e);
-
-            if (ServerLocatorImpl.finalizeCallback != null)
-            {
-               ServerLocatorImpl.finalizeCallback.run();
-            }
-
-            close();
-         }
-
-         super.finalize();
-      }
-
-      class Connector
-      {
-         private TransportConfiguration initialConnector;
-
-         private volatile ClientSessionFactoryInternal factory;
-
-         private boolean interrupted = false;
-
-         private Exception e;
-
-         public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
-         {
-            this.initialConnector = initialConnector;
-            this.factory = factory;
-         }
-
-         public ClientSessionFactory tryConnect() throws HornetQException
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug(this + "::Trying to connect to " + factory);
-            }
-            try
-            {
-               ClientSessionFactoryInternal factoryToUse = factory;
-               if (factoryToUse != null)
-               {
-                  factory.connect(1, false);
-               }
-               return factoryToUse;
-            }
-            catch (HornetQException e)
-            {
-               log.debug(this + "::Exception on establish connector initial connection", e);
-               return null;
-            }
-         }
-
-         public void disconnect()
-         {
-            interrupted = true;
-
-            if (factory != null)
-            {
-               factory.causeExit();
-               factory.cleanup();
-               factory = null;
-            }
-         }
-
-         /* (non-Javadoc)
-          * @see java.lang.Object#toString()
-          */
-         @Override
-         public String toString()
-         {
-            return "Connector [initialConnector=" + initialConnector + "]";
-         }
-
-      }
-   }
-}

Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	                        (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl, was derived from ServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class SimpleUDPServerLocatorImpl extends AbstractServerLocator
+{
+   private static final Logger log = Logger.getLogger(SimpleUDPServerLocatorImpl.class);
+
+   private String discoveryGroupName;
+     
+   private InetAddress localBindAddress;
+      
+   private InetAddress groupAddress;
+      
+   private int groupPort;
+      
+   private long refreshTimeout;
+      
+   private long initialWaitTimeout;
+      
+   private DiscoveryGroup discoveryGroup;
+   
+   private volatile boolean closing;
+
+   @Override
+   protected synchronized void initialiseInternal() throws Exception
+   {
+      this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+         
+      Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+         
+      String lbStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+            
+      if (lbStr != null)
+      {
+         this.localBindAddress = InetAddress.getByName(lbStr);
+      }
+      else
+      {
+         this.localBindAddress = null;
+      }
+
+      String gaddr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params);
+      if(gaddr != null)
+      {
+         this.groupAddress = InetAddress.getByName(gaddr);
+      }
+      this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
+      this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+      this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+         
+      discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+                                              this.discoveryGroupName,
+                                              this.localBindAddress,
+                                              this.groupAddress,
+                                              this.groupPort,
+                                              this.refreshTimeout);
+            
+      discoveryGroup.registerListener(this);
+            
+      discoveryGroup.start();
+   }
+   
+   public SimpleUDPServerLocatorImpl(final boolean useHA,
+                                     final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      super(useHA, discoveryGroupConfiguration);
+   }
+
+   public ClientSessionFactoryInternal connect() throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+      sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+      addFactory(sf);
+      return sf;
+   }
+
+   @Override
+   protected void waitInitialDiscovery() throws Exception
+   {
+      // Wait for an initial broadcast to give us at least one node in the cluster
+      long timeout = this.isClusterConnection() ? 0 : this.initialWaitTimeout;
+      boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+      if (!ok)
+      {
+         throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                    "Timed out waiting to receive initial broadcast from cluster");
+      }
+   }
+   
+   @Override
+   protected void doCloseInternal()
+   {
+      try
+      {
+         discoveryGroup.stop();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to stop discovery group", e);
+      }
+   }
+}

Added: branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java	                        (rev 0)
+++ branches/HORNETQ-316_for_2_2_EAP/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java	2011-09-22 19:24:22 UTC (rev 11397)
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A StaticServerLocatorImpl, was derived from ServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+   private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+   private StaticConnector staticConnector = new StaticConnector();
+
+   private final Exception e = new Exception();
+   
+   @Override
+   protected synchronized void initialiseInternal() throws Exception
+   {
+      /* Nothing special for this class */
+   }
+
+   public StaticServerLocatorImpl(final boolean useHA,
+                                  final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      super(useHA, discoveryGroupConfiguration);
+      
+      Map<String,Object> params = discoveryGroupConfiguration.getParams();
+      List<TransportConfiguration> initialConnectors = (List<TransportConfiguration>)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+      setInitialConnectors(initialConnectors.toArray(new TransportConfiguration[0]));
+                                            
+      e.fillInStackTrace();
+   }
+
+   public ClientSessionFactoryInternal connect() throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+
+      sf = (ClientSessionFactoryInternal)staticConnector.connect();
+
+      addFactory(sf);
+      return sf;
+   }
+
+   @Override
+   protected void waitInitialDiscovery()
+   {
+      /* Nothing to do for this class */
+   }
+   
+   @Override
+   protected void doCloseInternal()
+   {
+      staticConnector.disconnect();
+   }
+
+   class StaticConnector implements Serializable
+   {
+      private static final long serialVersionUID = 6772279632415242634l;
+
+      private List<Connector> connectors;
+
+      public ClientSessionFactory connect() throws HornetQException
+      {
+         if (isClosed())
+         {
+            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+         }
+
+         try
+         {
+            initialiseInternal();
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+         }
+
+         ClientSessionFactory csf = null;
+
+         createConnectors();
+
+         try
+         {
+
+            int retryNumber = 0;
+            while (csf == null && !isClosed())
+            {
+               retryNumber++;
+               for (Connector conn : connectors)
+               {
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug(this + "::Submitting connect towards " + conn);
+                  }
+
+                  csf = conn.tryConnect();
+
+                  if (csf != null)
+                  {
+                     csf.getConnection().addFailureListener(new FailureListener()
+                     {
+                        // Case the node where the cluster connection was connected is gone, we need to restart the
+                        // connection
+                        public void connectionFailed(HornetQException exception, boolean failedOver)
+                        {
+                           if (isClusterConnection() && exception.getCode() == HornetQException.DISCONNECTED)
+                           {
+                              try
+                              {
+                                 StaticServerLocatorImpl.this.start(getExecutor());
+                              }
+                              catch (Exception e)
+                              {
+                                 // There isn't much to be done if this happens here
+                                 log.warn(e.getMessage());
+                              }
+                           }
+                        }
+                     });
+
+                     if (log.isDebugEnabled())
+                     {
+                        log.debug("Returning " + csf +
+                                  " after " +
+                                  retryNumber +
+                                  " retries on StaticConnector " +
+                                  StaticServerLocatorImpl.this);
+                     }
+
+                     return csf;
+                  }
+               }
+
+               if (getInitialConnectAttempts() >= 0 && retryNumber > getInitialConnectAttempts())
+               {
+                  break;
+               }
+
+               if (!isClosed())
+               {
+                  Thread.sleep(getRetryInterval());
+               }
+            }
+
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+         }
+
+         if (csf == null && !isClosed())
+         {
+            log.warn("Failed to connecto to any static connector, throwing exception now");
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+         }
+         if (log.isDebugEnabled())
+         {
+            log.debug("Returning " + csf + " on " + StaticServerLocatorImpl.this);
+         }
+         return csf;
+      }
+
+      private synchronized void createConnectors()
+      {
+         if (connectors != null)
+         {
+            for (Connector conn : connectors)
+            {
+               if (conn != null)
+               {
+                  conn.disconnect();
+               }
+            }
+         }
+         connectors = new ArrayList<Connector>();
+         for (TransportConfiguration initialConnector : getStaticTransportConfigurations())
+         {
+            ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+                                                                                initialConnector,
+                                                                                getCallTimeout(),
+                                                                                getClientFailureCheckPeriod(),
+                                                                                getConnectionTTL(),
+                                                                                getRetryInterval(),
+                                                                                getRetryIntervalMultiplier(),
+                                                                                getMaxRetryInterval(),
+                                                                                getReconnectAttempts(),
+                                                                                getThreadPool(),
+                                                                                getScheduledThreadPool(),
+                                                                                getInterceptors());
+
+            factory.disableFinalizeCheck();
+
+            connectors.add(new Connector(initialConnector, factory));
+         }
+      }
+
+      public synchronized void disconnect()
+      {
+         if (connectors != null)
+         {
+            for (Connector connector : connectors)
+            {
+               connector.disconnect();
+            }
+         }
+      }
+
+      public void finalize() throws Throwable
+      {
+         if (!isClosed() && doFinalizeCheck())
+         {
+            log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+                     System.identityHashCode(this));
+
+            log.warn("The ServerLocator you didn't close was created here:", e);
+
+            if (StaticServerLocatorImpl.finalizeCallback != null)
+            {
+               StaticServerLocatorImpl.finalizeCallback.run();
+            }
+
+            close();
+         }
+
+         super.finalize();
+      }
+
+      class Connector
+      {
+         private TransportConfiguration initialConnector;
+
+         private volatile ClientSessionFactoryInternal factory;
+
+         private boolean interrupted = false;
+
+         private Exception e;
+
+         public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+         {
+            this.initialConnector = initialConnector;
+            this.factory = factory;
+         }
+
+         public ClientSessionFactory tryConnect() throws HornetQException
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug(this + "::Trying to connect to " + factory);
+            }
+            try
+            {
+               ClientSessionFactoryInternal factoryToUse = factory;
+               if (factoryToUse != null)
+               {
+                  factory.connect(1, false);
+               }
+               return factoryToUse;
+            }
+            catch (HornetQException e)
+            {
+               log.debug(this + "::Exception on establish connector initial connection", e);
+               return null;
+            }
+         }
+
+         public void disconnect()
+         {
+            interrupted = true;
+
+            if (factory != null)
+            {
+               factory.causeExit();
+               factory.cleanup();
+               factory = null;
+            }
+         }
+
+         /* (non-Javadoc)
+          * @see java.lang.Object#toString()
+          */
+         @Override
+         public String toString()
+         {
+            return "Connector [initialConnector=" + initialConnector + "]";
+         }
+
+      }
+   }
+}



More information about the hornetq-commits mailing list