[hornetq-commits] JBoss hornetq SVN: r9308 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: api/core/client/loadbalance and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jun 11 11:36:22 EDT 2010


Author: timfox
Date: 2010-06-11 11:36:20 -0400 (Fri, 11 Jun 2010)
New Revision: 9308

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
Log:
new HA

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * A ClusterTopologyListener
+ *
+ * @author tim
+ *
+ *
+ */
+public interface ClusterTopologyListener
+{
+   void onTopologyChanged(List<Pair<TransportConfiguration, TransportConfiguration>> clusterTopology);
+
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+
+/**
+ * A ServerLocator
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ServerLocator
+{
+   ClientSessionFactory createSessionFactory() throws Exception;
+   
+   /**
+    * Returns the period used to check if a client has failed to receive pings from the server.
+    *   
+    * Period is in milliseconds, default value is {@link HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
+    * 
+    * @return the period used to check if a client has failed to receive pings from the server
+    */
+   long getClientFailureCheckPeriod();
+
+   /**
+    * Sets the period (in milliseconds) used to check if a client has failed to receive pings from the server.
+    * 
+    * Value must be -1 (to disable) or greater than 0.
+    * 
+    * @param clientFailureCheckPeriod the period to check failure
+    */
+   void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
+
+   /**
+    * When <code>true</code>, consumers created through this factory will create temporary files to cache large messages.
+    * 
+    * There is 1 temporary file created for each large message.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_CACHE_LARGE_MESSAGE_CLIENT}.
+    * 
+    * @return <code>true</code> if consumers created through this factory will cache large messages in temporary files, <code>false</code> else
+    */
+   boolean isCacheLargeMessagesClient();
+
+   /**
+    * Sets whether large messages received by consumers created through this factory will be cached in temporary files or not.
+    * 
+    * @param cached <code>true</code> to cache large messages in temporary files, <code>false</code> else
+    */
+   void setCacheLargeMessagesClient(boolean cached);
+
+   /**
+    * Returns the connection <em>time-to-live</em>.
+    * This TTL determines how long the server will keep a connection alive in the absence of any data arriving from the client.
+    * 
+    * Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CONNECTION_TTL}.
+    * 
+    * @return the connection time-to-live in milliseconds
+    */
+   long getConnectionTTL();
+
+   /**
+    * Sets this factory's connections <em>time-to-live</em>.
+    * 
+    * Value must be -1 (to disable) or greater or equals to 0.
+    * 
+    * @param connectionTTL period in milliseconds
+    */
+   void setConnectionTTL(long connectionTTL);
+
+   /**
+    * Returns the blocking calls timeout.
+    * 
+    * If client's blocking calls to the server take more than this timeout, the call will throw a {@link HornetQException} with the code {@link HornetQException#CONNECTION_TIMEDOUT}.
+    * Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CALL_TIMEOUT}.
+    * 
+    * @return the blocking calls timeout
+    */
+   long getCallTimeout();
+
+   /**
+    * Sets the blocking call timeout.
+    * 
+    * Value must be greater or equals to 0
+    * 
+    * @param callTimeout blocking call timeout in milliseconds
+    */
+   void setCallTimeout(long callTimeout);
+
+   /**
+    * Returns the large message size threshold.
+    * 
+    * Messages whose size is if greater than this value will be handled as <em>large messages</em>.
+    * 
+    * Value is in bytes, default value is {@link HornetQClient#DEFAULT_MIN_LARGE_MESSAGE_SIZE}.
+    * 
+    * @return the message size threshold to treat messages as large messages.
+    */
+   int getMinLargeMessageSize();
+
+   /**
+    * Sets the large message size threshold.
+    * 
+    * Value must be greater than 0.
+    * 
+    * @param minLargeMessageSize large message size threshold in bytes
+    */
+   void setMinLargeMessageSize(int minLargeMessageSize);
+
+   /**
+    * Returns the window size for flow control of the consumers created through this factory.
+    * 
+    * Value is in bytes, default value is {@link HornetQClient#DEFAULT_CONSUMER_WINDOW_SIZE}.
+    * 
+    * @return the window size used for consumer flow control
+    */
+   int getConsumerWindowSize();
+
+   /**
+    * Sets the window size for flow control of the consumers created through this factory.
+    * 
+    * Value must be -1 (to disable flow control), 0 (to not buffer any messages) or greater than 0 (to set the maximum size of the buffer)
+    *
+    * @param consumerWindowSize window size (in bytes) used for consumer flow control
+    */
+   void setConsumerWindowSize(int consumerWindowSize);
+
+   /**
+    * Returns the maximum rate of message consumption for consumers created through this factory.
+    * 
+    * This value controls the rate at which a consumer can consume messages. A consumer will never consume messages at a rate faster than the rate specified.
+    * 
+    * Value is -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
+    * Default value is {@link HornetQClient#DEFAULT_CONSUMER_MAX_RATE}.
+    * 
+    * @return the consumer max rate
+    */
+   int getConsumerMaxRate();
+
+   /**
+    * Sets the maximum rate of message consumption for consumers created through this factory.
+    * 
+    * Value must -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
+    * 
+    * @param consumerMaxRate maximum rate of message consumption (in messages per seconds)
+    */
+   void setConsumerMaxRate(int consumerMaxRate);
+
+   /**
+    * Returns the size for the confirmation window of clients using this factory.
+    * 
+    * Value is in bytes or -1 (to disable the window). Default value is {@link HornetQClient#DEFAULT_CONFIRMATION_WINDOW_SIZE}.
+    * 
+    * @return the size for the confirmation window of clients using this factory
+    */
+   int getConfirmationWindowSize();
+
+   /**
+    * Sets the size for the confirmation window buffer of clients using this factory.
+    * 
+    * Value must be -1 (to disable the window) or greater than 0.
+
+    * @param confirmationWindowSize size of the confirmation window (in bytes)
+    */
+   void setConfirmationWindowSize(int confirmationWindowSize);
+
+   /**
+    * Returns the window size for flow control of the producers created through this factory.
+    * 
+    * Value must be -1 (to disable flow control) or greater than 0 to determine the maximum amount of bytes at any give time (to prevent overloading the connection).
+    * Default value is {@link HornetQClient#DEFAULT_PRODUCER_WINDOW_SIZE}.
+    * 
+    * @return the window size for flow control of the producers created through this factory.
+    */
+   int getProducerWindowSize();
+
+   /**
+    * Returns the window size for flow control of the producers created through this factory.
+    * 
+    * Value must be -1 (to disable flow control) or greater than 0.
+    * 
+    * @param producerWindowSize window size (in bytest) for flow control of the producers created through this factory.
+    */
+   void setProducerWindowSize(int producerWindowSize);
+
+   /**
+    * Returns the maximum rate of message production for producers created through this factory.
+    * 
+    * This value controls the rate at which a producer can produce messages. A producer will never produce messages at a rate faster than the rate specified.
+    * 
+    * Value is -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
+    * Default value is {@link HornetQClient#DEFAULT_PRODUCER_MAX_RATE}.
+    * 
+    * @return  maximum rate of message production (in messages per seconds)
+    */
+   int getProducerMaxRate();
+
+   /**
+    * Sets the maximum rate of message production for producers created through this factory.
+    * 
+    * Value must -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
+    * 
+    * @param producerMaxRate maximum rate of message production (in messages per seconds)
+    */
+   void setProducerMaxRate(int producerMaxRate);
+
+   /**
+    * Returns whether consumers created through this factory will block while sending message acknowledgements or do it asynchronously.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
+    * 
+    * @return whether consumers will block while sending message acknowledgements or do it asynchronously
+    */
+   boolean isBlockOnAcknowledge();
+
+   /**
+    * Sets whether consumers created through this factory will block while sending message acknowledgements or do it asynchronously.
+    *
+    * @param blockOnAcknowledge <code>true</code> to block when sending message acknowledgements or <code>false</code> to send them asynchronously
+    */
+   void setBlockOnAcknowledge(boolean blockOnAcknowledge);
+
+   /**
+    * Returns whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously.
+    * <br>
+    * If the session is configured to send durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
+    * to be notified once the message has been handled by the server.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_DURABLE_SEND}.
+    *
+    * @return whether producers will block while sending persistent messages or do it asynchronously
+    */
+   boolean isBlockOnDurableSend();
+
+   /**
+    * Sets whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously.
+    * 
+    * @param blockOnDurableSend <code>true</code> to block when sending durable messages or <code>false</code> to send them asynchronously
+    */
+   void setBlockOnDurableSend(boolean blockOnDurableSend);
+
+   /**
+    * Returns whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously.
+    * <br>
+    * If the session is configured to send non-durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
+    * to be notified once the message has been handled by the server.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
+    *
+    * @return whether producers will block while sending non-durable messages or do it asynchronously
+    */
+   boolean isBlockOnNonDurableSend();
+
+   /**
+    * Sets whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously.
+    * 
+    * @param blockOnNonDurableSend <code>true</code> to block when sending non-durable messages or <code>false</code> to send them asynchronously
+    */
+   void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
+
+   /**
+    * Returns whether producers created through this factory will automatically
+    * assign a group ID to the messages they sent.
+    * 
+    * if <code>true</code>, a random unique group ID is created and set on each message for the property
+    * {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
+    * Default value is {@link HornetQClient#DEFAULT_AUTO_GROUP}.
+    * 
+    * @return whether producers will automatically assign a group ID to their messages
+    */
+   boolean isAutoGroup();
+
+   /**
+    * Sets whether producers created through this factory will automatically
+    * assign a group ID to the messages they sent.
+    * 
+    * @param autoGroup <code>true</code> to automatically assign a group ID to each messages sent through this factory, <code>false</code> else
+    */
+   void setAutoGroup(boolean autoGroup);
+
+   /**
+    * Returns the group ID that will be eventually set on each message for the property {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
+    * 
+    * Default value is is <code>null</code> and no group ID will be set on the messages.
+    * 
+    * @return the group ID that will be eventually set on each message
+    */
+   String getGroupID();
+   
+   /**
+    * Sets the group ID that will be  set on each message sent through this factory.
+    * 
+    * @param groupID the group ID to use
+    */
+   void setGroupID(String groupID);
+
+   /**
+    * Returns whether messages will pre-acknowledged on the server before they are sent to the consumers or not.
+    *
+    * Default value is {@link HornetQClient#DEFAULT_PRE_ACKNOWLEDGE}
+    */
+   boolean isPreAcknowledge();
+
+   /**
+    * Sets to <code>true</code> to pre-acknowledge consumed messages on the server before they are sent to consumers, else set to <code>false</code> to let
+    * clients acknowledge the message they consume.
+    * 
+    * @param preAcknowledge <code>true</code> to enable pre-acknowledgement, <code>false</code> else
+    */
+   void setPreAcknowledge(boolean preAcknowledge);
+
+   /**
+    * Returns the acknowledgements batch size.
+    * 
+    * Default value is  {@link HornetQClient#DEFAULT_ACK_BATCH_SIZE}.
+    * 
+    * @return the acknowledgements batch size
+    */
+   int getAckBatchSize();
+
+   /**
+    * Sets the acknowledgements batch size.
+    * 
+    * Value must be equal or greater than 0.
+    * 
+    * @param ackBatchSize acknowledgements batch size
+    */
+   void setAckBatchSize(int ackBatchSize);
+
+   /**
+    * Returns the local bind address to which the multicast socket is bound for discovery.
+    * 
+    * This is null if the multicast socket is not bound, or no discovery is being used
+    * 
+    * @return the local bind address to which the multicast socket is bound for discovery
+    */
+   String getLocalBindAddress();
+
+   /**
+    * Sets the local bind address to which the multicast socket is bound for discovery.
+    * 
+    * @param the local bind address
+    */
+   void setLocalBindAddress(String localBindAddress);
+   
+   /**
+    * Returns the address to listen to discover which connectors this factory can use.
+    * The discovery address must be set to enable this factory to discover HornetQ servers.
+    * 
+    * @return the address to listen to discover which connectors this factory can use
+    */
+   String getDiscoveryAddress();
+
+   /**
+    * Returns the port to listen to discover which connectors this factory can use.
+    * The discovery port must be set to enable this factory to discover HornetQ servers.
+    * 
+    * @return the port to listen to discover which connectors this factory can use
+    */   
+   int getDiscoveryPort();
+
+
+   /**
+    * Returns an array of TransportConfigurations representing the static list of live servers used when
+    * creating this object
+    * @return
+    */
+   TransportConfiguration[] getStaticTransportConfigurations();
+
+   /**
+    * Returns the refresh timeout for discovered HornetQ servers.
+    * 
+    * If this factory uses discovery to find HornetQ servers, the list of discovered servers
+    * will be refreshed according to this timeout.
+    * 
+    * Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_DISCOVERY_REFRESH_TIMEOUT}.
+    * 
+    * @return the refresh timeout for discovered HornetQ servers
+    */
+   long getDiscoveryRefreshTimeout();
+
+   /**
+    * Sets the refresh timeout for discovered HornetQ servers.
+    * 
+    * Value must be greater than 0.
+    * 
+    * @param discoveryRefreshTimeout refresh timeout (in milliseconds) for discovered HornetQ servers
+    */
+   void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout);
+
+   /**
+    * Returns the initial wait timeout if this factory is configured to use discovery.
+    * 
+    * Value is in milliseconds, default value is  {@link HornetQClient#DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT}.
+    * 
+    * @return the initial wait timeout if this factory is configured to use discovery 
+    */
+   long getDiscoveryInitialWaitTimeout();
+
+   /**
+    * Sets the initial wait timeout if this factory is configured to use discovery.
+    * 
+    * Value is in milliseconds and must be greater than 0.
+    * 
+    * @param initialWaitTimeout initial wait timeout when using discovery 
+    */
+   void setDiscoveryInitialWaitTimeout(long initialWaitTimeout);
+
+   /**
+    * Returns whether this factory will use global thread pools (shared among all the factories in the same JVM)
+    * or its own pools.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_USE_GLOBAL_POOLS}.
+    * 
+    * @return <code>true</code> if this factory uses global thread pools, <code>false</code> else
+    */
+   boolean isUseGlobalPools();
+
+   /**
+    * Sets whether this factory will use global thread pools (shared among all the factories in the same JVM)
+    * or its own pools.
+    * 
+    * @param useGlobalPools <code>true</code> to let this factory uses global thread pools, <code>false</code> else
+    */
+   void setUseGlobalPools(boolean useGlobalPools);
+
+   /**
+    * Returns the maximum size of the scheduled thread pool.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE}.
+    * 
+    * @return the maximum size of the scheduled thread pool.
+    */
+   int getScheduledThreadPoolMaxSize();
+
+   /**
+    * Sets the maximum size of the scheduled thread pool.
+    * 
+    * This setting is relevant only if this factory does not use global pools.
+    * Value must be greater than 0.
+    * 
+    * @param scheduledThreadPoolMaxSize  maximum size of the scheduled thread pool.
+    */
+   void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
+
+   /**
+    * Returns the maximum size of the thread pool.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_THREAD_POOL_MAX_SIZE}.
+    * 
+    * @return the maximum size of the thread pool.
+    */
+   int getThreadPoolMaxSize();
+
+   /**
+    * Sets the maximum size of the thread pool.
+    * 
+    * This setting is relevant only if this factory does not use global pools.
+    * Value must be -1 (for unlimited thread pool) or greater than 0.
+    * 
+    * @param threadPoolMaxSize  maximum size of the thread pool.
+    */
+   void setThreadPoolMaxSize(int threadPoolMaxSize);
+
+   /**
+    * Returns the time to retry connections created by this factory after failure. 
+    * 
+    * Value is in milliseconds, default is {@link HornetQClient#DEFAULT_RETRY_INTERVAL}.
+    * 
+    * @return the time to retry connections created by this factory after failure
+    */
+   long getRetryInterval();
+
+   /**
+    * Sets the time to retry connections created by this factory after failure.
+    * 
+    * Value must be greater than 0.
+    * 
+    * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure 
+    */
+   void setRetryInterval(long retryInterval);
+
+   /**
+    * Returns the multiplier to apply to successive retry intervals.
+    * 
+    * Default value is  {@link HornetQClient#DEFAULT_RETRY_INTERVAL_MULTIPLIER}.
+    * 
+    * @return the multiplier to apply to successive retry intervals
+    */
+   double getRetryIntervalMultiplier();
+
+   /**
+    * Sets the multiplier to apply to successive retry intervals.
+    * 
+    * Value must be positive.
+    * 
+    * @param retryIntervalMultiplier multiplier to apply to successive retry intervals
+    */
+   void setRetryIntervalMultiplier(double retryIntervalMultiplier);
+
+   /**
+    * Returns the maximum retry interval (in the case a retry interval multiplier has been specified).
+    * 
+    * Value is in milliseconds, default value is  {@link HornetQClient#DEFAULT_MAX_RETRY_INTERVAL}.
+    * 
+    * @return the maximum retry interval
+    */
+   long getMaxRetryInterval();
+
+   /**
+    * Sets the maximum retry interval.
+    * 
+    * Value must be greater than 0.
+    * 
+    * @param maxRetryInterval maximum retry interval to apply in the case a retry interval multiplier has been specified
+    */
+   void setMaxRetryInterval(long maxRetryInterval);
+
+   /**
+    * Returns the maximum number of attempts to retry connection in case of failure.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_RECONNECT_ATTEMPTS}.
+    * 
+    * @return the maximum number of attempts to retry connection in case of failure.
+    */
+   int getReconnectAttempts();
+
+   /**
+    * Sets the maximum number of attempts to retry connection in case of failure.
+    * 
+    * Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0.
+    * 
+    * @param reconnectAttempts maximum number of attempts to retry connection in case of failure
+    */
+   void setReconnectAttempts(int reconnectAttempts);
+   
+   /**
+    * Returns true if the client will automatically attempt to connect to the backup server if the initial
+    * connection to the live server fails
+    * 
+    * Default value is {@link HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION}.
+    * 
+    * @return
+    */
+   boolean isFailoverOnInitialConnection();
+   
+   /**
+    * Sets the value for FailoverOnInitialReconnection
+    * 
+    * @param failover
+    */
+   void setFailoverOnInitialConnection(boolean failover);
+
+   /**
+    * Returns whether connections created by this factory must failover in case the server they are
+    * connected to <em>has normally shut down</em>.
+    * 
+    * Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN}.
+    * 
+    * @return <code>true</code> if connections must failover if the server has normally shut down, else <code>false</code>
+    */
+   boolean isFailoverOnServerShutdown();
+
+   /**
+    * Sets whether connections created by this factory must failover in case the server they are
+    * connected to <em>has normally shut down</em>
+    * 
+    * @param failoverOnServerShutdown <code>true</code> if connections must failover if the server has normally shut down, <code>false</code> else
+    */
+   void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
+
+   /**
+    * Returns the class name of the connection load balancing policy.
+    * 
+    * Default value is "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy".
+    * 
+    * @return the class name of the connection load balancing policy
+    */
+   String getConnectionLoadBalancingPolicyClassName();
+
+   /**
+    * Sets the class name of the connection load balancing policy.
+    * 
+    * Value must be the name of a class implementing {@link ConnectionLoadBalancingPolicy}.
+    * 
+    * @param loadBalancingPolicyClassName class name of the connection load balancing policy
+    */
+   void setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName);
+
+   /**
+    * Returns the initial size of messages created through this factory.
+    * 
+    * Value is in bytes, default value is  {@link HornetQClient#DEFAULT_INITIAL_MESSAGE_PACKET_SIZE}.
+    * 
+    * @return the initial size of messages created through this factory
+    */
+   int getInitialMessagePacketSize();
+
+   /**
+    * Sets the initial size of messages created through this factory.
+    * 
+    * Value must be greater than 0.
+    * 
+    * @param size initial size of messages created through this factory.
+    */
+   void setInitialMessagePacketSize(int size);
+   
+   /**
+    * Adds an interceptor which will be executed <em>after packets are received from the server</em>.
+    * 
+    * @param interceptor an Interceptor
+    */
+   void addInterceptor(Interceptor interceptor);
+
+   /**
+    * Removes an interceptor.
+    * 
+    * @param interceptor interceptor to remove
+    * 
+    * @return <code>true</code> if the interceptor is removed from this factory, <code>false</code> else
+    */
+   boolean removeInterceptor(Interceptor interceptor);
+
+   /**
+    * Closes this factory and release all its resources
+    */
+   void close();
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client.loadbalance;
+
+import org.hornetq.utils.Random;
+
+/**
+ * {@link RandomConnectionLoadBalancingPolicy#select(int)} chooses a the initial node randomly then subsequent requests return the same node
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ */
+public class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy
+{
+   private final Random random = new Random();
+
+   private int pos = -1;
+
+   public int select(final int max)
+   {
+      if (pos == -1)
+      {
+         pos = random.getRandom().nextInt(max);
+      }
+      
+      return pos;
+   }
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,1038 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A ServerLocatorImpl
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
+{
+   private static final long serialVersionUID = -1615857864410205260L;
+
+   private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
+
+   private final boolean useHA;
+
+   private final String discoveryAddress;
+
+   private final int discoveryPort;
+
+   private final TransportConfiguration[] transportConfigs;
+
+   private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+
+   private Pair<TransportConfiguration, TransportConfiguration>[] topology;
+
+   private Map<TransportConfiguration, TransportConfiguration> pairs = new HashMap<TransportConfiguration, TransportConfiguration>();
+
+   private boolean receivedTopology;
+
+   private ExecutorService threadPool;
+
+   private ScheduledExecutorService scheduledThreadPool;
+
+   private DiscoveryGroup discoveryGroup;
+
+   private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+   private boolean readOnly;
+
+   // Settable attributes:
+
+   private boolean cacheLargeMessagesClient;
+
+   private String localBindAddress;
+
+   private long discoveryRefreshTimeout;
+
+   private long discoveryInitialWaitTimeout;
+
+   private long clientFailureCheckPeriod;
+
+   private long connectionTTL;
+
+   private long callTimeout;
+
+   private int minLargeMessageSize;
+
+   private int consumerWindowSize;
+
+   private int consumerMaxRate;
+
+   private int confirmationWindowSize;
+
+   private int producerWindowSize;
+
+   private int producerMaxRate;
+
+   private boolean blockOnAcknowledge;
+
+   private boolean blockOnDurableSend;
+
+   private boolean blockOnNonDurableSend;
+
+   private boolean autoGroup;
+
+   private boolean preAcknowledge;
+
+   private String connectionLoadBalancingPolicyClassName;
+
+   private int ackBatchSize;
+
+   private boolean useGlobalPools;
+
+   private int scheduledThreadPoolMaxSize;
+
+   private int threadPoolMaxSize;
+
+   private long retryInterval;
+
+   private double retryIntervalMultiplier;
+
+   private long maxRetryInterval;
+
+   private int reconnectAttempts;
+
+   private boolean failoverOnInitialConnection;
+
+   private int initialMessagePacketSize;
+
+   private volatile boolean closed;
+
+   private boolean failoverOnServerShutdown;
+
+   private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+   private static ExecutorService globalThreadPool;
+
+   private static ScheduledExecutorService globalScheduledThreadPool;
+
+   private String groupID;
+
+   private static synchronized ExecutorService getGlobalThreadPool()
+   {
+      if (globalThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+         globalThreadPool = Executors.newCachedThreadPool(factory);
+      }
+
+      return globalThreadPool;
+   }
+
+   private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+   {
+      if (globalScheduledThreadPool == null)
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+                                                          true,
+                                                          getThisClassLoader());
+
+         globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+                                                                      factory);
+      }
+
+      return globalScheduledThreadPool;
+   }
+
+   private void setThreadPools()
+   {
+      if (useGlobalPools)
+      {
+         threadPool = getGlobalThreadPool();
+
+         scheduledThreadPool = getGlobalScheduledThreadPool();
+      }
+      else
+      {
+         ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+                                                          true,
+                                                          getThisClassLoader());
+
+         if (threadPoolMaxSize == -1)
+         {
+            threadPool = Executors.newCachedThreadPool(factory);
+         }
+         else
+         {
+            threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+         }
+
+         factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+                                            true,
+                                            getThisClassLoader());
+
+         scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+      }
+   }
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+
+   }
+
+   private void instantiateLoadBalancingPolicy()
+   {
+      if (connectionLoadBalancingPolicyClassName == null)
+      {
+         throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+      }
+
+      AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            try
+            {
+               Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+               return null;
+            }
+            catch (Exception e)
+            {
+               throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+                                                           "\"",
+                                                  e);
+            }
+         }
+      });
+   }
+
+   private synchronized void initialise() throws Exception
+   {
+      if (!readOnly)
+      {
+         setThreadPools();
+
+         instantiateLoadBalancingPolicy();
+
+         if (discoveryAddress != null)
+         {
+            InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+
+            InetAddress lbAddress;
+
+            if (localBindAddress != null)
+            {
+               lbAddress = InetAddress.getByName(localBindAddress);
+            }
+            else
+            {
+               lbAddress = null;
+            }
+
+            discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+                                                    discoveryAddress,
+                                                    lbAddress,
+                                                    groupAddress,
+                                                    discoveryPort,
+                                                    discoveryRefreshTimeout);
+
+            discoveryGroup.registerListener(this);
+
+            discoveryGroup.start();
+         }
+         else
+         {
+            setTopologyFromStaticList();
+         }
+         readOnly = true;
+      }
+   }
+
+   private ServerLocatorImpl(final boolean useHA,
+                             final String discoveryAddress,
+                             final int discoveryPort,
+                             final TransportConfiguration[] transportConfigs)
+   {
+      this.useHA = useHA;
+
+      this.discoveryAddress = discoveryAddress;
+
+      this.discoveryPort = discoveryPort;
+
+      this.transportConfigs = transportConfigs;
+
+      discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+
+      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+      discoveryInitialWaitTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+
+      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+      failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+   }
+
+   /**
+    * Create a ServerLocatorImpl using UDP discovery to lookup cluster
+    * @param discoveryAddress
+    * @param discoveryPort
+    */
+   public ServerLocatorImpl(final boolean useHA, final String discoveryAddress, final int discoveryPort)
+   {
+      this(useHA, discoveryAddress, discoveryPort, null);
+   }
+
+   /**
+    * Create a ServerLocatorImpl using a static list of live servers
+    * @param transportConfigs
+    */
+   public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
+   {
+      this(useHA, null, -1, transportConfigs);
+   }
+
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      if (topology == null && discoveryGroup != null)
+      {
+         // Wait for an initial broadcast to give us at least one node in the cluster
+
+         boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
+
+         if (!ok)
+         {
+            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                       "Timed out waiting to receive initial broadcast from cluster");
+         }
+      }
+
+      ClientSessionFactory factory = null;
+      
+      synchronized (this)
+      {
+         boolean retry;
+         int attempts = 0;
+         do
+         {
+            retry = false;
+            
+            int pos = loadBalancingPolicy.select(topology.length);
+   
+            Pair<TransportConfiguration, TransportConfiguration> pair = topology[pos];
+   
+            // try each factory in the list until we find one which works
+   
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      pair.a,
+                                                      failoverOnServerShutdown,
+                                                      callTimeout,
+                                                      clientFailureCheckPeriod,
+                                                      connectionTTL,
+                                                      retryInterval,
+                                                      retryIntervalMultiplier,
+                                                      maxRetryInterval,
+                                                      reconnectAttempts,
+                                                      failoverOnInitialConnection,
+                                                      threadPool,
+                                                      scheduledThreadPool,
+                                                      interceptors);
+            }
+            catch (HornetQException e)
+            {
+               if (e.getCode() == HornetQException.NOT_CONNECTED)
+               {
+                  attempts++;
+                  
+                  if (attempts == topology.length)
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED, "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  
+                  retry = true;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+         }
+         while (retry);
+
+         if (useHA)
+         {
+            long toWait = 30000;
+            long start = System.currentTimeMillis();
+            while (!receivedTopology && toWait > 0)
+            {
+               // Now wait for the topology
+
+               try
+               {
+                  this.wait(toWait);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (toWait <= 0)
+            {
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+         }
+
+         factories.add(factory);
+
+         return factory;
+      }
+   }
+
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      return cacheLargeMessagesClient;
+   }
+
+   public synchronized void setCacheLargeMessagesClient(final boolean cached)
+   {
+      cacheLargeMessagesClient = cached;
+   }
+
+   public synchronized long getClientFailureCheckPeriod()
+   {
+      return clientFailureCheckPeriod;
+   }
+
+   public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+   {
+      checkWrite();
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+   }
+
+   public synchronized long getConnectionTTL()
+   {
+      return connectionTTL;
+   }
+
+   public synchronized void setConnectionTTL(final long connectionTTL)
+   {
+      checkWrite();
+      this.connectionTTL = connectionTTL;
+   }
+
+   public synchronized long getCallTimeout()
+   {
+      return callTimeout;
+   }
+
+   public synchronized void setCallTimeout(final long callTimeout)
+   {
+      checkWrite();
+      this.callTimeout = callTimeout;
+   }
+
+   public synchronized int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+   {
+      checkWrite();
+      this.minLargeMessageSize = minLargeMessageSize;
+   }
+
+   public synchronized int getConsumerWindowSize()
+   {
+      return consumerWindowSize;
+   }
+
+   public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+   {
+      checkWrite();
+      this.consumerWindowSize = consumerWindowSize;
+   }
+
+   public synchronized int getConsumerMaxRate()
+   {
+      return consumerMaxRate;
+   }
+
+   public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+   {
+      checkWrite();
+      this.consumerMaxRate = consumerMaxRate;
+   }
+
+   public synchronized int getConfirmationWindowSize()
+   {
+      return confirmationWindowSize;
+   }
+
+   public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+   {
+      checkWrite();
+      this.confirmationWindowSize = confirmationWindowSize;
+   }
+
+   public synchronized int getProducerWindowSize()
+   {
+      return producerWindowSize;
+   }
+
+   public synchronized void setProducerWindowSize(final int producerWindowSize)
+   {
+      checkWrite();
+      this.producerWindowSize = producerWindowSize;
+   }
+
+   public synchronized int getProducerMaxRate()
+   {
+      return producerMaxRate;
+   }
+
+   public synchronized void setProducerMaxRate(final int producerMaxRate)
+   {
+      checkWrite();
+      this.producerMaxRate = producerMaxRate;
+   }
+
+   public synchronized boolean isBlockOnAcknowledge()
+   {
+      return blockOnAcknowledge;
+   }
+
+   public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+   {
+      checkWrite();
+      this.blockOnAcknowledge = blockOnAcknowledge;
+   }
+
+   public synchronized boolean isBlockOnDurableSend()
+   {
+      return blockOnDurableSend;
+   }
+
+   public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+   {
+      checkWrite();
+      this.blockOnDurableSend = blockOnDurableSend;
+   }
+
+   public synchronized boolean isBlockOnNonDurableSend()
+   {
+      return blockOnNonDurableSend;
+   }
+
+   public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+   {
+      checkWrite();
+      this.blockOnNonDurableSend = blockOnNonDurableSend;
+   }
+
+   public synchronized boolean isAutoGroup()
+   {
+      return autoGroup;
+   }
+
+   public synchronized void setAutoGroup(final boolean autoGroup)
+   {
+      checkWrite();
+      this.autoGroup = autoGroup;
+   }
+
+   public synchronized boolean isPreAcknowledge()
+   {
+      return preAcknowledge;
+   }
+
+   public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+   {
+      checkWrite();
+      this.preAcknowledge = preAcknowledge;
+   }
+
+   public synchronized int getAckBatchSize()
+   {
+      return ackBatchSize;
+   }
+
+   public synchronized void setAckBatchSize(final int ackBatchSize)
+   {
+      checkWrite();
+      this.ackBatchSize = ackBatchSize;
+   }
+
+   public synchronized long getDiscoveryInitialWaitTimeout()
+   {
+      return discoveryInitialWaitTimeout;
+   }
+
+   public synchronized void setDiscoveryInitialWaitTimeout(final long initialWaitTimeout)
+   {
+      checkWrite();
+      discoveryInitialWaitTimeout = initialWaitTimeout;
+   }
+
+   public synchronized boolean isUseGlobalPools()
+   {
+      return useGlobalPools;
+   }
+
+   public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+   {
+      checkWrite();
+      this.useGlobalPools = useGlobalPools;
+   }
+
+   public synchronized int getScheduledThreadPoolMaxSize()
+   {
+      return scheduledThreadPoolMaxSize;
+   }
+
+   public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+   {
+      checkWrite();
+      this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+   }
+
+   public synchronized int getThreadPoolMaxSize()
+   {
+      return threadPoolMaxSize;
+   }
+
+   public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+   {
+      checkWrite();
+      this.threadPoolMaxSize = threadPoolMaxSize;
+   }
+
+   public synchronized long getRetryInterval()
+   {
+      return retryInterval;
+   }
+
+   public synchronized void setRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      this.retryInterval = retryInterval;
+   }
+
+   public synchronized long getMaxRetryInterval()
+   {
+      return maxRetryInterval;
+   }
+
+   public synchronized void setMaxRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      maxRetryInterval = retryInterval;
+   }
+
+   public synchronized double getRetryIntervalMultiplier()
+   {
+      return retryIntervalMultiplier;
+   }
+
+   public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+   {
+      checkWrite();
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+   }
+
+   public synchronized int getReconnectAttempts()
+   {
+      return reconnectAttempts;
+   }
+
+   public synchronized void setReconnectAttempts(final int reconnectAttempts)
+   {
+      checkWrite();
+      this.reconnectAttempts = reconnectAttempts;
+   }
+
+   public synchronized boolean isFailoverOnInitialConnection()
+   {
+      return this.failoverOnInitialConnection;
+   }
+
+   public synchronized void setFailoverOnInitialConnection(final boolean failover)
+   {
+      checkWrite();
+      this.failoverOnInitialConnection = failover;
+   }
+
+   public synchronized boolean isFailoverOnServerShutdown()
+   {
+      return failoverOnServerShutdown;
+   }
+
+   public synchronized void setFailoverOnServerShutdown(final boolean failoverOnServerShutdown)
+   {
+      checkWrite();
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
+   }
+
+   public synchronized String getConnectionLoadBalancingPolicyClassName()
+   {
+      return connectionLoadBalancingPolicyClassName;
+   }
+
+   public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+   {
+      checkWrite();
+      connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+   }
+
+   public synchronized String getLocalBindAddress()
+   {
+      return localBindAddress;
+   }
+
+   public synchronized void setLocalBindAddress(final String localBindAddress)
+   {
+      checkWrite();
+      this.localBindAddress = localBindAddress;
+   }
+
+   public synchronized String getDiscoveryAddress()
+   {
+      return discoveryAddress;
+   }
+
+   public synchronized int getDiscoveryPort()
+   {
+      return discoveryPort;
+   }
+
+   public TransportConfiguration[] getStaticTransportConfigurations()
+   {
+      return transportConfigs;
+   }
+
+   public synchronized long getDiscoveryRefreshTimeout()
+   {
+      return discoveryRefreshTimeout;
+   }
+
+   public void addInterceptor(final Interceptor interceptor)
+   {
+      interceptors.add(interceptor);
+   }
+
+   public boolean removeInterceptor(final Interceptor interceptor)
+   {
+      return interceptors.remove(interceptor);
+   }
+
+   public synchronized void setDiscoveryRefreshTimeout(final long discoveryRefreshTimeout)
+   {
+      checkWrite();
+      this.discoveryRefreshTimeout = discoveryRefreshTimeout;
+   }
+
+   public synchronized int getInitialMessagePacketSize()
+   {
+      return initialMessagePacketSize;
+   }
+
+   public synchronized void setInitialMessagePacketSize(final int size)
+   {
+      checkWrite();
+      initialMessagePacketSize = size;
+   }
+
+   public void setGroupID(final String groupID)
+   {
+      checkWrite();
+      this.groupID = groupID;
+   }
+
+   public String getGroupID()
+   {
+      return groupID;
+   }
+
+   private void checkWrite()
+   {
+      if (readOnly)
+      {
+         throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+      }
+   }
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      close();
+
+      super.finalize();
+   }
+
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      if (discoveryGroup != null)
+      {
+         try
+         {
+            discoveryGroup.stop();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to stop discovery group", e);
+         }
+      }
+
+      for (ClientSessionFactory factory : factories)
+      {
+         factory.close();
+      }
+
+      factories.clear();
+
+      if (!useGlobalPools)
+      {
+         if (threadPool != null)
+         {
+            threadPool.shutdown();
+
+            try
+            {
+               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+
+         if (scheduledThreadPool != null)
+         {
+            scheduledThreadPool.shutdown();
+
+            try
+            {
+               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for scheduled pool to terminate");
+               }
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+      }
+
+      closed = true;
+   }
+
+   public synchronized void onTopologyChanged(List<Pair<TransportConfiguration, TransportConfiguration>> topology)
+   {
+      if (!useHA)
+      {
+         return;
+      }
+
+      this.topology = topology.toArray(this.topology);
+
+      this.pairs.clear();
+
+      for (Pair<TransportConfiguration, TransportConfiguration> pair : topology)
+      {
+         if (pair.b != null)
+         {
+            pairs.put(pair.a, pair.b);
+         }
+      }
+
+      receivedTopology = true;
+   }
+
+   private void createTopologyArray(final int size)
+   {
+      topology = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, size);
+   }
+
+   public synchronized void connectorsChanged()
+   {
+      if (receivedTopology)
+      {
+         return;
+      }
+
+      Map<String, DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntryMap();
+
+      createTopologyArray(newConnectors.size());
+
+      int i = 0;
+      for (DiscoveryEntry entry : newConnectors.values())
+      {
+         topology[i++] = new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null);
+      }
+   }
+
+   public synchronized void factoryClosed(final ClientSessionFactory factory)
+   {
+      this.factories.remove(factory);
+
+      if (this.factories.isEmpty())
+      {
+         // Go back to using the broadcast or static list
+
+         receivedTopology = false;
+
+         if (transportConfigs != null)
+         {
+            setTopologyFromStaticList();
+         }
+         else
+         {
+            topology = null;
+         }
+      }
+   }
+
+   private void setTopologyFromStaticList()
+   {
+      createTopologyArray(transportConfigs.length);
+
+      int i = 0;
+      for (TransportConfiguration config : transportConfigs)
+      {
+         topology[i++] = new Pair<TransportConfiguration, TransportConfiguration>(config, null);
+      }
+   }
+
+   public synchronized TransportConfiguration getBackup(final TransportConfiguration live)
+   {
+      return pairs.get(live);
+   }
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+
+/**
+ * A ServerLocatorInternal
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ServerLocatorInternal extends ServerLocator
+{
+   void factoryClosed(final ClientSessionFactory factory);
+
+   TransportConfiguration getBackup( TransportConfiguration live);
+   
+   void onTopologyChanged(List<Pair<TransportConfiguration, TransportConfiguration>> clusterTopology);
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClusterTopologyMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ClusterTopologyMessage.class);
+
+   // Attributes ----------------------------------------------------
+
+   private List<Pair<TransportConfiguration, TransportConfiguration>> topology;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ClusterTopologyMessage(final List<Pair<TransportConfiguration, TransportConfiguration>> topology)
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY);
+
+      this.topology = topology;
+   }
+
+   public ClusterTopologyMessage()
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY);
+   }
+
+   // Public --------------------------------------------------------
+
+
+   public List<Pair<TransportConfiguration, TransportConfiguration>> getTopology()
+   {
+      return topology;
+   }
+   
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(topology.size());
+      for (Pair<TransportConfiguration, TransportConfiguration> pair: topology)
+      {
+         pair.a.encode(buffer);
+         if (pair.b != null)
+         {
+            buffer.writeBoolean(true);
+            pair.b.encode(buffer);
+         }
+         else
+         {
+            buffer.writeBoolean(false);
+         }
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      int size = buffer.readInt();
+      topology = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+      for (int i = 0; i < size; i++)
+      {
+         TransportConfiguration a = new TransportConfiguration();
+         a.decode(buffer);
+         boolean hasBackup = buffer.readBoolean();
+         TransportConfiguration b;
+         if (hasBackup)
+         {
+            b = new TransportConfiguration();
+            b.decode(buffer);
+         }
+         else
+         {
+            b = null;
+         }
+         Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+         topology.add(pair);
+      }
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * A ClusterTopologyListener
+ *
+ * @author tim
+ *
+ *
+ */
+public interface ClusterTopologyListener
+{
+   void onTopologyChanged(List<Pair<TransportConfiguration, TransportConfiguration>> clusterTopology);
+
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster;
+
+import java.io.IOException;
+
+/**
+ * A FailoverLockFile
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LockFile
+{
+   void lock() throws IOException;
+   
+   boolean unlock() throws IOException;
+   
+   String getFileName();
+   
+   String getDirectory();
+   
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hornetq.core.server.cluster.LockFile;
+
+/**
+ * A FakeLockFile
+ * 
+ * A VM-wide exclusive lock on a file.
+ * 
+ * Advisory only.
+ * 
+ * Used for testing.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class FakeLockFile implements LockFile
+{
+   private final String fileName;
+
+   private final String directory;
+   
+   private static Map<String, Lock> locks = new WeakHashMap<String, Lock>();
+   
+   private Lock lock;
+   
+   /**
+    * @param fileName
+    * @param directory
+    */
+   public FakeLockFile(final String fileName, final String directory)
+   {
+      this.fileName = fileName;
+      
+      this.directory = directory;
+      
+      synchronized (locks)
+      {
+         String key = directory + fileName;
+         
+         lock = locks.get(key);
+         
+         if (lock == null)
+         {
+            lock = new ReentrantLock(true);
+            
+            locks.put(key, lock);
+         }
+      }
+   }
+   
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   public String getDirectory()
+   {
+      return directory;
+   }
+
+   public void lock() throws IOException
+   {
+      lock.lock();
+   }
+
+   public boolean unlock() throws IOException
+   {
+      lock.unlock();
+      
+      return true;
+   }
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java	2010-06-11 15:36:20 UTC (rev 9308)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.LockFile;
+
+/**
+ * A FailoverLockFileImpl
+ * 
+ * The lock is per VM!
+ * 
+ * Won't work well with NFS or GFS
+ *
+ * @author Tim Fox
+ *
+ */
+public class LockFileImpl implements LockFile
+{
+   private static final Logger log = Logger.getLogger(LockFileImpl.class);
+
+   private final String fileName;
+
+   private final String directory;
+
+   private RandomAccessFile raFile;
+
+   private FileLock lock;
+
+   /*
+    * This method is "mainly" for testing (apologies for pun)
+    */
+   public static final void main(String[] args)
+   {
+      LockFileImpl lock = new LockFileImpl(args[0], args[1]);
+
+      long time = Long.parseLong(args[2]);
+
+      try
+      {
+         lock.lock();
+      }
+      catch (IOException e)
+      {
+         log.error("Failed to get lock", e);
+      }
+
+      log.info("Sleeping for " + time + " ms");
+
+      try
+      {
+         Thread.sleep(time);
+      }
+      catch (InterruptedException e)
+      {
+      }
+
+      try
+      {
+         lock.unlock();
+      }
+      catch (IOException e)
+      {
+         log.error("Failed to unlock", e);
+      }
+   }
+
+   /**
+    * @param fileName
+    * @param directory
+    */
+   public LockFileImpl(final String fileName, final String directory)
+   {
+      this.fileName = fileName;
+
+      this.directory = directory;
+   }
+
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   public String getDirectory()
+   {
+      return directory;
+   }
+
+   private final Object lockLock = new Object();
+
+   private final Object unlockLock = new Object();
+
+   public void lock() throws IOException
+   {
+      synchronized (lockLock)
+      {
+         File file = new File(directory, fileName);
+
+         log.info("Trying to create " + file.getCanonicalPath());
+
+         if (!file.exists())
+         {
+            file.createNewFile();
+         }
+
+         raFile = new RandomAccessFile(file, "rw");
+
+         FileChannel channel = raFile.getChannel();
+
+         // Try and obtain exclusive lock
+         log.info("Trying to obtain exclusive lock on " + fileName);
+
+         lock = channel.lock();
+
+         log.info("obtained lock");
+      }
+   }
+
+   public boolean unlock() throws IOException
+   {
+      synchronized (unlockLock)
+      {
+         if (lock == null)
+         {
+            return false;
+         }
+
+         lock.release();
+
+         lock = null;
+
+         raFile.close();
+
+         raFile = null;
+
+         log.info("Released lock on " + fileName);
+
+         return true;
+      }
+   }
+
+}



More information about the hornetq-commits mailing list