Author: timfox
Date: 2010-06-11 11:36:20 -0400 (Fri, 11 Jun 2010)
New Revision: 9308
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
Log:
new HA
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * Listener interface whose single callback, {@link #onTopologyChanged(List)},
+ * receives a cluster topology expressed as a list of {@link TransportConfiguration} pairs.
+ *
+ * NOTE(review): when the callback is fired, and what the first and second element of
+ * each pair stand for, is not visible in this file - presumably each pair is the
+ * (live, backup) connector of one node; confirm against the implementations.
+ *
+ * @author tim
+ */
+public interface ClusterTopologyListener
+{
+ void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
+
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+
+/**
+ * A ServerLocator
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ServerLocator
+{
+ ClientSessionFactory createSessionFactory() throws Exception;
+
+ /**
+ * Returns the period used to check if a client has failed to receive pings from the
server.
+ *
+ * Period is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
+ *
+ * @return the period used to check if a client has failed to receive pings from the
server
+ */
+ long getClientFailureCheckPeriod();
+
+ /**
+ * Sets the period (in milliseconds) used to check if a client has failed to receive
pings from the server.
+ *
+ * Value must be -1 (to disable) or greater than 0.
+ *
+ * @param clientFailureCheckPeriod the period to check failure
+ */
+ void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
+
+ /**
+ * When <code>true</code>, consumers created through this factory will
create temporary files to cache large messages.
+ *
+ * There is 1 temporary file created for each large message.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_CACHE_LARGE_MESSAGE_CLIENT}.
+ *
+ * @return <code>true</code> if consumers created through this factory
will cache large messages in temporary files, <code>false</code> else
+ */
+ boolean isCacheLargeMessagesClient();
+
+ /**
+ * Sets whether large messages received by consumers created through this factory will
be cached in temporary files or not.
+ *
+ * @param cached <code>true</code> to cache large messages in temporary
files, <code>false</code> else
+ */
+ void setCacheLargeMessagesClient(boolean cached);
+
+ /**
+ * Returns the connection <em>time-to-live</em>.
+ * This TTL determines how long the server will keep a connection alive in the absence
of any data arriving from the client.
+ *
+ * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CONNECTION_TTL}.
+ *
+ * @return the connection time-to-live in milliseconds
+ */
+ long getConnectionTTL();
+
+ /**
+ * Sets this factory's connections <em>time-to-live</em>.
+ *
+ * Value must be -1 (to disable) or greater than or equal to 0.
+ *
+ * @param connectionTTL period in milliseconds
+ */
+ void setConnectionTTL(long connectionTTL);
+
+ /**
+ * Returns the blocking calls timeout.
+ *
+ * If client's blocking calls to the server take more than this timeout, the call
will throw a {@link HornetQException} with the code {@link
HornetQException#CONNECTION_TIMEDOUT}.
+ * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CALL_TIMEOUT}.
+ *
+ * @return the blocking calls timeout
+ */
+ long getCallTimeout();
+
+ /**
+ * Sets the blocking call timeout.
+ *
+ * Value must be greater than or equal to 0.
+ *
+ * @param callTimeout blocking call timeout in milliseconds
+ */
+ void setCallTimeout(long callTimeout);
+
+ /**
+ * Returns the large message size threshold.
+ *
+ * Messages whose size is greater than this value will be handled as
<em>large messages</em>.
+ *
+ * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_MIN_LARGE_MESSAGE_SIZE}.
+ *
+ * @return the message size threshold to treat messages as large messages.
+ */
+ int getMinLargeMessageSize();
+
+ /**
+ * Sets the large message size threshold.
+ *
+ * Value must be greater than 0.
+ *
+ * @param minLargeMessageSize large message size threshold in bytes
+ */
+ void setMinLargeMessageSize(int minLargeMessageSize);
+
+ /**
+ * Returns the window size for flow control of the consumers created through this
factory.
+ *
+ * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_CONSUMER_WINDOW_SIZE}.
+ *
+ * @return the window size used for consumer flow control
+ */
+ int getConsumerWindowSize();
+
+ /**
+ * Sets the window size for flow control of the consumers created through this
factory.
+ *
+ * Value must be -1 (to disable flow control), 0 (to not buffer any messages) or
greater than 0 (to set the maximum size of the buffer)
+ *
+ * @param consumerWindowSize window size (in bytes) used for consumer flow control
+ */
+ void setConsumerWindowSize(int consumerWindowSize);
+
+ /**
+ * Returns the maximum rate of message consumption for consumers created through this
factory.
+ *
+ * This value controls the rate at which a consumer can consume messages. A consumer
will never consume messages at a rate faster than the rate specified.
+ *
+ * Value is -1 (to disable) or a positive integer corresponding to the maximum desired
message consumption rate specified in units of messages per second.
+ * Default value is {@link HornetQClient#DEFAULT_CONSUMER_MAX_RATE}.
+ *
+ * @return the consumer max rate
+ */
+ int getConsumerMaxRate();
+
+ /**
+ * Sets the maximum rate of message consumption for consumers created through this
factory.
+ *
+ * Value must be -1 (to disable) or a positive integer corresponding to the maximum
desired message consumption rate specified in units of messages per second.
+ *
+ * @param consumerMaxRate maximum rate of message consumption (in messages per
seconds)
+ */
+ void setConsumerMaxRate(int consumerMaxRate);
+
+ /**
+ * Returns the size for the confirmation window of clients using this factory.
+ *
+ * Value is in bytes or -1 (to disable the window). Default value is {@link
HornetQClient#DEFAULT_CONFIRMATION_WINDOW_SIZE}.
+ *
+ * @return the size for the confirmation window of clients using this factory
+ */
+ int getConfirmationWindowSize();
+
+ /**
+ * Sets the size for the confirmation window buffer of clients using this factory.
+ *
+ * Value must be -1 (to disable the window) or greater than 0.
+ *
+ * @param confirmationWindowSize size of the confirmation window (in bytes)
+ */
+ void setConfirmationWindowSize(int confirmationWindowSize);
+
+ /**
+ * Returns the window size for flow control of the producers created through this
factory.
+ *
+ * Value must be -1 (to disable flow control) or greater than 0 to determine the
maximum amount of bytes at any given time (to prevent overloading the connection).
+ * Default value is {@link HornetQClient#DEFAULT_PRODUCER_WINDOW_SIZE}.
+ *
+ * @return the window size for flow control of the producers created through this
factory.
+ */
+ int getProducerWindowSize();
+
+ /**
+ * Sets the window size for flow control of the producers created through this
factory.
+ *
+ * Value must be -1 (to disable flow control) or greater than 0.
+ *
+ * @param producerWindowSize window size (in bytes) for flow control of the producers
created through this factory.
+ */
+ void setProducerWindowSize(int producerWindowSize);
+
+ /**
+ * Returns the maximum rate of message production for producers created through this
factory.
+ *
+ * This value controls the rate at which a producer can produce messages. A producer
will never produce messages at a rate faster than the rate specified.
+ *
+ * Value is -1 (to disable) or a positive integer corresponding to the maximum desired
message production rate specified in units of messages per second.
+ * Default value is {@link HornetQClient#DEFAULT_PRODUCER_MAX_RATE}.
+ *
+ * @return maximum rate of message production (in messages per seconds)
+ */
+ int getProducerMaxRate();
+
+ /**
+ * Sets the maximum rate of message production for producers created through this
factory.
+ *
+ * Value must be -1 (to disable) or a positive integer corresponding to the maximum
desired message production rate specified in units of messages per second.
+ *
+ * @param producerMaxRate maximum rate of message production (in messages per
seconds)
+ */
+ void setProducerMaxRate(int producerMaxRate);
+
+ /**
+ * Returns whether consumers created through this factory will block while sending
message acknowledgements or do it asynchronously.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
+ *
+ * @return whether consumers will block while sending message acknowledgements or do
it asynchronously
+ */
+ boolean isBlockOnAcknowledge();
+
+ /**
+ * Sets whether consumers created through this factory will block while sending
message acknowledgements or do it asynchronously.
+ *
+ * @param blockOnAcknowledge <code>true</code> to block when sending
message acknowledgements or <code>false</code> to send them asynchronously
+ */
+ void setBlockOnAcknowledge(boolean blockOnAcknowledge);
+
+ /**
+ * Returns whether producers created through this factory will block while sending
<em>durable</em> messages or do it asynchronously.
+ * <br>
+ * If the session is configured to send durable message asynchronously, the client can
set a SendAcknowledgementHandler on the ClientSession
+ * to be notified once the message has been handled by the server.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_DURABLE_SEND}.
+ *
+ * @return whether producers will block while sending persistent messages or do it
asynchronously
+ */
+ boolean isBlockOnDurableSend();
+
+ /**
+ * Sets whether producers created through this factory will block while sending
<em>durable</em> messages or do it asynchronously.
+ *
+ * @param blockOnDurableSend <code>true</code> to block when sending
durable messages or <code>false</code> to send them asynchronously
+ */
+ void setBlockOnDurableSend(boolean blockOnDurableSend);
+
+ /**
+ * Returns whether producers created through this factory will block while sending
<em>non-durable</em> messages or do it asynchronously.
+ * <br>
+ * If the session is configured to send non-durable message asynchronously, the client
can set a SendAcknowledgementHandler on the ClientSession
+ * to be notified once the message has been handled by the server.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
+ *
+ * @return whether producers will block while sending non-durable messages or do it
asynchronously
+ */
+ boolean isBlockOnNonDurableSend();
+
+ /**
+ * Sets whether producers created through this factory will block while sending
<em>non-durable</em> messages or do it asynchronously.
+ *
+ * @param blockOnNonDurableSend <code>true</code> to block when sending
non-durable messages or <code>false</code> to send them asynchronously
+ */
+ void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
+
+ /**
+ * Returns whether producers created through this factory will automatically
+ * assign a group ID to the messages they send.
+ *
+ * if <code>true</code>, a random unique group ID is created and set on
each message for the property
+ * {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
+ * Default value is {@link HornetQClient#DEFAULT_AUTO_GROUP}.
+ *
+ * @return whether producers will automatically assign a group ID to their messages
+ */
+ boolean isAutoGroup();
+
+ /**
+ * Sets whether producers created through this factory will automatically
+ * assign a group ID to the messages they send.
+ *
+ * @param autoGroup <code>true</code> to automatically assign a group ID
to each message sent through this factory, <code>false</code> else
+ */
+ void setAutoGroup(boolean autoGroup);
+
+ /**
+ * Returns the group ID that will be eventually set on each message for the property
{@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
+ *
+ * Default value is <code>null</code> and no group ID will be set on
the messages.
+ *
+ * @return the group ID that will be eventually set on each message
+ */
+ String getGroupID();
+
+ /**
+ * Sets the group ID that will be set on each message sent through this factory.
+ *
+ * @param groupID the group ID to use
+ */
+ void setGroupID(String groupID);
+
+ /**
+ * Returns whether messages will be pre-acknowledged on the server before they are sent
to the consumers or not.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_PRE_ACKNOWLEDGE}
+ */
+ boolean isPreAcknowledge();
+
+ /**
+ * Sets to <code>true</code> to pre-acknowledge consumed messages on the
server before they are sent to consumers, else set to <code>false</code> to
let
+ * clients acknowledge the message they consume.
+ *
+ * @param preAcknowledge <code>true</code> to enable pre-acknowledgement,
<code>false</code> else
+ */
+ void setPreAcknowledge(boolean preAcknowledge);
+
+ /**
+ * Returns the acknowledgements batch size.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_ACK_BATCH_SIZE}.
+ *
+ * @return the acknowledgements batch size
+ */
+ int getAckBatchSize();
+
+ /**
+ * Sets the acknowledgements batch size.
+ *
+ * Value must be equal or greater than 0.
+ *
+ * @param ackBatchSize acknowledgements batch size
+ */
+ void setAckBatchSize(int ackBatchSize);
+
+ /**
+ * Returns the local bind address to which the multicast socket is bound for
discovery.
+ *
+ * This is null if the multicast socket is not bound, or no discovery is being used
+ *
+ * @return the local bind address to which the multicast socket is bound for
discovery
+ */
+ String getLocalBindAddress();
+
+ /**
+ * Sets the local bind address to which the multicast socket is bound for discovery.
+ *
+ * @param localBindAddress the local bind address
+ */
+ void setLocalBindAddress(String localBindAddress);
+
+ /**
+ * Returns the address to listen to discover which connectors this factory can use.
+ * The discovery address must be set to enable this factory to discover HornetQ
servers.
+ *
+ * @return the address to listen to discover which connectors this factory can use
+ */
+ String getDiscoveryAddress();
+
+ /**
+ * Returns the port to listen to discover which connectors this factory can use.
+ * The discovery port must be set to enable this factory to discover HornetQ servers.
+ *
+ * @return the port to listen to discover which connectors this factory can use
+ */
+ int getDiscoveryPort();
+
+
+ /**
+ * Returns an array of TransportConfigurations representing the static list of live
servers used when
+ * creating this object
+ * @return the static list of live server transport configurations
+ */
+ TransportConfiguration[] getStaticTransportConfigurations();
+
+ /**
+ * Returns the refresh timeout for discovered HornetQ servers.
+ *
+ * If this factory uses discovery to find HornetQ servers, the list of discovered
servers
+ * will be refreshed according to this timeout.
+ *
+ * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_DISCOVERY_REFRESH_TIMEOUT}.
+ *
+ * @return the refresh timeout for discovered HornetQ servers
+ */
+ long getDiscoveryRefreshTimeout();
+
+ /**
+ * Sets the refresh timeout for discovered HornetQ servers.
+ *
+ * Value must be greater than 0.
+ *
+ * @param discoveryRefreshTimeout refresh timeout (in milliseconds) for discovered
HornetQ servers
+ */
+ void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout);
+
+ /**
+ * Returns the initial wait timeout if this factory is configured to use discovery.
+ *
+ * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT}.
+ *
+ * @return the initial wait timeout if this factory is configured to use discovery
+ */
+ long getDiscoveryInitialWaitTimeout();
+
+ /**
+ * Sets the initial wait timeout if this factory is configured to use discovery.
+ *
+ * Value is in milliseconds and must be greater than 0.
+ *
+ * @param initialWaitTimeout initial wait timeout when using discovery
+ */
+ void setDiscoveryInitialWaitTimeout(long initialWaitTimeout);
+
+ /**
+ * Returns whether this factory will use global thread pools (shared among all the
factories in the same JVM)
+ * or its own pools.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_USE_GLOBAL_POOLS}.
+ *
+ * @return <code>true</code> if this factory uses global thread pools,
<code>false</code> else
+ */
+ boolean isUseGlobalPools();
+
+ /**
+ * Sets whether this factory will use global thread pools (shared among all the
factories in the same JVM)
+ * or its own pools.
+ *
+ * @param useGlobalPools <code>true</code> to let this factory use global
thread pools, <code>false</code> else
+ */
+ void setUseGlobalPools(boolean useGlobalPools);
+
+ /**
+ * Returns the maximum size of the scheduled thread pool.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE}.
+ *
+ * @return the maximum size of the scheduled thread pool.
+ */
+ int getScheduledThreadPoolMaxSize();
+
+ /**
+ * Sets the maximum size of the scheduled thread pool.
+ *
+ * This setting is relevant only if this factory does not use global pools.
+ * Value must be greater than 0.
+ *
+ * @param scheduledThreadPoolMaxSize maximum size of the scheduled thread pool.
+ */
+ void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
+
+ /**
+ * Returns the maximum size of the thread pool.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_THREAD_POOL_MAX_SIZE}.
+ *
+ * @return the maximum size of the thread pool.
+ */
+ int getThreadPoolMaxSize();
+
+ /**
+ * Sets the maximum size of the thread pool.
+ *
+ * This setting is relevant only if this factory does not use global pools.
+ * Value must be -1 (for unlimited thread pool) or greater than 0.
+ *
+ * @param threadPoolMaxSize maximum size of the thread pool.
+ */
+ void setThreadPoolMaxSize(int threadPoolMaxSize);
+
+ /**
+ * Returns the time to retry connections created by this factory after failure.
+ *
+ * Value is in milliseconds, default is {@link HornetQClient#DEFAULT_RETRY_INTERVAL}.
+ *
+ * @return the time to retry connections created by this factory after failure
+ */
+ long getRetryInterval();
+
+ /**
+ * Sets the time to retry connections created by this factory after failure.
+ *
+ * Value must be greater than 0.
+ *
+ * @param retryInterval time (in milliseconds) to retry connections created by this
factory after failure
+ */
+ void setRetryInterval(long retryInterval);
+
+ /**
+ * Returns the multiplier to apply to successive retry intervals.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_RETRY_INTERVAL_MULTIPLIER}.
+ *
+ * @return the multiplier to apply to successive retry intervals
+ */
+ double getRetryIntervalMultiplier();
+
+ /**
+ * Sets the multiplier to apply to successive retry intervals.
+ *
+ * Value must be positive.
+ *
+ * @param retryIntervalMultiplier multiplier to apply to successive retry intervals
+ */
+ void setRetryIntervalMultiplier(double retryIntervalMultiplier);
+
+ /**
+ * Returns the maximum retry interval (in the case a retry interval multiplier has
been specified).
+ *
+ * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_MAX_RETRY_INTERVAL}.
+ *
+ * @return the maximum retry interval
+ */
+ long getMaxRetryInterval();
+
+ /**
+ * Sets the maximum retry interval.
+ *
+ * Value must be greater than 0.
+ *
+ * @param maxRetryInterval maximum retry interval to apply in the case a retry
interval multiplier has been specified
+ */
+ void setMaxRetryInterval(long maxRetryInterval);
+
+ /**
+ * Returns the maximum number of attempts to retry connection in case of failure.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_RECONNECT_ATTEMPTS}.
+ *
+ * @return the maximum number of attempts to retry connection in case of failure.
+ */
+ int getReconnectAttempts();
+
+ /**
+ * Sets the maximum number of attempts to retry connection in case of failure.
+ *
+ * Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater
than 0.
+ *
+ * @param reconnectAttempts maximum number of attempts to retry connection in case of
failure
+ */
+ void setReconnectAttempts(int reconnectAttempts);
+
+ /**
+ * Returns true if the client will automatically attempt to connect to the backup
server if the initial
+ * connection to the live server fails
+ *
+ * Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_INITIAL_CONNECTION}.
+ *
+ * @return <code>true</code> if failover on the initial connection is enabled
+ */
+ boolean isFailoverOnInitialConnection();
+
+ /**
+ * Sets the value for FailoverOnInitialConnection
+ *
+ * @param failover
+ */
+ void setFailoverOnInitialConnection(boolean failover);
+
+ /**
+ * Returns whether connections created by this factory must failover in case the
server they are
+ * connected to <em>has normally shut down</em>.
+ *
+ * Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN}.
+ *
+ * @return <code>true</code> if connections must failover if the server
has normally shut down, else <code>false</code>
+ */
+ boolean isFailoverOnServerShutdown();
+
+ /**
+ * Sets whether connections created by this factory must failover in case the server
they are
+ * connected to <em>has normally shut down</em>
+ *
+ * @param failoverOnServerShutdown <code>true</code> if connections must
failover if the server has normally shut down, <code>false</code> else
+ */
+ void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
+
+ /**
+ * Returns the class name of the connection load balancing policy.
+ *
+ * Default value is
"org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy".
+ *
+ * @return the class name of the connection load balancing policy
+ */
+ String getConnectionLoadBalancingPolicyClassName();
+
+ /**
+ * Sets the class name of the connection load balancing policy.
+ *
+ * Value must be the name of a class implementing {@link
ConnectionLoadBalancingPolicy}.
+ *
+ * @param loadBalancingPolicyClassName class name of the connection load balancing
policy
+ */
+ void setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName);
+
+ /**
+ * Returns the initial size of messages created through this factory.
+ *
+ * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_INITIAL_MESSAGE_PACKET_SIZE}.
+ *
+ * @return the initial size of messages created through this factory
+ */
+ int getInitialMessagePacketSize();
+
+ /**
+ * Sets the initial size of messages created through this factory.
+ *
+ * Value must be greater than 0.
+ *
+ * @param size initial size of messages created through this factory.
+ */
+ void setInitialMessagePacketSize(int size);
+
+ /**
+ * Adds an interceptor which will be executed <em>after packets are received
from the server</em>.
+ *
+ * @param interceptor an Interceptor
+ */
+ void addInterceptor(Interceptor interceptor);
+
+ /**
+ * Removes an interceptor.
+ *
+ * @param interceptor interceptor to remove
+ *
+ * @return <code>true</code> if the interceptor is removed from this
factory, <code>false</code> else
+ */
+ boolean removeInterceptor(Interceptor interceptor);
+
+ /**
+ * Closes this factory and releases all its resources
+ */
+ void close();
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core.client.loadbalance;
+
+import org.hornetq.utils.Random;
+
+/**
+ * {@link RandomStickyConnectionLoadBalancingPolicy#select(int)} chooses the initial node
+ * randomly, then subsequent requests return the same node.
+ * <p>
+ * If a later call offers fewer nodes than the remembered position allows, the remembered
+ * position is no longer a valid index, so a new node is chosen randomly and becomes the
+ * sticky one.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy
+{
+   private final Random random = new Random();
+
+   // Index handed out by the previous selection; -1 until the first selection is made.
+   private int pos = -1;
+
+   /**
+    * Selects the index of the node to use.
+    *
+    * @param max number of nodes to choose from
+    * @return the sticky index, picked at random on first use or when the remembered
+    *         index is no longer below <code>max</code>
+    */
+   public int select(final int max)
+   {
+      // pos >= max means the candidate list shrank since the sticky pick was made;
+      // returning the stale value would hand the caller an out-of-range index.
+      if (pos == -1 || pos >= max)
+      {
+         pos = random.getRandom().nextInt(max);
+      }
+
+      return pos;
+   }
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,1038 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A ServerLocatorImpl
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener,
Serializable
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
+
+ private final boolean useHA;
+
+ private final String discoveryAddress;
+
+ private final int discoveryPort;
+
+ private final TransportConfiguration[] transportConfigs;
+
+ private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topology;
+
+ private Map<TransportConfiguration, TransportConfiguration> pairs = new
HashMap<TransportConfiguration, TransportConfiguration>();
+
+ private boolean receivedTopology;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private String localBindAddress;
+
+ private long discoveryRefreshTimeout;
+
+ private long discoveryInitialWaitTimeout;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private boolean failoverOnServerShutdown;
+
+ private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private String groupID;
+
+ /**
+ * Returns the static thread pool used when global pools are enabled, creating it as a
+ * cached thread pool the first time it is requested.
+ *
+ * @return the shared executor service
+ */
+ private static synchronized ExecutorService getGlobalThreadPool()
+ {
+    if (globalThreadPool != null)
+    {
+       return globalThreadPool;
+    }
+
+    globalThreadPool = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-client-global-threads",
+                                                                              true,
+                                                                              getThisClassLoader()));
+
+    return globalThreadPool;
+ }
+
+ private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ {
+ if (globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+ true,
+ getThisClassLoader());
+
+ globalScheduledThreadPool =
Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ factory);
+ }
+
+ return globalScheduledThreadPool;
+ }
+
+ private void setThreadPools()
+ {
+ if (useGlobalPools)
+ {
+ threadPool = getGlobalThreadPool();
+
+ scheduledThreadPool = getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-factory-threads-" +
System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ if (threadPoolMaxSize == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+ }
+
+ factory = new
HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" +
System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ scheduledThreadPool =
Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ }
+ }
+
+ /**
+ * Looks up, inside a privileged action, the class loader that loaded
+ * ClientSessionFactoryImpl.
+ *
+ * @return that class loader
+ */
+ private static ClassLoader getThisClassLoader()
+ {
+    final PrivilegedAction<ClassLoader> lookup = new PrivilegedAction<ClassLoader>()
+    {
+       public ClassLoader run()
+       {
+          return ClientSessionFactoryImpl.class.getClassLoader();
+       }
+    };
+
+    return AccessController.doPrivileged(lookup);
+ }
+
+ /**
+ * Loads the class named by connectionLoadBalancingPolicyClassName through the calling thread's
+ * context class loader, instantiates it with its no-arg constructor and stores the instance in
+ * loadBalancingPolicy. The whole lookup runs inside a privileged action.
+ *
+ * @throws IllegalStateException if no policy class name has been configured
+ * @throws IllegalArgumentException if the class cannot be loaded, instantiated or cast; the
+ * original failure is kept as the cause
+ */
+ private void instantiateLoadBalancingPolicy()
+ {
+ if (connectionLoadBalancingPolicyClassName == null)
+ {
+ throw new IllegalStateException("Please specify a load balancing policy
class name on the session factory");
+ }
+
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ Class<?> clazz =
loader.loadClass(connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ return null;
+ }
+ // Broad on purpose: any failure here (including a null context loader) is reported uniformly
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Unable to instantiate load
balancing policy \"" + connectionLoadBalancingPolicyClassName +
+ "\"",
+ e);
+ }
+ }
+ });
+ }
+
+ private synchronized void initialise() throws Exception
+ {
+ if (!readOnly)
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ if (discoveryAddress != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+
+ InetAddress lbAddress;
+
+ if (localBindAddress != null)
+ {
+ lbAddress = InetAddress.getByName(localBindAddress);
+ }
+ else
+ {
+ lbAddress = null;
+ }
+
+ discoveryGroup = new
DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryAddress,
+ lbAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
+
+ discoveryGroup.registerListener(this);
+
+ discoveryGroup.start();
+ }
+ else
+ {
+ setTopologyFromStaticList();
+ }
+ readOnly = true;
+ }
+ }
+
+   /**
+    * Common constructor behind the two public ones.
+    *
+    * Records the HA flag and the way servers are located (discovery address/port or a static
+    * connector list), then sets every tunable attribute to its {@code HornetQClient} default.
+    *
+    * @param useHA whether cluster topology updates are applied by this locator
+    * @param discoveryAddress discovery group address, or {@code null} when a static list is used
+    * @param discoveryPort discovery group port (the static-list constructor passes -1)
+    * @param transportConfigs static connectors, or {@code null} when discovery is used
+    */
+   private ServerLocatorImpl(final boolean useHA,
+                             final String discoveryAddress,
+                             final int discoveryPort,
+                             final TransportConfiguration[] transportConfigs)
+   {
+      this.useHA = useHA;
+
+      this.discoveryAddress = discoveryAddress;
+
+      this.discoveryPort = discoveryPort;
+
+      this.transportConfigs = transportConfigs;
+
+      discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+
+      clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+      connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+      callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+      minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+      confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+      producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+      producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+      blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+      blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+      blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+      autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+      preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+      ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+      connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+      discoveryInitialWaitTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+
+      useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+      scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+      threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+      retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+      retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+      maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+      reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+      failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+      failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+
+      // Assigned once; the previous revision repeated this assignment a second time
+      cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+      initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+   }
+
+ /**
+ * Create a ServerLocatorImpl that uses UDP discovery to look up the cluster.
+ * Delegates to the private constructor with no static connector list.
+ *
+ * @param useHA when true, topology updates are applied and createSessionFactory() waits for one
+ * @param discoveryAddress the discovery group address
+ * @param discoveryPort the discovery group port
+ */
+ public ServerLocatorImpl(final boolean useHA, final String discoveryAddress, final int
discoveryPort)
+ {
+ this(useHA, discoveryAddress, discoveryPort, null);
+ }
+
+ /**
+ * Create a ServerLocatorImpl using a static list of live servers.
+ * Delegates to the private constructor with a null discovery address and a port of -1.
+ *
+ * @param useHA when true, topology updates are applied and createSessionFactory() waits for one
+ * @param transportConfigs the connectors of the servers to try
+ */
+ public ServerLocatorImpl(final boolean useHA, final TransportConfiguration...
transportConfigs)
+ {
+ this(useHA, null, -1, transportConfigs);
+ }
+
+   /**
+    * Creates a session factory connected to one of the servers currently known to this locator.
+    *
+    * The locator is initialised on first use. With discovery and no topology yet, the call first
+    * waits up to {@code discoveryInitialWaitTimeout} for a broadcast. A server is then chosen by
+    * the load balancing policy; on a NOT_CONNECTED failure another choice is made, up to as many
+    * attempts as there are topology entries. With {@code useHA} the call additionally waits up to
+    * 30 seconds for the cluster topology to arrive.
+    *
+    * @return the new factory, already registered with this locator
+    * @throws IllegalStateException if the locator is closed
+    * @throws HornetQException INTERNAL_ERROR if initialisation fails, CONNECTION_TIMEDOUT if the
+    *         initial broadcast or the topology does not arrive in time, NOT_CONNECTED if no server
+    *         is known or none could be reached
+    */
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      if (topology == null && discoveryGroup != null)
+      {
+         // Wait for an initial broadcast to give us at least one node in the cluster.
+         // Done outside the monitor: connectorsChanged() is synchronized and fills in the topology.
+         if (!discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout))
+         {
+            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                       "Timed out waiting to receive initial broadcast from cluster");
+         }
+      }
+
+      synchronized (this)
+      {
+         // Fail with a meaningful error instead of a NullPointerException or select(0)
+         if (topology == null || topology.length == 0)
+         {
+            throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                       "Cannot connect to server(s). No servers are known to this locator.");
+         }
+
+         ClientSessionFactory factory = null;
+
+         int attempts = 0;
+
+         // try servers from the list until we find one which works
+         while (factory == null)
+         {
+            Pair<TransportConfiguration, TransportConfiguration> pair = topology[loadBalancingPolicy.select(topology.length)];
+
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      pair.a,
+                                                      failoverOnServerShutdown,
+                                                      callTimeout,
+                                                      clientFailureCheckPeriod,
+                                                      connectionTTL,
+                                                      retryInterval,
+                                                      retryIntervalMultiplier,
+                                                      maxRetryInterval,
+                                                      reconnectAttempts,
+                                                      failoverOnInitialConnection,
+                                                      threadPool,
+                                                      scheduledThreadPool,
+                                                      interceptors);
+            }
+            catch (HornetQException e)
+            {
+               if (e.getCode() != HornetQException.NOT_CONNECTED)
+               {
+                  throw e;
+               }
+
+               attempts++;
+
+               if (attempts == topology.length)
+               {
+                  throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                             "Cannot connect to server(s). Tried with all available servers.");
+               }
+            }
+         }
+
+         if (useHA)
+         {
+            // Now wait for the topology
+            final long deadline = System.currentTimeMillis() + 30000;
+
+            long toWait = 30000;
+
+            boolean interrupted = false;
+
+            while (!receivedTopology && toWait > 0)
+            {
+               try
+               {
+                  this.wait(toWait);
+               }
+               catch (InterruptedException e)
+               {
+                  // Keep waiting as before, but remember the interrupt for the caller
+                  interrupted = true;
+               }
+
+               toWait = deadline - System.currentTimeMillis();
+            }
+
+            if (interrupted)
+            {
+               Thread.currentThread().interrupt();
+            }
+
+            // Decide on the flag, not on the clock: the topology may have arrived during the
+            // final wait, in which case the remaining time is already <= 0
+            if (!receivedTopology)
+            {
+               // The factory is connected but was never handed out; release it before failing
+               factory.close();
+
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+         }
+
+         factories.add(factory);
+
+         return factory;
+      }
+   }
+
+ /** Returns the current value of the cacheLargeMessagesClient flag. */
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ /**
+ * Sets the cacheLargeMessagesClient flag.
+ * NOTE(review): unlike the other setters this one does not call checkWrite(), so it is accepted
+ * even after the locator has been used - confirm that this is intended.
+ */
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ /** Returns the client failure check period configured on this locator. */
+ public synchronized long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ /** Sets the client failure check period; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setClientFailureCheckPeriod(final long
clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ /** Returns the connection TTL configured on this locator. */
+ public synchronized long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ /** Sets the connection TTL; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ /** Returns the call timeout configured on this locator. */
+ public synchronized long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ /** Sets the call timeout; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ /** Returns the minimum large message size configured on this locator. */
+ public synchronized int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ /** Sets the minimum large message size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ /** Returns the consumer window size configured on this locator. */
+ public synchronized int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ /** Sets the consumer window size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ /** Returns the consumer max rate configured on this locator. */
+ public synchronized int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ /** Sets the consumer max rate; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ /** Returns the confirmation window size configured on this locator. */
+ public synchronized int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ /** Sets the confirmation window size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ /** Returns the producer window size configured on this locator. */
+ public synchronized int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ /** Sets the producer window size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ /** Returns the producer max rate configured on this locator. */
+ public synchronized int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ /** Sets the producer max rate; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ /** Returns the current value of the blockOnAcknowledge flag. */
+ public synchronized boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ /** Sets the blockOnAcknowledge flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ /** Returns the current value of the blockOnDurableSend flag. */
+ public synchronized boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ /** Sets the blockOnDurableSend flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ /** Returns the current value of the blockOnNonDurableSend flag. */
+ public synchronized boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ /** Sets the blockOnNonDurableSend flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setBlockOnNonDurableSend(final boolean
blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ /** Returns the current value of the autoGroup flag. */
+ public synchronized boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ /** Sets the autoGroup flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ /** Returns the current value of the preAcknowledge flag. */
+ public synchronized boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ /** Sets the preAcknowledge flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ /** Returns the ack batch size configured on this locator. */
+ public synchronized int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ /** Sets the ack batch size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ /** Returns the timeout createSessionFactory() passes to the discovery group when waiting for the first broadcast. */
+ public synchronized long getDiscoveryInitialWaitTimeout()
+ {
+ return discoveryInitialWaitTimeout;
+ }
+
+ /** Sets the initial discovery wait timeout; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setDiscoveryInitialWaitTimeout(final long
initialWaitTimeout)
+ {
+ checkWrite();
+ discoveryInitialWaitTimeout = initialWaitTimeout;
+ }
+
+ /** Returns whether this locator uses the static shared thread pools instead of private ones. */
+ public synchronized boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ /** Sets the useGlobalPools flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ /** Returns the size used for this locator's private scheduled thread pool. */
+ public synchronized int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ /** Sets the private scheduled thread pool size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setScheduledThreadPoolMaxSize(final int
scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ /** Returns the private thread pool size; setThreadPools() treats -1 as a request for a cached pool. */
+ public synchronized int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ /** Sets the private thread pool size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ /** Returns the retry interval configured on this locator. */
+ public synchronized long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ /** Sets the retry interval; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ /** Returns the maximum retry interval configured on this locator. */
+ public synchronized long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ /**
+ * Sets the maximum retry interval; checkWrite() rejects the call with IllegalStateException once
+ * this locator is read-only. Note that the parameter is named retryInterval but is stored in
+ * maxRetryInterval.
+ */
+ public synchronized void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ /** Returns the retry interval multiplier configured on this locator. */
+ public synchronized double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ /** Sets the retry interval multiplier; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setRetryIntervalMultiplier(final double
retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ /** Returns the number of reconnect attempts configured on this locator. */
+ public synchronized int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ /** Sets the number of reconnect attempts; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ /** Returns the current value of the failoverOnInitialConnection flag. */
+ public synchronized boolean isFailoverOnInitialConnection()
+ {
+ return this.failoverOnInitialConnection;
+ }
+
+ /** Sets the failoverOnInitialConnection flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setFailoverOnInitialConnection(final boolean failover)
+ {
+ checkWrite();
+ this.failoverOnInitialConnection = failover;
+ }
+
+ /** Returns the current value of the failoverOnServerShutdown flag. */
+ public synchronized boolean isFailoverOnServerShutdown()
+ {
+ return failoverOnServerShutdown;
+ }
+
+ /** Sets the failoverOnServerShutdown flag; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setFailoverOnServerShutdown(final boolean
failoverOnServerShutdown)
+ {
+ checkWrite();
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
+ }
+
+ /** Returns the class name from which the load balancing policy is instantiated on first use. */
+ public synchronized String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ /** Sets the load balancing policy class name; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setConnectionLoadBalancingPolicyClassName(final String
loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ /** Returns the local bind address handed to the discovery group, or null if none was set. */
+ public synchronized String getLocalBindAddress()
+ {
+ return localBindAddress;
+ }
+
+ /** Sets the local bind address; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setLocalBindAddress(final String localBindAddress)
+ {
+ checkWrite();
+ this.localBindAddress = localBindAddress;
+ }
+
+ /** Returns the discovery address given at construction, or null for a static-list locator. */
+ public synchronized String getDiscoveryAddress()
+ {
+ return discoveryAddress;
+ }
+
+ /** Returns the discovery port given at construction; the static-list constructor passes -1. */
+ public synchronized int getDiscoveryPort()
+ {
+ return discoveryPort;
+ }
+
+ /**
+ * Returns the static connector list given at construction, or null for a discovery locator.
+ * NOTE(review): the internal array itself is returned (no copy) and, unlike its neighbours, this
+ * getter is not synchronized - confirm that callers never modify the array.
+ */
+ public TransportConfiguration[] getStaticTransportConfigurations()
+ {
+ return transportConfigs;
+ }
+
+ /** Returns the refresh timeout handed to the discovery group when it is created. */
+ public synchronized long getDiscoveryRefreshTimeout()
+ {
+ return discoveryRefreshTimeout;
+ }
+
+ /**
+ * Adds an interceptor to the collection that is passed to every session factory created by this
+ * locator. Not guarded by checkWrite() and not synchronized.
+ */
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ /**
+ * Removes an interceptor from this locator's interceptor collection.
+ *
+ * @return the result of the underlying remove call
+ */
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ /** Sets the discovery refresh timeout; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setDiscoveryRefreshTimeout(final long
discoveryRefreshTimeout)
+ {
+ checkWrite();
+ this.discoveryRefreshTimeout = discoveryRefreshTimeout;
+ }
+
+ /** Returns the initial message packet size configured on this locator. */
+ public synchronized int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ /** Sets the initial message packet size; checkWrite() rejects the call with IllegalStateException once this locator is read-only. */
+ public synchronized void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+   /**
+    * Sets the group ID.
+    *
+    * Synchronized like every other attribute setter of this class, so that the {@code readOnly}
+    * check in {@code checkWrite()} and the write happen under the same monitor that
+    * {@code initialise()} holds when it flips {@code readOnly}.
+    *
+    * @param groupID the new group ID
+    * @throws IllegalStateException if the locator has already been used
+    */
+   public synchronized void setGroupID(final String groupID)
+   {
+      checkWrite();
+      this.groupID = groupID;
+   }
+
+   /**
+    * Returns the group ID.
+    *
+    * Synchronized like the other attribute getters, so a value written by another thread through
+    * the synchronized setters of this class is reliably visible.
+    *
+    * @return the group ID, or {@code null} if none was set
+    */
+   public synchronized String getGroupID()
+   {
+      return groupID;
+   }
+
+ /**
+ * Guard called by the attribute setters: attributes may only be changed while readOnly is
+ * false, i.e. before initialise() has completed.
+ *
+ * @throws IllegalStateException if readOnly is set
+ */
+ private void checkWrite()
+ {
+ if (readOnly)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory
after it has been used");
+ }
+ }
+
+   /**
+    * Safety net that closes the locator when it is garbage collected without having been closed.
+    *
+    * {@code super.finalize()} runs in a {@code finally} block so that it is not skipped when
+    * {@code close()} throws.
+    */
+   @Override
+   protected void finalize() throws Throwable
+   {
+      try
+      {
+         close();
+      }
+      finally
+      {
+         super.finalize();
+      }
+   }
+
+   /**
+    * Closes the locator: stops the discovery group (if any), closes every session factory created
+    * by this locator and, when private pools are in use, shuts them down, waiting up to 10 seconds
+    * for each to terminate. Calling it again after it completed is a no-op.
+    *
+    * A failure to stop the discovery group is logged and does not prevent the rest of the close.
+    * If the calling thread is interrupted while waiting for a pool, the interrupt status is
+    * restored before returning.
+    */
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      if (discoveryGroup != null)
+      {
+         try
+         {
+            discoveryGroup.stop();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to stop discovery group", e);
+         }
+      }
+
+      // Iterate over a snapshot: factoryClosed() removes entries from 'factories', so closing
+      // a factory that reports back to this locator must not disturb the iteration
+      final ClientSessionFactory[] toClose;
+
+      synchronized (this)
+      {
+         toClose = factories.toArray(new ClientSessionFactory[factories.size()]);
+      }
+
+      for (ClientSessionFactory factory : toClose)
+      {
+         factory.close();
+      }
+
+      synchronized (this)
+      {
+         factories.clear();
+      }
+
+      boolean interrupted = false;
+
+      if (!useGlobalPools)
+      {
+         if (threadPool != null)
+         {
+            threadPool.shutdown();
+
+            try
+            {
+               if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for pool to terminate");
+               }
+            }
+            catch (InterruptedException e)
+            {
+               // Restored at the end so the second wait below is still attempted
+               interrupted = true;
+            }
+         }
+
+         if (scheduledThreadPool != null)
+         {
+            scheduledThreadPool.shutdown();
+
+            try
+            {
+               if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+               {
+                  log.warn("Timed out waiting for scheduled pool to terminate");
+               }
+            }
+            catch (InterruptedException e)
+            {
+               interrupted = true;
+            }
+         }
+      }
+
+      closed = true;
+
+      if (interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /**
+    * Replaces the locator's view of the cluster with the given topology. Ignored unless the
+    * locator was created with {@code useHA}.
+    *
+    * The topology array is rebuilt at exactly the size of the list, the live-to-backup map is
+    * refilled from the pairs that carry a backup, and any thread waiting on this locator for the
+    * topology is woken up.
+    *
+    * @param topology the new cluster topology; {@code a} of each pair is the live connector and
+    *        {@code b} its backup, or {@code null} when there is none
+    */
+   public synchronized void onTopologyChanged(List<Pair<TransportConfiguration, TransportConfiguration>> topology)
+   {
+      if (!useHA)
+      {
+         return;
+      }
+
+      // Always copy into a fresh array of the exact size. Reusing the current array fails when it
+      // is null, and when it is longer than the list List.toArray leaves a null marker and stale
+      // entries behind while the array length (used for load balancing) stays too large.
+      createTopologyArray(topology.size());
+
+      topology.toArray(this.topology);
+
+      this.pairs.clear();
+
+      for (Pair<TransportConfiguration, TransportConfiguration> pair : topology)
+      {
+         if (pair.b != null)
+         {
+            pairs.put(pair.a, pair.b);
+         }
+      }
+
+      receivedTopology = true;
+
+      // createSessionFactory() waits on this monitor until receivedTopology is set
+      notifyAll();
+   }
+
+ private void createTopologyArray(final int size)
+ {
+ topology = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class, size);
+ }
+
+   /**
+    * Rebuilds the topology from the discovery group's current entries, one pair per entry with no
+    * backup. Does nothing once a cluster topology has been received.
+    */
+   public synchronized void connectorsChanged()
+   {
+      if (receivedTopology)
+      {
+         return;
+      }
+
+      final Map<String, DiscoveryEntry> discovered = discoveryGroup.getDiscoveryEntryMap();
+
+      createTopologyArray(discovered.size());
+
+      int slot = 0;
+
+      for (DiscoveryEntry entry : discovered.values())
+      {
+         final TransportConfiguration connector = entry.getConnector();
+
+         topology[slot] = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
+
+         slot++;
+      }
+   }
+
+   /**
+    * Forgets a session factory. When it was the last one, the locator also forgets the received
+    * cluster topology and falls back to the static connector list, or to no topology at all when
+    * there is no static list.
+    *
+    * @param factory the factory that has been closed
+    */
+   public synchronized void factoryClosed(final ClientSessionFactory factory)
+   {
+      this.factories.remove(factory);
+
+      if (!this.factories.isEmpty())
+      {
+         return;
+      }
+
+      // Go back to using the broadcast or static list
+
+      receivedTopology = false;
+
+      if (transportConfigs == null)
+      {
+         topology = null;
+      }
+      else
+      {
+         setTopologyFromStaticList();
+      }
+   }
+
+   /**
+    * Rebuilds the topology from the static connector list: one pair per connector, in list order,
+    * each without a backup.
+    */
+   private void setTopologyFromStaticList()
+   {
+      createTopologyArray(transportConfigs.length);
+
+      for (int index = 0; index < transportConfigs.length; index++)
+      {
+         topology[index] = new Pair<TransportConfiguration, TransportConfiguration>(transportConfigs[index], null);
+      }
+   }
+
+   /**
+    * Looks up the backup connector recorded for a live connector.
+    *
+    * @param live the live server's connector
+    * @return the value stored for {@code live} in the live-to-backup map, or {@code null} if none
+    */
+   public synchronized TransportConfiguration getBackup(final TransportConfiguration live)
+   {
+      final TransportConfiguration backup = pairs.get(live);
+
+      return backup;
+   }
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+
+/**
+ * Internal extension of ServerLocator: callbacks and lookups that are not part of the public
+ * client API.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ServerLocatorInternal extends ServerLocator
+{
+ /** Tells the locator that the given session factory has been closed. */
+ void factoryClosed(final ClientSessionFactory factory);
+
+ /** Returns the backup connector known for the given live connector. */
+ TransportConfiguration getBackup( TransportConfiguration live);
+
+ /** Hands the locator a new cluster topology as a list of connector pairs. */
+ void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type CLUSTER_TOPOLOGY carrying the cluster topology as a list of connector pairs.
+ *
+ * Wire layout written by {@link #encodeRest}: an int entry count, then per entry the first
+ * connector, a boolean telling whether a backup follows, and the backup connector if present.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class ClusterTopologyMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ClusterTopologyMessage.class);
+
+   // Attributes ----------------------------------------------------
+
+   private List<Pair<TransportConfiguration, TransportConfiguration>> topology;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** Creates a message ready to be encoded with the given topology. */
+   public ClusterTopologyMessage(final List<Pair<TransportConfiguration, TransportConfiguration>> topology)
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY);
+
+      this.topology = topology;
+   }
+
+   /** Creates an empty message whose topology is filled in by {@link #decodeRest}. */
+   public ClusterTopologyMessage()
+   {
+      super(PacketImpl.CLUSTER_TOPOLOGY);
+   }
+
+   // Public --------------------------------------------------------
+
+   /** Returns the topology given at construction or read by the last decode. */
+   public List<Pair<TransportConfiguration, TransportConfiguration>> getTopology()
+   {
+      return topology;
+   }
+
+   /** Writes the entry count followed by every pair, flagging whether a backup is present. */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(topology.size());
+
+      for (Pair<TransportConfiguration, TransportConfiguration> entry : topology)
+      {
+         entry.a.encode(buffer);
+
+         final boolean hasBackup = entry.b != null;
+
+         buffer.writeBoolean(hasBackup);
+
+         if (hasBackup)
+         {
+            entry.b.encode(buffer);
+         }
+      }
+   }
+
+   /** Reads the entry count and rebuilds the topology list, using null for absent backups. */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      final int count = buffer.readInt();
+
+      topology = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+
+      for (int read = 0; read < count; read++)
+      {
+         final TransportConfiguration live = new TransportConfiguration();
+         live.decode(buffer);
+
+         TransportConfiguration backup = null;
+
+         if (buffer.readBoolean())
+         {
+            backup = new TransportConfiguration();
+            backup.decode(buffer);
+         }
+
+         topology.add(new Pair<TransportConfiguration, TransportConfiguration>(live, backup));
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster;
+
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * Callback interface for receiving the cluster topology when it changes.
+ *
+ * @author tim
+ *
+ *
+ */
+public interface ClusterTopologyListener
+{
+ /**
+ * Called with the cluster topology, given as a list of connector pairs.
+ *
+ * @param clusterTopology the new topology
+ */
+ void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
+
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/LockFile.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster;
+
+import java.io.IOException;
+
+/**
+ * A LockFile: an exclusive lock identified by a file name within a directory.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LockFile
+{
+ /** Acquires the lock. */
+ void lock() throws IOException;
+
+ /**
+ * Releases the lock.
+ *
+ * @return a flag reporting the outcome; its exact meaning is defined by the implementation
+ */
+ boolean unlock() throws IOException;
+
+ /** Returns the name of the lock file. */
+ String getFileName();
+
+ /** Returns the directory of the lock file. */
+ String getDirectory();
+
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hornetq.core.server.cluster.LockFile;
+
+/**
+ * A FakeLockFile
+ *
+ * A VM-wide exclusive lock on a file.
+ *
+ * Advisory only.
+ *
+ * Used for testing.
+ *
+ * No file is touched: every instance created with the same directory and file name shares one
+ * fair {@link ReentrantLock} kept in a static registry.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class FakeLockFile implements LockFile
+{
+   // The registry must hold its entries strongly. The keys are strings built on the fly that
+   // nothing else references, so a WeakHashMap is free to drop an entry at any time - even while
+   // the lock is held - and the next instance for the same file would then get a different lock.
+   private static final Map<String, Lock> locks = new HashMap<String, Lock>();
+
+   private final String fileName;
+
+   private final String directory;
+
+   private final Lock lock;
+
+   /**
+    * @param fileName name of the pretend lock file
+    * @param directory directory of the pretend lock file
+    */
+   public FakeLockFile(final String fileName, final String directory)
+   {
+      this.fileName = fileName;
+
+      this.directory = directory;
+
+      final String key = directory + fileName;
+
+      synchronized (locks)
+      {
+         Lock shared = locks.get(key);
+
+         if (shared == null)
+         {
+            shared = new ReentrantLock(true);
+
+            locks.put(key, shared);
+         }
+
+         lock = shared;
+      }
+   }
+
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   public String getDirectory()
+   {
+      return directory;
+   }
+
+   /** Blocks until the lock shared by all instances for this file is acquired. */
+   public void lock() throws IOException
+   {
+      lock.lock();
+   }
+
+   /**
+    * Releases the shared lock; must be called by the thread that holds it.
+    *
+    * @return always {@code true}
+    */
+   public boolean unlock() throws IOException
+   {
+      lock.unlock();
+
+      return true;
+   }
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java 2010-06-11
15:36:20 UTC (rev 9308)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.LockFile;
+
+/**
+ * A LockFileImpl
+ *
+ * LockFile implementation backed by an exclusive {@link FileLock} on a real file.
+ *
+ * The lock is per VM!
+ *
+ * Won't work well with NFS or GFS
+ *
+ * @author Tim Fox
+ *
+ */
+public class LockFileImpl implements LockFile
+{
+   private static final Logger log = Logger.getLogger(LockFileImpl.class);
+
+   private final String fileName;
+
+   private final String directory;
+
+   // lock() and unlock() deliberately use different monitors, so unlock() is never held up behind
+   // a lock() call that is blocked waiting for the file. The state they share is therefore
+   // volatile, and raFile is always written before lock.
+   private final Object lockLock = new Object();
+
+   private final Object unlockLock = new Object();
+
+   private volatile RandomAccessFile raFile;
+
+   private volatile FileLock lock;
+
+   /*
+    * This method is "mainly" for testing (apologies for pun)
+    *
+    * Arguments: file name, directory, time to hold the lock in milliseconds.
+    */
+   public static final void main(String[] args)
+   {
+      LockFileImpl lock = new LockFileImpl(args[0], args[1]);
+
+      long time = Long.parseLong(args[2]);
+
+      try
+      {
+         lock.lock();
+      }
+      catch (IOException e)
+      {
+         log.error("Failed to get lock", e);
+      }
+
+      log.info("Sleeping for " + time + " ms");
+
+      try
+      {
+         Thread.sleep(time);
+      }
+      catch (InterruptedException e)
+      {
+      }
+
+      try
+      {
+         lock.unlock();
+      }
+      catch (IOException e)
+      {
+         log.error("Failed to unlock", e);
+      }
+   }
+
+   /**
+    * @param fileName name of the lock file
+    * @param directory directory containing the lock file
+    */
+   public LockFileImpl(final String fileName, final String directory)
+   {
+      this.fileName = fileName;
+
+      this.directory = directory;
+   }
+
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   public String getDirectory()
+   {
+      return directory;
+   }
+
+   /**
+    * Creates the lock file if necessary and blocks until an exclusive lock on it is obtained.
+    *
+    * If the lock cannot be obtained the file handle opened for the attempt is closed again and the
+    * state of this object is left unchanged.
+    *
+    * @throws IOException if the file cannot be created or opened, or locking fails
+    */
+   public void lock() throws IOException
+   {
+      synchronized (lockLock)
+      {
+         File file = new File(directory, fileName);
+
+         log.info("Trying to create " + file.getCanonicalPath());
+
+         if (!file.exists())
+         {
+            file.createNewFile();
+         }
+
+         RandomAccessFile newFile = new RandomAccessFile(file, "rw");
+
+         FileLock newLock = null;
+
+         try
+         {
+            FileChannel channel = newFile.getChannel();
+
+            // Try and obtain exclusive lock
+            log.info("Trying to obtain exclusive lock on " + fileName);
+
+            newLock = channel.lock();
+         }
+         finally
+         {
+            if (newLock == null)
+            {
+               // Locking failed: do not leak the descriptor, and let the original failure propagate
+               try
+               {
+                  newFile.close();
+               }
+               catch (IOException e)
+               {
+                  log.error("Failed to close lock file", e);
+               }
+            }
+         }
+
+         raFile = newFile;
+
+         lock = newLock;
+
+         log.info("obtained lock");
+      }
+   }
+
+   /**
+    * Releases the lock and closes the underlying file.
+    *
+    * The file is closed and the state reset even when releasing the lock fails.
+    *
+    * @return {@code false} if no lock was held, {@code true} once it has been released
+    * @throws IOException if releasing the lock or closing the file fails
+    */
+   public boolean unlock() throws IOException
+   {
+      synchronized (unlockLock)
+      {
+         FileLock held = lock;
+
+         if (held == null)
+         {
+            return false;
+         }
+
+         try
+         {
+            held.release();
+         }
+         finally
+         {
+            lock = null;
+
+            RandomAccessFile file = raFile;
+
+            raFile = null;
+
+            if (file != null)
+            {
+               file.close();
+            }
+         }
+
+         log.info("Released lock on " + fileName);
+
+         return true;
+      }
+   }
+
+}