Author: timfox
Date: 2010-06-30 13:22:16 -0400 (Wed, 30 Jun 2010)
New Revision: 9376
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/DiscoveryGroupControl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl_Old.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/FailoverManagerImpl_Old.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/BridgeControl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/ManagementService.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
Log:
more on ha improvements
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
===================================================================
---
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-06-30
17:22:16 UTC (rev 9376)
@@ -347,7 +347,8 @@
<xsd:element maxOccurs="1" minOccurs="0"
name="confirmation-window-size" type="xsd:int">
</xsd:element>
<xsd:choice>
- <xsd:element maxOccurs="unbounded" minOccurs="1"
name="connector-ref" type="connector-refType">
+ <xsd:element maxOccurs="1" minOccurs="1"
name="static-connectors" type="connector-refType">
+
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="1"
name="discovery-group-ref">
<xsd:complexType>
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -14,6 +14,7 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
/**
@@ -131,634 +132,10 @@
boolean preAcknowledge,
int ackBatchSize) throws HornetQException;
+ void close();
-// /**
-// * Returns the period used to check if a client has failed to receive pings from the
server.
-// *
-// * Period is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
-// *
-// * @return the period used to check if a client has failed to receive pings from the
server
-// */
-// long getClientFailureCheckPeriod();
-//
-// /**
-// * Sets the period (in milliseconds) used to check if a client has failed to receive
pings from the server.
-// *
-// * Value must be -1 (to disable) or greater than 0.
-// *
-// * @param clientFailureCheckPeriod the period to check failure
-// */
-// void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
-//
-// /**
-// * When <code>true</code>, consumers created through this factory will
create temporary files to cache large messages.
-// *
-// * There is 1 temporary file created for each large message.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_CACHE_LARGE_MESSAGE_CLIENT}.
-// *
-// * @return <code>true</code> if consumers created through this factory
will cache large messages in temporary files, <code>false</code> else
-// */
-// boolean isCacheLargeMessagesClient();
-//
-// /**
-// * Sets whether large messages received by consumers created through this factory
will be cached in temporary files or not.
-// *
-// * @param cached <code>true</code> to cache large messages in temporary
files, <code>false</code> else
-// */
-// void setCacheLargeMessagesClient(boolean cached);
-//
-// /**
-// * Returns the connection <em>time-to-live</em>.
-// * This TTL determines how long the server will keep a connection alive in the
absence of any data arriving from the client.
-// *
-// * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CONNECTION_TTL}.
-// *
-// * @return the connection time-to-live in milliseconds
-// */
-// long getConnectionTTL();
-//
-// /**
-// * Sets this factory's connections <em>time-to-live</em>.
-// *
-// * Value must be -1 (to disable) or greater or equals to 0.
-// *
-// * @param connectionTTL period in milliseconds
-// */
-// void setConnectionTTL(long connectionTTL);
-//
-// /**
-// * Returns the blocking calls timeout.
-// *
-// * If client's blocking calls to the server take more than this timeout, the
call will throw a {@link HornetQException} with the code {@link
HornetQException#CONNECTION_TIMEDOUT}.
-// * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CALL_TIMEOUT}.
-// *
-// * @return the blocking calls timeout
-// */
-// long getCallTimeout();
-//
-// /**
-// * Sets the blocking call timeout.
-// *
-// * Value must be greater or equals to 0
-// *
-// * @param callTimeout blocking call timeout in milliseconds
-// */
-// void setCallTimeout(long callTimeout);
-//
-// /**
-// * Returns the large message size threshold.
-// *
-// * Messages whose size is if greater than this value will be handled as
<em>large messages</em>.
-// *
-// * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_MIN_LARGE_MESSAGE_SIZE}.
-// *
-// * @return the message size threshold to treat messages as large messages.
-// */
-// int getMinLargeMessageSize();
-//
-// /**
-// * Sets the large message size threshold.
-// *
-// * Value must be greater than 0.
-// *
-// * @param minLargeMessageSize large message size threshold in bytes
-// */
-// void setMinLargeMessageSize(int minLargeMessageSize);
-//
-// /**
-// * Returns the window size for flow control of the consumers created through this
factory.
-// *
-// * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_CONSUMER_WINDOW_SIZE}.
-// *
-// * @return the window size used for consumer flow control
-// */
-// int getConsumerWindowSize();
-//
-// /**
-// * Sets the window size for flow control of the consumers created through this
factory.
-// *
-// * Value must be -1 (to disable flow control), 0 (to not buffer any messages) or
greater than 0 (to set the maximum size of the buffer)
-// *
-// * @param consumerWindowSize window size (in bytes) used for consumer flow control
-// */
-// void setConsumerWindowSize(int consumerWindowSize);
-//
-// /**
-// * Returns the maximum rate of message consumption for consumers created through
this factory.
-// *
-// * This value controls the rate at which a consumer can consume messages. A consumer
will never consume messages at a rate faster than the rate specified.
-// *
-// * Value is -1 (to disable) or a positive integer corresponding to the maximum
desired message consumption rate specified in units of messages per second.
-// * Default value is {@link HornetQClient#DEFAULT_CONSUMER_MAX_RATE}.
-// *
-// * @return the consumer max rate
-// */
-// int getConsumerMaxRate();
-//
-// /**
-// * Sets the maximum rate of message consumption for consumers created through this
factory.
-// *
-// * Value must -1 (to disable) or a positive integer corresponding to the maximum
desired message consumption rate specified in units of messages per second.
-// *
-// * @param consumerMaxRate maximum rate of message consumption (in messages per
seconds)
-// */
-// void setConsumerMaxRate(int consumerMaxRate);
-//
-// /**
-// * Returns the size for the confirmation window of clients using this factory.
-// *
-// * Value is in bytes or -1 (to disable the window). Default value is {@link
HornetQClient#DEFAULT_CONFIRMATION_WINDOW_SIZE}.
-// *
-// * @return the size for the confirmation window of clients using this factory
-// */
-// int getConfirmationWindowSize();
-//
-// /**
-// * Sets the size for the confirmation window buffer of clients using this factory.
-// *
-// * Value must be -1 (to disable the window) or greater than 0.
-//
-// * @param confirmationWindowSize size of the confirmation window (in bytes)
-// */
-// void setConfirmationWindowSize(int confirmationWindowSize);
-//
-// /**
-// * Returns the window size for flow control of the producers created through this
factory.
-// *
-// * Value must be -1 (to disable flow control) or greater than 0 to determine the
maximum amount of bytes at any give time (to prevent overloading the connection).
-// * Default value is {@link HornetQClient#DEFAULT_PRODUCER_WINDOW_SIZE}.
-// *
-// * @return the window size for flow control of the producers created through this
factory.
-// */
-// int getProducerWindowSize();
-//
-// /**
-// * Returns the window size for flow control of the producers created through this
factory.
-// *
-// * Value must be -1 (to disable flow control) or greater than 0.
-// *
-// * @param producerWindowSize window size (in bytest) for flow control of the
producers created through this factory.
-// */
-// void setProducerWindowSize(int producerWindowSize);
-//
-// /**
-// * Returns the maximum rate of message production for producers created through this
factory.
-// *
-// * This value controls the rate at which a producer can produce messages. A producer
will never produce messages at a rate faster than the rate specified.
-// *
-// * Value is -1 (to disable) or a positive integer corresponding to the maximum
desired message production rate specified in units of messages per second.
-// * Default value is {@link HornetQClient#DEFAULT_PRODUCER_MAX_RATE}.
-// *
-// * @return maximum rate of message production (in messages per seconds)
-// */
-// int getProducerMaxRate();
-//
-// /**
-// * Sets the maximum rate of message production for producers created through this
factory.
-// *
-// * Value must -1 (to disable) or a positive integer corresponding to the maximum
desired message production rate specified in units of messages per second.
-// *
-// * @param producerMaxRate maximum rate of message production (in messages per
seconds)
-// */
-// void setProducerMaxRate(int producerMaxRate);
-//
-// /**
-// * Returns whether consumers created through this factory will block while sending
message acknowledgements or do it asynchronously.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
-// *
-// * @return whether consumers will block while sending message acknowledgements or do
it asynchronously
-// */
-// boolean isBlockOnAcknowledge();
-//
-// /**
-// * Sets whether consumers created through this factory will block while sending
message acknowledgements or do it asynchronously.
-// *
-// * @param blockOnAcknowledge <code>true</code> to block when sending
message acknowledgements or <code>false</code> to send them asynchronously
-// */
-// void setBlockOnAcknowledge(boolean blockOnAcknowledge);
-//
-// /**
-// * Returns whether producers created through this factory will block while sending
<em>durable</em> messages or do it asynchronously.
-// * <br>
-// * If the session is configured to send durable message asynchronously, the client
can set a SendAcknowledgementHandler on the ClientSession
-// * to be notified once the message has been handled by the server.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_DURABLE_SEND}.
-// *
-// * @return whether producers will block while sending persistent messages or do it
asynchronously
-// */
-// boolean isBlockOnDurableSend();
-//
-// /**
-// * Sets whether producers created through this factory will block while sending
<em>durable</em> messages or do it asynchronously.
-// *
-// * @param blockOnDurableSend <code>true</code> to block when sending
durable messages or <code>false</code> to send them asynchronously
-// */
-// void setBlockOnDurableSend(boolean blockOnDurableSend);
-//
-// /**
-// * Returns whether producers created through this factory will block while sending
<em>non-durable</em> messages or do it asynchronously.
-// * <br>
-// * If the session is configured to send non-durable message asynchronously, the
client can set a SendAcknowledgementHandler on the ClientSession
-// * to be notified once the message has been handled by the server.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
-// *
-// * @return whether producers will block while sending non-durable messages or do it
asynchronously
-// */
-// boolean isBlockOnNonDurableSend();
-//
-// /**
-// * Sets whether producers created through this factory will block while sending
<em>non-durable</em> messages or do it asynchronously.
-// *
-// * @param blockOnNonDurableSend <code>true</code> to block when sending
non-durable messages or <code>false</code> to send them asynchronously
-// */
-// void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
-//
-// /**
-// * Returns whether producers created through this factory will automatically
-// * assign a group ID to the messages they sent.
-// *
-// * if <code>true</code>, a random unique group ID is created and set on
each message for the property
-// * {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
-// * Default value is {@link HornetQClient#DEFAULT_AUTO_GROUP}.
-// *
-// * @return whether producers will automatically assign a group ID to their messages
-// */
-// boolean isAutoGroup();
-//
-// /**
-// * Sets whether producers created through this factory will automatically
-// * assign a group ID to the messages they sent.
-// *
-// * @param autoGroup <code>true</code> to automatically assign a group ID
to each messages sent through this factory, <code>false</code> else
-// */
-// void setAutoGroup(boolean autoGroup);
-//
-// /**
-// * Returns the group ID that will be eventually set on each message for the property
{@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
-// *
-// * Default value is is <code>null</code> and no group ID will be set on
the messages.
-// *
-// * @return the group ID that will be eventually set on each message
-// */
-// String getGroupID();
-//
-// /**
-// * Sets the group ID that will be set on each message sent through this factory.
-// *
-// * @param groupID the group ID to use
-// */
-// void setGroupID(String groupID);
-//
-// /**
-// * Returns whether messages will pre-acknowledged on the server before they are sent
to the consumers or not.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_PRE_ACKNOWLEDGE}
-// */
-// boolean isPreAcknowledge();
-//
-// /**
-// * Sets to <code>true</code> to pre-acknowledge consumed messages on the
server before they are sent to consumers, else set to <code>false</code> to
let
-// * clients acknowledge the message they consume.
-// *
-// * @param preAcknowledge <code>true</code> to enable
pre-acknowledgement, <code>false</code> else
-// */
-// void setPreAcknowledge(boolean preAcknowledge);
-//
-// /**
-// * Returns the acknowledgements batch size.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_ACK_BATCH_SIZE}.
-// *
-// * @return the acknowledgements batch size
-// */
-// int getAckBatchSize();
-//
-// /**
-// * Sets the acknowledgements batch size.
-// *
-// * Value must be equal or greater than 0.
-// *
-// * @param ackBatchSize acknowledgements batch size
-// */
-// void setAckBatchSize(int ackBatchSize);
-//
-// /**
-// * Returns the local bind address to which the multicast socket is bound for
discovery.
-// *
-// * This is null if the multicast socket is not bound, or no discovery is being used
-// *
-// * @return the local bind address to which the multicast socket is bound for
discovery
-// */
-// String getLocalBindAddress();
-//
-// /**
-// * Sets the local bind address to which the multicast socket is bound for
discovery.
-// *
-// * @param the local bind address
-// */
-// void setLocalBindAddress(String localBindAddress);
-//
-// /**
-// * Returns the address to listen to discover which connectors this factory can use.
-// * The discovery address must be set to enable this factory to discover HornetQ
servers.
-// *
-// * @return the address to listen to discover which connectors this factory can use
-// */
-// String getDiscoveryAddress();
-//
-// /**
-// * Sets the address to listen to discover which connectors this factory can use.
-// *
-// * @param discoveryAddress address to listen to discover which connectors this
factory can use
-// */
-// void setDiscoveryAddress(String discoveryAddress);
-//
-// /**
-// * Returns the port to listen to discover which connectors this factory can use.
-// * The discovery port must be set to enable this factory to discover HornetQ
servers.
-// *
-// * @return the port to listen to discover which connectors this factory can use
-// */
-// int getDiscoveryPort();
-//
-//
-// /**
-// * Sets the port to listen to discover which connectors this factory can use.
-// *
-// * @param discoveryPort port to listen to discover which connectors this factory can
use
-// */
-// void setDiscoveryPort(int discoveryPort);
-//
-// /**
-// * Returns the refresh timeout for discovered HornetQ servers.
-// *
-// * If this factory uses discovery to find HornetQ servers, the list of discovered
servers
-// * will be refreshed according to this timeout.
-// *
-// * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_DISCOVERY_REFRESH_TIMEOUT}.
-// *
-// * @return the refresh timeout for discovered HornetQ servers
-// */
-// long getDiscoveryRefreshTimeout();
-//
-// /**
-// * Sets the refresh timeout for discovered HornetQ servers.
-// *
-// * Value must be greater than 0.
-// *
-// * @param discoveryRefreshTimeout refresh timeout (in milliseconds) for discovered
HornetQ servers
-// */
-// void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout);
-//
-// /**
-// * Returns the initial wait timeout if this factory is configured to use discovery.
-// *
-// * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT}.
-// *
-// * @return the initial wait timeout if this factory is configured to use discovery
-// */
-// long getDiscoveryInitialWaitTimeout();
-//
-// /**
-// * Sets the initial wait timeout if this factory is configured to use discovery.
-// *
-// * Value is in milliseconds and must be greater than 0.
-// *
-// * @param initialWaitTimeout initial wait timeout when using discovery
-// */
-// void setDiscoveryInitialWaitTimeout(long initialWaitTimeout);
-//
-// /**
-// * Returns whether this factory will use global thread pools (shared among all the
factories in the same JVM)
-// * or its own pools.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_USE_GLOBAL_POOLS}.
-// *
-// * @return <code>true</code> if this factory uses global thread pools,
<code>false</code> else
-// */
-// boolean isUseGlobalPools();
-//
-// /**
-// * Sets whether this factory will use global thread pools (shared among all the
factories in the same JVM)
-// * or its own pools.
-// *
-// * @param useGlobalPools <code>true</code> to let this factory uses
global thread pools, <code>false</code> else
-// */
-// void setUseGlobalPools(boolean useGlobalPools);
-//
-// /**
-// * Returns the maximum size of the scheduled thread pool.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE}.
-// *
-// * @return the maximum size of the scheduled thread pool.
-// */
-// int getScheduledThreadPoolMaxSize();
-//
-// /**
-// * Sets the maximum size of the scheduled thread pool.
-// *
-// * This setting is relevant only if this factory does not use global pools.
-// * Value must be greater than 0.
-// *
-// * @param scheduledThreadPoolMaxSize maximum size of the scheduled thread pool.
-// */
-// void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
-//
-// /**
-// * Returns the maximum size of the thread pool.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_THREAD_POOL_MAX_SIZE}.
-// *
-// * @return the maximum size of the thread pool.
-// */
-// int getThreadPoolMaxSize();
-//
-// /**
-// * Sets the maximum size of the thread pool.
-// *
-// * This setting is relevant only if this factory does not use global pools.
-// * Value must be -1 (for unlimited thread pool) or greater than 0.
-// *
-// * @param threadPoolMaxSize maximum size of the thread pool.
-// */
-// void setThreadPoolMaxSize(int threadPoolMaxSize);
-//
-// /**
-// * Returns the time to retry connections created by this factory after failure.
-// *
-// * Value is in milliseconds, default is {@link
HornetQClient#DEFAULT_RETRY_INTERVAL}.
-// *
-// * @return the time to retry connections created by this factory after failure
-// */
-// long getRetryInterval();
-//
-// /**
-// * Sets the time to retry connections created by this factory after failure.
-// *
-// * Value must be greater than 0.
-// *
-// * @param retryInterval time (in milliseconds) to retry connections created by this
factory after failure
-// */
-// void setRetryInterval(long retryInterval);
-//
-// /**
-// * Returns the multiplier to apply to successive retry intervals.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_RETRY_INTERVAL_MULTIPLIER}.
-// *
-// * @return the multiplier to apply to successive retry intervals
-// */
-// double getRetryIntervalMultiplier();
-//
-// /**
-// * Sets the multiplier to apply to successive retry intervals.
-// *
-// * Value must be positive.
-// *
-// * @param retryIntervalMultiplier multiplier to apply to successive retry intervals
-// */
-// void setRetryIntervalMultiplier(double retryIntervalMultiplier);
-//
-// /**
-// * Returns the maximum retry interval (in the case a retry interval multiplier has
been specified).
-// *
-// * Value is in milliseconds, default value is {@link
HornetQClient#DEFAULT_MAX_RETRY_INTERVAL}.
-// *
-// * @return the maximum retry interval
-// */
-// long getMaxRetryInterval();
-//
-// /**
-// * Sets the maximum retry interval.
-// *
-// * Value must be greater than 0.
-// *
-// * @param maxRetryInterval maximum retry interval to apply in the case a retry
interval multiplier has been specified
-// */
-// void setMaxRetryInterval(long maxRetryInterval);
-//
-// /**
-// * Returns the maximum number of attempts to retry connection in case of failure.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_RECONNECT_ATTEMPTS}.
-// *
-// * @return the maximum number of attempts to retry connection in case of failure.
-// */
-// int getReconnectAttempts();
-//
-// /**
-// * Sets the maximum number of attempts to retry connection in case of failure.
-// *
-// * Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater
than 0.
-// *
-// * @param reconnectAttempts maximum number of attempts to retry connection in case
of failure
-// */
-// void setReconnectAttempts(int reconnectAttempts);
-//
-// /**
-// * Returns true if the client will automatically attempt to connect to the backup
server if the initial
-// * connection to the live server fails
-// *
-// * Default value is {@link HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION}.
-// *
-// * @return
-// */
-// boolean isFailoverOnInitialConnection();
-//
-// /**
-// * Sets the value for FailoverOnInitialReconnection
-// *
-// * @param failover
-// */
-// void setFailoverOnInitialConnection(boolean failover);
-//
-// /**
-// * Returns whether connections created by this factory must failover in case the
server they are
-// * connected to <em>has normally shut down</em>.
-// *
-// * Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN}.
-// *
-// * @return <code>true</code> if connections must failover if the server
has normally shut down, else <code>false</code>
-// */
-// boolean isFailoverOnServerShutdown();
-//
-// /**
-// * Sets whether connections created by this factory must failover in case the server
they are
-// * connected to <em>has normally shut down</em>
-// *
-// * @param failoverOnServerShutdown <code>true</code> if connections must
failover if the server has normally shut down, <code>false</code> else
-// */
-// void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
-//
-// /**
-// * Returns the class name of the connection load balancing policy.
-// *
-// * Default value is
"org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy".
-// *
-// * @return the class name of the connection load balancing policy
-// */
-// String getConnectionLoadBalancingPolicyClassName();
-//
-// /**
-// * Sets the class name of the connection load balancing policy.
-// *
-// * Value must be the name of a class implementing {@link
ConnectionLoadBalancingPolicy}.
-// *
-// * @param loadBalancingPolicyClassName class name of the connection load balancing
policy
-// */
-// void setConnectionLoadBalancingPolicyClassName(String
loadBalancingPolicyClassName);
-//
-// /**
-// * Returns the initial size of messages created through this factory.
-// *
-// * Value is in bytes, default value is {@link
HornetQClient#DEFAULT_INITIAL_MESSAGE_PACKET_SIZE}.
-// *
-// * @return the initial size of messages created through this factory
-// */
-// int getInitialMessagePacketSize();
-//
-// /**
-// * Sets the initial size of messages created through this factory.
-// *
-// * Value must be greater than 0.
-// *
-// * @param size initial size of messages created through this factory.
-// */
-// void setInitialMessagePacketSize(int size);
-//
-// /**
-// * Adds an interceptor which will be executed <em>after packets are received
from the server</em>.
-// *
-// * @param interceptor an Interceptor
-// */
-// void addInterceptor(Interceptor interceptor);
-//
-// /**
-// * Removes an interceptor.
-// *
-// * @param interceptor interceptor to remove
-// *
-// * @return <code>true</code> if the interceptor is removed from this
factory, <code>false</code> else
-// */
-// boolean removeInterceptor(Interceptor interceptor);
-//
-// /**
-// * Closes this factory and release all its resources
-// */
-// void close();
-
+ ServerLocator getServerLocator();
-// List<Pair<TransportConfiguration, TransportConfiguration>>
getClusterTopology();
-//
-// void registerTopologyListener(ClusterTopologyListener listener);
-//
-// void unregisterTopologyListener(ClusterTopologyListener listener);
-
- void close();
+ CoreRemotingConnection getConnection();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,11 +13,10 @@
package org.hornetq.api.core.client;
-import java.util.List;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+
/**
* A ClusterTopologyListener
*
@@ -27,6 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
-
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last);
+
+ void nodeDown(String nodeID);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -27,9 +27,23 @@
*/
public interface ServerLocator
{
+ /**
+ * Create a ClientSessionFactory using whatever load balancing policy is in force
+ * @return The ClientSessionFactory
+ * @throws Exception
+ */
ClientSessionFactory createSessionFactory() throws Exception;
/**
+ * Create a ClientSessionFactory to a specific server. The server must already be
known about by this ServerLocator.
+ * This method allows the user to make a connection to a specific server bypassing any
load balancing policy in force
+ * @param transportConfiguration
+ * @return The ClientSessionFactory
+ * @throws Exception if a failure happened in creating the ClientSessionFactory or the
ServerLocator does not know about the passed in transportConfiguration
+ */
+ ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception;
+
+ /**
* Returns the period used to check if a client has failed to receive pings from the
server.
*
* Period is in milliseconds, default value is {@link
HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
@@ -641,4 +655,10 @@
* Closes this factory and release all its resources
*/
void close();
+
+ void registerTopologyListener(ClusterTopologyListener listener);
+
+ void unregisterTopologyListener(ClusterTopologyListener listener);
+
+ boolean isHA();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/BridgeControl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/BridgeControl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/BridgeControl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -48,9 +48,9 @@
String getTransformerClassName();
/**
- * Returns the pair of connectors used by this bridge.
+ * Returns the list of static connectors used by this bridge, if any
*/
- String[] getConnectorPair() throws Exception;
+ String[] getStaticConnectors() throws Exception;
/**
* Returns the name of the discovery group used by this bridge.
@@ -82,4 +82,9 @@
* Returns whether this bridge is using duplicate detection.
*/
boolean isUseDuplicateDetection();
+
+ /**
+ * Returns whether this bridge is using high availability
+ */
+ boolean isHA();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -54,15 +54,14 @@
int getMaxHops();
/**
- * Returns the pairs of live-backup connectors used by this cluster connection.
+ * Returns the list of static connectors
*/
- Object[] getStaticConnectorNamePairs();
+ Object[] getStaticConnectors();
/**
- * Returns the pairs of live-backup connectors used by this cluster connection
- * using JSON serialization.
+ * Returns the list of static connectors as JSON
*/
- String getStaticConnectorNamePairsAsJSON() throws Exception;
+ String getStaticConnectorsAsJSON() throws Exception;
/**
* Returns the name of the discovery group used by this cluster connection.
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/DiscoveryGroupControl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/DiscoveryGroupControl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/management/DiscoveryGroupControl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,43 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.api.core.management;
-
-/**
- * A DiscoveryGroupControl is used to manage a discovery group.
- *
- * @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
- */
-public interface DiscoveryGroupControl extends HornetQComponentControl
-{
- /**
- * Returns the configuration name of this discovery group.
- */
- String getName();
-
- /**
- * Returns the address that this discovery group is listening to.
- */
- String getGroupAddress();
-
- /**
- * Returns the port that this discovery group is listening to.
- */
- int getGroupPort();
-
- /**
- * Returns the refresh timeout used by this discovery group.
- */
- long getRefreshTimeout();
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -12,14 +12,10 @@
*/
package org.hornetq.api.jms;
-import java.util.List;
-
import javax.jms.Queue;
import javax.jms.Topic;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -34,74 +30,61 @@
private static final Logger log = Logger.getLogger(HornetQJMSClient.class);
/**
- * Creates a HornetQConnectionFactory using all the defaults.
- *
- * @return The HornetQConnectionFactory.
+ * Create a HornetQConnectionFactory which will receive cluster topology updates from
the cluster as servers leave or join and new backups are appointed or removed.
+ * The discoveryAddress and discoveryPort parameters in this method are used to listen
for UDP broadcasts which contain connection information for members of the cluster.
+ * The broadcasted connection information is simply used to make an initial connection
to the cluster; once that connection is made, up-to-date
+ * cluster topology information is downloaded and automatically updated whenever the
cluster topology changes. If the topology includes backup servers
+ * that information is also propagated to the client so that it can know which server
to failover onto in case of live server failure.
+ * @param discoveryAddress The UDP group address to listen for updates
+ * @param discoveryPort the UDP port to listen for updates
+ * @return the HornetQConnectionFactory
*/
- public static HornetQConnectionFactory createConnectionFactory()
+ public static HornetQConnectionFactory createConnectionFactoryWithHA(final String
discoveryAddress, final int discoveryPort)
{
- return new HornetQConnectionFactory();
+ return new HornetQConnectionFactory(true, discoveryAddress, discoveryPort);
}
/**
- * Creates a HornetQConnectionFactory using the ClientSessionFactory for its
underlying connection.
- *
- * @param sessionFactory The underlying ClientSessionFactory to use.
- * @return The HornetQConnectionFactory.
+ * Create a HornetQConnectionFactory which creates session factories from a set of
live servers; no HA backup information is propagated to the client
+ *
+ * The UDP address and port are used to listen for live servers in the cluster
+ *
+ * @param discoveryAddress The UDP group address to listen for updates
+ * @param discoveryPort the UDP port to listen for updates
+ * @return the HornetQConnectionFactory
*/
- public static HornetQConnectionFactory createConnectionFactory(final
ClientSessionFactory sessionFactory)
+ public static HornetQConnectionFactory createConnectionFactoryWithoutHA(final String
discoveryAddress, final int discoveryPort)
{
- return new HornetQConnectionFactory(sessionFactory);
+ return new HornetQConnectionFactory(false, discoveryAddress, discoveryPort);
}
-
+
/**
- * Creates a HornetQConnectionFactory that will use discovery to connect to the
server.
- *
- * @param discoveryAddress The address to use for discovery.
- * @param discoveryPort The port to use for discovery.
- * @return The HornetQConnectionFactory.
+ * Create a HornetQConnectionFactory which will receive cluster topology updates from
the cluster as servers leave or join and new backups are appointed or removed.
+ * The initial list of servers supplied in this method is simply to make an initial
connection to the cluster; once that connection is made, up-to-date
+ * cluster topology information is downloaded and automatically updated whenever the
cluster topology changes. If the topology includes backup servers
+ * that information is also propagated to the client so that it can know which server
to failover onto in case of live server failure.
+ * @param initialServers The initial set of servers used to make a connection to the
cluster. Each one is tried in turn until a successful connection is made. Once
+ * a connection is made, the cluster topology is downloaded and the rest of the list
is ignored.
+ * @return the HornetQConnectionFactory
*/
- public static HornetQConnectionFactory createConnectionFactory(final String
discoveryAddress, final int discoveryPort)
+ public static HornetQConnectionFactory createConnectionFactoryWithHA(final
TransportConfiguration... initialServers)
{
- return new HornetQConnectionFactory(discoveryAddress, discoveryPort);
+ return new HornetQConnectionFactory(true, initialServers);
}
/**
- * Creates a HornetQConnectionFactory using a List of TransportConfigurations and
backups.
- *
- * @param staticConnectors The list of TransportConfiguration to use.
- * @return The HornetQConnectionFactory.
+ * Create a HornetQConnectionFactory which creates session factories using a static
list of transportConfigurations; the HornetQConnectionFactory is not updated
automatically
+ * as the cluster topology changes, and no HA backup information is propagated to the
client
+ *
+ * @param initialServers the static list of transport configurations used to create session factories
+ * @return the HornetQConnectionFactory
*/
- public static HornetQConnectionFactory createConnectionFactory(final
List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
+ public static HornetQConnectionFactory createConnectionFactoryWithoutHA(final
TransportConfiguration... initialServers)
{
- return new HornetQConnectionFactory(staticConnectors);
+ return new HornetQConnectionFactory(false, initialServers);
}
-
+
/**
- * Creates a HornetQConnectionFactory using a single pair of live-backup
TransportConfiguration.
- *
- * @param connectorConfig The TransportConfiguration of the server to connect to.
- * @param backupConnectorConfig The TransportConfiguration of the backup server to
connect to. can be null.
- * @return The HornetQConnectionFactory.
- */
- public static HornetQConnectionFactory createConnectionFactory(final
TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
- {
- return new HornetQConnectionFactory(connectorConfig, backupConnectorConfig);
- }
-
- /**
- * Creates a HornetQConnectionFactory to connect to a single live server.
- *
- * @param connectorConfig The TransportConfiguration of the server.
- * @return The HornetQConnectionFactory.
- */
- public static HornetQConnectionFactory createConnectionFactory(final
TransportConfiguration connectorConfig)
- {
- return new HornetQConnectionFactory(connectorConfig);
- }
-
- /**
* Creates a client-side representation of a JMS Topic.
*
* @param name the name of the topic
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,9 +13,6 @@
package org.hornetq.api.jms.management;
-import java.util.List;
-
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.management.Operation;
@@ -373,13 +370,8 @@
/**
* @see ClientSessionFactory#getStaticConnectors()
*/
- List<Pair<TransportConfiguration, TransportConfiguration>>
getStaticConnectors();
+ TransportConfiguration[] getStaticConnectors();
- /**
- * @see ClientSessionFactory#setStaticConnectors(java.util.List)
- */
- void setStaticConnectors(List<Pair<TransportConfiguration,
TransportConfiguration>> staticConnectors);
-
/**
* @see ClientSessionFactory#getLocalBindAddress()
*/
@@ -396,22 +388,10 @@
String getDiscoveryAddress();
/**
- * @see ClientSessionFactory#setDiscoveryAddress(String)
- */
- void setDiscoveryAddress(String discoveryAddress);
-
-
-
- /**
* @see ClientSessionFactory#getDiscoveryPort()
*/
int getDiscoveryPort();
- /**
- * @see ClientSessionFactory#setDiscoveryPort(int)
- */
- void setDiscoveryPort(int discoveryPort);
-
/**
* Add the JNDI binding to this destination
*/
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
@@ -39,10 +40,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
+import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -396,7 +398,12 @@
closed = true;
}
-
+
+ public ServerLocator getServerLocator()
+ {
+ return serverLocator;
+ }
+
// Public
//
---------------------------------------------------------------------------------------
@@ -1012,8 +1019,10 @@
connection = new RemotingConnectionImpl(tc, callTimeout, interceptors);
connection.addFailureListener(new
DelegatingFailureListener(connection.getID()));
+
+ Channel channel0 = connection.getChannel(0, -1);
- connection.getChannel(0, -1).setHandler(new Channel0Handler(connection));
+ channel0.setHandler(new Channel0Handler(connection));
if (clientFailureCheckPeriod != -1)
{
@@ -1033,6 +1042,11 @@
pingRunnable.run();
}
}
+
+ if (serverLocator.isHA())
+ {
+ channel0.send(new SubscribeClusterTopologyUpdatesMessage(false));
+ }
}
return connection;
@@ -1119,9 +1133,16 @@
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY)
{
- ClusterTopologyMessage topMessage = (ClusterTopologyMessage)packet;
+ ClusterTopologyChangeMessage topMessage =
(ClusterTopologyChangeMessage)packet;
- serverLocator.onTopologyChanged(topMessage.getTopology());
+ if (topMessage.isExit())
+ {
+ serverLocator.nodeDown(topMessage.getNodeID());
+ }
+ else
+ {
+ serverLocator.nodeUP(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
+ }
}
}
}
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl_Old.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl_Old.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl_Old.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,1195 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
- * @version <tt>$Revision: 3602 $</tt>
- *
- */
-public class ClientSessionFactoryImpl_Old implements ClientSessionFactoryInternal,
DiscoveryListener, Serializable
-{
- // Constants
- //
------------------------------------------------------------------------------------
-
- private static final long serialVersionUID = 2512460695662741413L;
-
- private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
-
- // Attributes
- //
-----------------------------------------------------------------------------------
-
- private final Map<Pair<TransportConfiguration, TransportConfiguration>,
FailoverManager> failoverManagerMap = new
LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>,
FailoverManager>();
-
- private volatile boolean receivedBroadcast = false;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- private FailoverManager[] failoverManagerArray;
-
- private boolean readOnly;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient =
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- private List<Pair<TransportConfiguration, TransportConfiguration>>
staticConnectors;
-
- private String localBindAddress;
-
- private String discoveryAddress;
-
- private int discoveryPort;
-
- private long discoveryRefreshTimeout;
-
- private long discoveryInitialWaitTimeout;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private boolean failoverOnInitialConnection;
-
- private int initialMessagePacketSize;
-
- private volatile boolean closed;
-
- private boolean failoverOnServerShutdown;
-
- private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private String groupID;
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (ClientSessionFactoryImpl.globalThreadPool == null)
- {
- ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-global-threads", true,
getThisClassLoader());
-
- ClientSessionFactoryImpl.globalThreadPool =
Executors.newCachedThreadPool(factory);
- }
-
- return ClientSessionFactoryImpl.globalThreadPool;
- }
-
- private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (ClientSessionFactoryImpl.globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-global-scheduled-threads", true,
getThisClassLoader());
-
- ClientSessionFactoryImpl.globalScheduledThreadPool =
Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
-
factory);
- }
-
- return ClientSessionFactoryImpl.globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (useGlobalPools)
- {
- threadPool = ClientSessionFactoryImpl.getGlobalThreadPool();
-
- scheduledThreadPool = ClientSessionFactoryImpl.getGlobalScheduledThreadPool();
- }
- else
- {
- ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-factory-threads-" +
System.identityHashCode(this),
- true, getThisClassLoader());
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new
HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" +
System.identityHashCode(this),
- true, getThisClassLoader());
-
- scheduledThreadPool =
Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private synchronized void initialise() throws Exception
- {
- if (!readOnly)
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryAddress != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
-
- InetAddress lbAddress;
-
- if (localBindAddress != null)
- {
- lbAddress = InetAddress.getByName(localBindAddress);
- }
- else
- {
- lbAddress = null;
- }
-
- discoveryGroup = new
DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
- discoveryAddress,
- lbAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
- else if (staticConnectors != null)
- {
- for (Pair<TransportConfiguration, TransportConfiguration> pair :
staticConnectors)
- {
- FailoverManager cm = new FailoverManagerImpl(this,
- pair.a,
- pair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- failoverOnInitialConnection,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- failoverManagerMap.put(pair, cm);
- }
-
- updatefailoverManagerArray();
- }
- else
- {
- throw new IllegalStateException("Before using a session factory you must
either set discovery address and port or " + "provide some static transport
configuration");
- }
- readOnly = true;
- }
- }
-
- // Static
- //
---------------------------------------------------------------------------------------
-
- // Constructors
- // ---------------------------------------------------------------------------------
-
- public ClientSessionFactoryImpl_Old(final ClientSessionFactory other)
- {
- localBindAddress = other.getLocalBindAddress();
-
- discoveryAddress = other.getDiscoveryAddress();
-
- discoveryPort = other.getDiscoveryPort();
-
- staticConnectors = other.getStaticConnectors();
-
- discoveryRefreshTimeout = other.getDiscoveryRefreshTimeout();
-
- clientFailureCheckPeriod = other.getClientFailureCheckPeriod();
-
- connectionTTL = other.getConnectionTTL();
-
- callTimeout = other.getCallTimeout();
-
- minLargeMessageSize = other.getMinLargeMessageSize();
-
- consumerWindowSize = other.getConsumerWindowSize();
-
- consumerMaxRate = other.getConsumerMaxRate();
-
- confirmationWindowSize = other.getConfirmationWindowSize();
-
- producerWindowSize = other.getProducerWindowSize();
-
- producerMaxRate = other.getProducerMaxRate();
-
- blockOnAcknowledge = other.isBlockOnAcknowledge();
-
- blockOnDurableSend = other.isBlockOnDurableSend();
-
- blockOnNonDurableSend = other.isBlockOnNonDurableSend();
-
- autoGroup = other.isAutoGroup();
-
- preAcknowledge = other.isPreAcknowledge();
-
- ackBatchSize = other.getAckBatchSize();
-
- connectionLoadBalancingPolicyClassName =
other.getConnectionLoadBalancingPolicyClassName();
-
- discoveryInitialWaitTimeout = other.getDiscoveryInitialWaitTimeout();
-
- useGlobalPools = other.isUseGlobalPools();
-
- scheduledThreadPoolMaxSize = other.getScheduledThreadPoolMaxSize();
-
- threadPoolMaxSize = other.getThreadPoolMaxSize();
-
- retryInterval = other.getRetryInterval();
-
- retryIntervalMultiplier = other.getRetryIntervalMultiplier();
-
- maxRetryInterval = other.getMaxRetryInterval();
-
- reconnectAttempts = other.getReconnectAttempts();
-
- failoverOnInitialConnection = other.isFailoverOnInitialConnection();
-
- failoverOnServerShutdown = other.isFailoverOnServerShutdown();
-
- cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
-
- initialMessagePacketSize = other.getInitialMessagePacketSize();
-
- groupID = other.getGroupID();
- }
-
- public ClientSessionFactoryImpl_Old()
- {
- discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName =
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- discoveryInitialWaitTimeout =
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- failoverOnInitialConnection =
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
- failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
- }
-
- public ClientSessionFactoryImpl_Old(final String discoveryAddress, final int
discoveryPort)
- {
- this();
-
- this.discoveryAddress = discoveryAddress;
-
- this.discoveryPort = discoveryPort;
- }
-
- public ClientSessionFactoryImpl_Old(final String localBindAddress, final String
discoveryAddress, final int discoveryPort)
- {
- this();
-
- this.localBindAddress = localBindAddress;
-
- this.discoveryAddress = discoveryAddress;
-
- this.discoveryPort = discoveryPort;
- }
-
- public ClientSessionFactoryImpl_Old(final List<Pair<TransportConfiguration,
TransportConfiguration>> staticConnectors)
- {
- this();
-
- this.staticConnectors = staticConnectors;
- }
-
- public ClientSessionFactoryImpl_Old(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
- {
- this();
-
- staticConnectors = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
-
- staticConnectors.add(new Pair<TransportConfiguration,
TransportConfiguration>(connectorConfig,
-
backupConnectorConfig));
- }
-
- public ClientSessionFactoryImpl_Old(final TransportConfiguration connectorConfig)
- {
- this(connectorConfig, null);
- }
-
- // ClientSessionFactory
implementation------------------------------------------------------------
-
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public synchronized List<Pair<TransportConfiguration,
TransportConfiguration>> getStaticConnectors()
- {
- return staticConnectors;
- }
-
- public synchronized void setStaticConnectors(final
List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
- {
- checkWrite();
-
- this.staticConnectors = staticConnectors;
- }
-
- public synchronized long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public synchronized void setClientFailureCheckPeriod(final long
clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public synchronized long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public synchronized void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public synchronized long getCallTimeout()
- {
- return callTimeout;
- }
-
- public synchronized void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public synchronized int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public synchronized int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public synchronized int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public synchronized int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public synchronized int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public synchronized void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public synchronized int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public synchronized void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public synchronized boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public synchronized boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public synchronized boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public synchronized void setBlockOnNonDurableSend(final boolean
blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public synchronized boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public synchronized void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public synchronized boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public synchronized int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public synchronized void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public synchronized long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- public synchronized void setDiscoveryInitialWaitTimeout(final long
initialWaitTimeout)
- {
- checkWrite();
- discoveryInitialWaitTimeout = initialWaitTimeout;
- }
-
- public synchronized boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public synchronized int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public synchronized void setScheduledThreadPoolMaxSize(final int
scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public synchronized int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public synchronized long getRetryInterval()
- {
- return retryInterval;
- }
-
- public synchronized void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public synchronized long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public synchronized void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public synchronized double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public synchronized void setRetryIntervalMultiplier(final double
retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public synchronized int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public synchronized boolean isFailoverOnInitialConnection()
- {
- return this.failoverOnInitialConnection;
- }
-
- public synchronized void setFailoverOnInitialConnection(final boolean failover)
- {
- checkWrite();
- this.failoverOnInitialConnection = failover;
- }
-
- public synchronized boolean isFailoverOnServerShutdown()
- {
- return failoverOnServerShutdown;
- }
-
- public synchronized void setFailoverOnServerShutdown(final boolean
failoverOnServerShutdown)
- {
- checkWrite();
- this.failoverOnServerShutdown = failoverOnServerShutdown;
- }
-
- public synchronized String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String
loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public synchronized String getLocalBindAddress()
- {
- return localBindAddress;
- }
-
- public synchronized void setLocalBindAddress(final String localBindAddress)
- {
- checkWrite();
- this.localBindAddress = localBindAddress;
- }
-
- public synchronized String getDiscoveryAddress()
- {
- return discoveryAddress;
- }
-
- public synchronized void setDiscoveryAddress(final String discoveryAddress)
- {
- checkWrite();
- this.discoveryAddress = discoveryAddress;
- }
-
- public synchronized int getDiscoveryPort()
- {
- return discoveryPort;
- }
-
- public synchronized void setDiscoveryPort(final int discoveryPort)
- {
- checkWrite();
- this.discoveryPort = discoveryPort;
- }
-
- public synchronized long getDiscoveryRefreshTimeout()
- {
- return discoveryRefreshTimeout;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public synchronized void setDiscoveryRefreshTimeout(final long
discoveryRefreshTimeout)
- {
- checkWrite();
- this.discoveryRefreshTimeout = discoveryRefreshTimeout;
- }
-
- public synchronized int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public synchronized void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize) throws HornetQException
- {
- return createSessionInternal(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize);
- }
-
- public ClientSession createSession(final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final int ackBatchSize) throws HornetQException
- {
- return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks,
preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createXASession() throws HornetQException
- {
- return createSessionInternal(null, null, true, false, false, preAcknowledge,
ackBatchSize);
- }
-
- public ClientSession createTransactedSession() throws HornetQException
- {
- return createSessionInternal(null, null, false, false, false, preAcknowledge,
ackBatchSize);
- }
-
- public ClientSession createSession() throws HornetQException
- {
- return createSessionInternal(null, null, false, true, true, preAcknowledge,
ackBatchSize);
- }
-
- public ClientSession createSession(final boolean autoCommitSends, final boolean
autoCommitAcks) throws HornetQException
- {
- return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks,
preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession(final boolean xa, final boolean autoCommitSends,
final boolean autoCommitAcks) throws HornetQException
- {
- return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks,
preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession(final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge) throws
HornetQException
- {
- return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks,
preAcknowledge, ackBatchSize);
- }
-
- public int numSessions()
- {
- int num = 0;
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- num += failoverManager.numSessions();
- }
-
- return num;
- }
-
- public int numConnections()
- {
- int num = 0;
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- num += failoverManager.numConnections();
- }
-
- return num;
- }
-
- public void close()
- {
- if (closed)
- {
- return;
- }
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- ClientSessionFactoryImpl.log.error("Failed to stop discovery
group", e);
- }
- }
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- failoverManager.causeExit();
- }
-
- failoverManagerMap.clear();
-
- if (!useGlobalPools)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- ClientSessionFactoryImpl.log.warn("Timed out waiting for pool to
terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- ClientSessionFactoryImpl.log.warn("Timed out waiting for scheduled
pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
-
- closed = true;
- }
-
- public ClientSessionFactory copy()
- {
- return new ClientSessionFactoryImpl(this);
- }
-
- public void setGroupID(final String groupID)
- {
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- // DiscoveryListener implementation
--------------------------------------------------------
-
- public synchronized void connectorsChanged()
- {
- receivedBroadcast = true;
-
- Map<String, DiscoveryEntry> newConnectors =
discoveryGroup.getDiscoveryEntryMap();
-
- Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet
= new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
-
- for (DiscoveryEntry entry : newConnectors.values())
- {
- connectorSet.add(entry.getConnectorPair());
- }
-
- Iterator<Map.Entry<Pair<TransportConfiguration,
TransportConfiguration>, FailoverManager>> iter = failoverManagerMap.entrySet()
-
.iterator();
- while (iter.hasNext())
- {
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>,
FailoverManager> entry = iter.next();
-
- if (!connectorSet.contains(entry.getKey()))
- {
- // failoverManager no longer there - we should remove it
-
- iter.remove();
- }
- }
-
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair :
connectorSet)
- {
- if (!failoverManagerMap.containsKey(connectorPair))
- {
- // Create a new failoverManager
-
- FailoverManager failoverManager = new FailoverManagerImpl(this,
- connectorPair.a,
- connectorPair.b,
-
failoverOnServerShutdown,
- callTimeout,
-
clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
-
retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
-
failoverOnInitialConnection,
- threadPool,
-
scheduledThreadPool,
- interceptors);
-
- failoverManagerMap.put(connectorPair, failoverManager);
- }
- }
-
- updatefailoverManagerArray();
- }
-
- public FailoverManager[] getFailoverManagers()
- {
- return failoverManagerArray;
- }
-
- // Protected
------------------------------------------------------------------------------
-
- @Override
- protected void finalize() throws Throwable
- {
- close();
-
- super.finalize();
- }
-
- // Private
--------------------------------------------------------------------------------
-
- private void checkWrite()
- {
- if (readOnly)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory
after it has been used");
- }
- }
-
- private ClientSession createSessionInternal(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize) throws
HornetQException
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session, factory is closed
(maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to
initialise session factory", e);
- }
-
- if (discoveryGroup != null && !receivedBroadcast)
- {
- boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial
broadcast from discovery group");
- }
- }
-
- synchronized (this)
- {
- int pos = loadBalancingPolicy.select(failoverManagerArray.length);
-
- FailoverManager failoverManager = failoverManagerArray[pos];
-
- ClientSession session = failoverManager.createSession(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize,
- cacheLargeMessagesClient,
- minLargeMessageSize,
- blockOnAcknowledge,
- autoGroup,
- confirmationWindowSize,
- producerWindowSize,
- consumerWindowSize,
- producerMaxRate,
- consumerMaxRate,
- blockOnNonDurableSend,
- blockOnDurableSend,
- initialMessagePacketSize,
- groupID);
-
- return session;
- }
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy
class name on the session factory");
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz =
loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- return null;
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load
balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
- });
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return
ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
- }
-
- private synchronized void updatefailoverManagerArray()
- {
- failoverManagerArray = new FailoverManager[failoverManagerMap.size()];
-
- failoverManagerMap.values().toArray(failoverManagerArray);
- }
-
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -23,6 +23,7 @@
*/
public interface ClientSessionFactoryInternal extends ClientSessionFactory
{
+ void causeExit();
void addFailureListener(SessionFailureListener listener);
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/FailoverManagerImpl_Old.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/FailoverManagerImpl_Old.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/FailoverManagerImpl_Old.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,1227 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.ChannelHandler;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.Ping;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.version.Version;
-import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
-import org.hornetq.spi.core.remoting.Connector;
-import org.hornetq.spi.core.remoting.ConnectorFactory;
-import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConfigurationHelper;
-import org.hornetq.utils.ExecutorFactory;
-import org.hornetq.utils.OrderedExecutorFactory;
-import org.hornetq.utils.UUIDGenerator;
-import org.hornetq.utils.VersionLoader;
-
-/**
- * A ConnectionManagerImpl
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 27 Nov 2008 18:46:06
- *
- */
-public class FailoverManagerImpl_Old implements FailoverManager,
ConnectionLifeCycleListener
-{
- // Constants
- //
------------------------------------------------------------------------------------
-
- private static final long serialVersionUID = 2512460695662741413L;
-
- private static final Logger log = Logger.getLogger(FailoverManagerImpl.class);
-
- // debug
-
- private static Map<TransportConfiguration, Set<CoreRemotingConnection>>
debugConns;
-
- private static boolean debug = false;
-
- public static void enableDebug()
- {
- FailoverManagerImpl.debug = true;
-
- FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration,
Set<CoreRemotingConnection>>();
- }
-
- public static void disableDebug()
- {
- FailoverManagerImpl.debug = false;
-
- FailoverManagerImpl.debugConns.clear();
- FailoverManagerImpl.debugConns = null;
- }
-
- private void checkAddDebug(final CoreRemotingConnection conn)
- {
- Set<CoreRemotingConnection> conns;
-
- synchronized (FailoverManagerImpl.debugConns)
- {
- conns = FailoverManagerImpl.debugConns.get(connectorConfig);
-
- if (conns == null)
- {
- conns = new HashSet<CoreRemotingConnection>();
-
- FailoverManagerImpl.debugConns.put(connectorConfig, conns);
- }
-
- conns.add(conn);
- }
- }
-
- public static void failAllConnectionsForConnector(final TransportConfiguration
config)
- {
- Set<CoreRemotingConnection> conns;
-
- synchronized (FailoverManagerImpl.debugConns)
- {
- conns = FailoverManagerImpl.debugConns.get(config);
-
- if (conns != null)
- {
- conns = new
HashSet<CoreRemotingConnection>(FailoverManagerImpl.debugConns.get(config));
- }
- }
-
- if (conns != null)
- {
- for (CoreRemotingConnection conn : conns)
- {
- conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR,
"simulated connection failure"));
- }
- }
- }
-
- // Attributes
- //
-----------------------------------------------------------------------------------
-
- private final TransportConfiguration connectorConfig;
-
- private ConnectorFactory connectorFactory;
-
- private Map<String, Object> transportParams;
-
- private ConnectorFactory backupConnectorFactory;
-
- private Map<String, Object> backupTransportParams;
-
- private final long callTimeout;
-
- private final long clientFailureCheckPeriod;
-
- private final long connectionTTL;
-
- private final Set<ClientSessionInternal> sessions = new
HashSet<ClientSessionInternal>();
-
- private final Object exitLock = new Object();
-
- private final Object createSessionLock = new Object();
-
- private boolean inCreateSession;
-
- private final Object failoverLock = new Object();
-
- private final ExecutorFactory orderedExecutorFactory;
-
- private final ExecutorService threadPool;
-
- private final ScheduledExecutorService scheduledThreadPool;
-
- private final Executor closeExecutor;
-
- private CoreRemotingConnection connection;
-
- private final long retryInterval;
-
- private final double retryIntervalMultiplier; // For exponential backoff
-
- private final long maxRetryInterval;
-
- private final int reconnectAttempts;
-
- private final boolean failoverOnServerShutdown;
-
- private final Set<SessionFailureListener> listeners = new
ConcurrentHashSet<SessionFailureListener>();
-
- private Connector connector;
-
- private Future<?> pingerFuture;
-
- private PingRunnable pingRunnable;
-
- private volatile boolean exitLoop;
-
- private final List<Interceptor> interceptors;
-
- private volatile boolean stopPingingAfterOne;
-
- private final boolean failoverOnInitialConnection;
-
- // Static
- //
---------------------------------------------------------------------------------------
-
- // Constructors
- // ---------------------------------------------------------------------------------
-
- public FailoverManagerImpl_Old(final ClientSessionFactory sessionFactory,
- final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConfig,
- final boolean failoverOnServerShutdown,
- final long callTimeout,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
- final int reconnectAttempts,
- final boolean failoverOnInitialConnection,
- final ExecutorService threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final List<Interceptor> interceptors)
- {
- this.connectorConfig = connectorConfig;
-
- this.failoverOnServerShutdown = failoverOnServerShutdown;
-
- connectorFactory =
instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
- transportParams = connectorConfig.getParams();
-
- checkTransportKeys(connectorFactory, transportParams);
-
- if (backupConfig != null)
- {
- backupConnectorFactory =
instantiateConnectorFactory(backupConfig.getFactoryClassName());
-
- backupTransportParams = backupConfig.getParams();
-
- checkTransportKeys(backupConnectorFactory, backupTransportParams);
- }
- else
- {
- backupConnectorFactory = null;
-
- backupTransportParams = null;
- }
-
- this.callTimeout = callTimeout;
-
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-
- this.connectionTTL = connectionTTL;
-
- this.retryInterval = retryInterval;
-
- this.retryIntervalMultiplier = retryIntervalMultiplier;
-
- this.maxRetryInterval = maxRetryInterval;
-
- this.reconnectAttempts = reconnectAttempts;
-
- this.failoverOnInitialConnection = failoverOnInitialConnection;
-
- this.scheduledThreadPool = scheduledThreadPool;
-
- this.threadPool = threadPool;
-
- orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
-
- closeExecutor = orderedExecutorFactory.getExecutor();
-
- this.interceptors = interceptors;
- }
-
- // ConnectionLifeCycleListener implementation
--------------------------------------------------
-
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
- {
- }
-
- public void connectionDestroyed(final Object connectionID)
- {
- handleConnectionFailure(connectionID,
- new HornetQException(HornetQException.NOT_CONNECTED,
"Channel disconnected"));
- }
-
- public void connectionException(final Object connectionID, final HornetQException me)
- {
- handleConnectionFailure(connectionID, me);
- }
-
- // ConnectionManager implementation
------------------------------------------------------------------
-
- public ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize,
- final boolean cacheLargeMessageClient,
- final int minLargeMessageSize,
- final boolean blockOnAcknowledge,
- final boolean autoGroup,
- final int confWindowSize,
- final int producerWindowSize,
- final int consumerWindowSize,
- final int producerMaxRate,
- final int consumerMaxRate,
- final boolean blockOnNonDurableSend,
- final boolean blockOnDurableSend,
- final int initialMessagePacketSize,
- final String groupID) throws HornetQException
- {
- synchronized (createSessionLock)
- {
- String name =
UUIDGenerator.getInstance().generateSimpleStringUUID().toString();
-
- boolean retry = false;
- do
- {
- Version clientVersion = VersionLoader.getVersion();
-
- CoreRemotingConnection theConnection = null;
-
- Lock lock = null;
-
- try
- {
- Channel channel1;
-
- synchronized (failoverLock)
- {
- theConnection = getConnectionWithRetry(reconnectAttempts);
-
- if (theConnection == null)
- {
- if (exitLoop)
- {
- return null;
- }
-
- if (failoverOnInitialConnection && backupConnectorFactory !=
null)
- {
- // Try and connect to the backup
-
- log.warn("Server is not available to make initial connection
to. Will " + "try backup server instead.");
-
- connectorFactory = backupConnectorFactory;
-
- transportParams = backupTransportParams;
-
- backupConnectorFactory = null;
-
- backupTransportParams = null;
-
- theConnection = getConnectionWithRetry(reconnectAttempts);
- }
-
- if (exitLoop)
- {
- return null;
- }
-
- if (theConnection == null)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Unable to connect to server
using configuration " + connectorConfig);
- }
- }
-
- channel1 = theConnection.getChannel(1, -1);
-
- // Lock it - this must be done while the failoverLock is held
- channel1.getLock().lock();
-
- lock = channel1.getLock();
- } // We can now release the failoverLock
-
- // We now set a flag saying createSession is executing
- synchronized (exitLock)
- {
- inCreateSession = true;
- }
-
- long sessionChannelID = theConnection.generateChannelID();
-
- Packet request = new CreateSessionMessage(name,
- sessionChannelID,
-
clientVersion.getIncrementingVersion(),
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confWindowSize,
- null);
-
- Packet pResponse;
- try
- {
- pResponse = channel1.sendBlocking(request);
- }
- catch (HornetQException e)
- {
- if (e.getCode() ==
HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
- {
- theConnection.destroy();
- }
-
- if (e.getCode() == HornetQException.UNBLOCKED)
- {
- // This means the thread was blocked on create session and failover
unblocked it
- // so failover could occur
-
- retry = true;
-
- continue;
- }
- else
- {
- throw e;
- }
- }
-
- CreateSessionResponseMessage response =
(CreateSessionResponseMessage)pResponse;
-
- Channel sessionChannel = theConnection.getChannel(sessionChannelID,
confWindowSize);
-
- ClientSessionInternal session = new ClientSessionImpl(this,
- name,
- username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- blockOnAcknowledge,
- autoGroup,
- ackBatchSize,
- consumerWindowSize,
- consumerMaxRate,
- confWindowSize,
- producerWindowSize,
- producerMaxRate,
-
blockOnNonDurableSend,
- blockOnDurableSend,
-
cacheLargeMessageClient,
-
minLargeMessageSize,
-
initialMessagePacketSize,
- groupID,
- theConnection,
-
response.getServerVersion(),
- sessionChannel,
-
orderedExecutorFactory.getExecutor());
-
- sessions.add(session);
-
- ChannelHandler handler = new ClientSessionPacketHandler(session,
sessionChannel);
-
- sessionChannel.setHandler(handler);
-
- return new DelegatingSession(session);
- }
- catch (Throwable t)
- {
- if (lock != null)
- {
- lock.unlock();
-
- lock = null;
- }
-
- if (t instanceof HornetQException)
- {
- throw (HornetQException)t;
- }
- else
- {
- HornetQException me = new
HornetQException(HornetQException.INTERNAL_ERROR,
- "Failed to create
session", t);
-
- throw me;
- }
- }
- finally
- {
- if (lock != null)
- {
- lock.unlock();
- }
-
- // Execution has finished so notify any failover thread that may be
waiting for us to be done
- synchronized (exitLock)
- {
- inCreateSession = false;
-
- exitLock.notify();
- }
- }
- }
- while (retry);
- }
-
- // Should never get here
- throw new IllegalStateException("Oh my God it's full of stars!");
- }
-
- // Must be synchronized to prevent it happening concurrently with failover which can
lead to
- // inconsistencies
- public void removeSession(final ClientSessionInternal session)
- {
- // TODO - can we simplify this locking?
- synchronized (createSessionLock)
- {
- synchronized (failoverLock)
- {
- sessions.remove(session);
-
- checkCloseConnection();
- }
- }
- }
-
- public synchronized int numConnections()
- {
- return connection != null ? 1 : 0;
- }
-
- public int numSessions()
- {
- return sessions.size();
- }
-
- public void addFailureListener(final SessionFailureListener listener)
- {
- listeners.add(listener);
- }
-
- public boolean removeFailureListener(final SessionFailureListener listener)
- {
- return listeners.remove(listener);
- }
-
- public void causeExit()
- {
- exitLoop = true;
- }
-
- // Public
- //
---------------------------------------------------------------------------------------
-
- public void stopPingingAfterOne()
- {
- stopPingingAfterOne = true;
- }
-
- // Protected
- //
------------------------------------------------------------------------------------
-
- // Package Private
- // ------------------------------------------------------------------------------
-
- // Private
- //
--------------------------------------------------------------------------------------
-
- private void handleConnectionFailure(final Object connectionID, final HornetQException
me)
- {
- failoverOrReconnect(connectionID, me);
- }
-
- private void failoverOrReconnect(final Object connectionID, final HornetQException
me)
- {
- Set<ClientSessionInternal> sessionsToClose = null;
-
- synchronized (failoverLock)
- {
- if (connection == null || connection.getID() != connectionID)
- {
- // We already failed over/reconnected - probably the first failure came in,
all the connections were failed
- // over then a async connection exception or disconnect
- // came in for one of the already exitLoop connections, so we return true -
we don't want to call the
- // listeners again
-
- return;
- }
-
- // We call before reconnection occurs to give the user a chance to do cleanup,
like cancel messages
- callFailureListeners(me, false);
-
- // Now get locks on all channel 1s, whilst holding the failoverLock - this makes
sure
- // There are either no threads executing in createSession, or one is blocking on
a createSession
- // result.
-
- // Then interrupt the channel 1 that is blocking (could just interrupt them
all)
-
- // Then release all channel 1 locks - this allows the createSession to exit the
monitor
-
- // Then get all channel 1 locks again - this ensures the any createSession
thread has executed the section and
- // returned all its connections to the connection manager (the code to return
connections to connection manager
- // must be inside the lock
-
- // Then perform failover
-
- // Then release failoverLock
-
- // The other side of the bargain - during createSession:
- // The calling thread must get the failoverLock and get its' connections
when this is locked.
- // While this is still locked it must then get the channel1 lock
- // It can then release the failoverLock
- // It should catch HornetQException.INTERRUPTED in the call to
channel.sendBlocking
- // It should then return its connections, with channel 1 lock still held
- // It can then release the channel 1 lock, and retry (which will cause locking
on failoverLock
- // until failover is complete
-
- boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
-
- // We will try to failover if there is a backup connector factory, but we
don't do this if the server
- // has been shutdown cleanly unless failoverOnServerShutdown is true
- boolean attemptFailover = backupConnectorFactory != null &&
(failoverOnServerShutdown || !serverShutdown);
-
- boolean attemptReconnect;
-
- if (attemptFailover)
- {
- attemptReconnect = false;
- }
- else
- {
- attemptReconnect = reconnectAttempts != 0;
- }
-
- if (attemptFailover || attemptReconnect)
- {
- lockChannel1();
-
- final boolean needToInterrupt;
-
- synchronized (exitLock)
- {
- needToInterrupt = inCreateSession;
- }
-
- unlockChannel1();
-
- if (needToInterrupt)
- {
- // Forcing return all channels won't guarantee that any blocked thread
will return immediately
- // So we need to wait for it
- forceReturnChannel1();
-
- // Now we need to make sure that the thread has actually exited and
returned it's connections
- // before failover occurs
-
- synchronized (exitLock)
- {
- while (inCreateSession)
- {
- try
- {
- exitLock.wait(5000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- }
-
- // Now we absolutely know that no threads are executing in or blocked in
createSession, and no
- // more will execute it until failover is complete
-
- // So.. do failover / reconnection
-
- CoreRemotingConnection oldConnection = connection;
-
- connection = null;
-
- try
- {
- connector.close();
- }
- catch (Exception ignore)
- {
- }
-
- cancelScheduledTasks();
-
- connector = null;
-
- if (attemptFailover)
- {
- // Now try failing over to backup
-
- connectorFactory = backupConnectorFactory;
-
- transportParams = backupTransportParams;
-
- backupConnectorFactory = null;
-
- backupTransportParams = null;
-
- reconnectSessions(oldConnection, reconnectAttempts == -1 ? -1 :
reconnectAttempts + 1);
- }
- else
- {
- reconnectSessions(oldConnection, reconnectAttempts);
- }
-
- oldConnection.destroy();
- }
- else
- {
- connection.destroy();
-
- connection = null;
- }
-
- callFailureListeners(me, true);
-
- if (connection == null)
- {
- sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
- }
- }
-
- // This needs to be outside the failover lock to prevent deadlock
- if (sessionsToClose != null)
- {
- // If connection is null it means we didn't succeed in failing over or
reconnecting
- // so we close all the sessions, so they will throw exceptions when attempted to
be used
-
- for (ClientSessionInternal session : sessionsToClose)
- {
- try
- {
- session.cleanUp();
- }
- catch (Exception e)
- {
- FailoverManagerImpl.log.error("Failed to cleanup session");
- }
- }
- }
- }
-
- private void callFailureListeners(final HornetQException me, final boolean
afterReconnect)
- {
- final List<SessionFailureListener> listenersClone = new
ArrayList<SessionFailureListener>(listeners);
-
- for (final SessionFailureListener listener : listenersClone)
- {
- try
- {
- if (afterReconnect)
- {
- listener.connectionFailed(me);
- }
- else
- {
- listener.beforeReconnect(me);
- }
- }
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- FailoverManagerImpl.log.error("Failed to execute failure listener",
t);
- }
- }
- }
-
- /*
- * Re-attach sessions all pre-existing sessions to the new remoting connection
- */
- private void reconnectSessions(final CoreRemotingConnection oldConnection, final int
reconnectAttempts)
- {
- CoreRemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
-
- if (newConnection == null)
- {
- FailoverManagerImpl.log.warn("Failed to connect to server.");
-
- return;
- }
-
- List<FailureListener> oldListeners = oldConnection.getFailureListeners();
-
- List<FailureListener> newListeners = new
ArrayList<FailureListener>(newConnection.getFailureListeners());
-
- for (FailureListener listener : oldListeners)
- {
- // Add all apart from the first one which is the old DelegatingFailureListener
-
- if (listener instanceof DelegatingFailureListener == false)
- {
- newListeners.add(listener);
- }
- }
-
- newConnection.setFailureListeners(newListeners);
-
- for (ClientSessionInternal session : sessions)
- {
- session.handleFailover(newConnection);
- }
- }
-
- private CoreRemotingConnection getConnectionWithRetry(final int reconnectAttempts)
- {
- long interval = retryInterval;
-
- int count = 0;
-
- while (true)
- {
- if (exitLoop)
- {
- return null;
- }
-
- CoreRemotingConnection theConnection = getConnection();
-
- if (theConnection == null)
- {
- // Failed to get connection
-
- if (reconnectAttempts != 0)
- {
- count++;
-
- if (reconnectAttempts != -1 && count == reconnectAttempts)
- {
- FailoverManagerImpl.log.warn("Tried " + reconnectAttempts +
" times to connect. Now giving up.");
-
- return null;
- }
-
- try
- {
- Thread.sleep(interval);
- }
- catch (InterruptedException ignore)
- {
- }
-
- // Exponential back-off
- long newInterval = (long)(interval * retryIntervalMultiplier);
-
- if (newInterval > maxRetryInterval)
- {
- newInterval = maxRetryInterval;
- }
-
- interval = newInterval;
- }
- else
- {
- return null;
- }
- }
- else
- {
-
- if (FailoverManagerImpl.debug)
- {
- checkAddDebug(theConnection);
- }
-
- return theConnection;
- }
- }
- }
-
- private void cancelScheduledTasks()
- {
- if (pingerFuture != null)
- {
- pingRunnable.cancel();
-
- pingerFuture.cancel(false);
-
- pingRunnable = null;
-
- pingerFuture = null;
- }
- }
-
- private void checkCloseConnection()
- {
- if (connection != null && sessions.size() == 0)
- {
- cancelScheduledTasks();
-
- try
- {
- connection.destroy();
- }
- catch (Throwable ignore)
- {
- }
-
- connection = null;
-
- try
- {
- if (connector != null)
- {
- connector.close();
- }
- }
- catch (Throwable ignore)
- {
- }
-
- connector = null;
- }
- }
-
- public CoreRemotingConnection getConnection()
- {
- if (connection == null)
- {
- Connection tc = null;
-
- try
- {
- DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
- connector = connectorFactory.createConnector(transportParams,
- handler,
- this,
- closeExecutor,
- threadPool,
- scheduledThreadPool);
-
- if (connector != null)
- {
- connector.start();
-
- tc = connector.createConnection();
-
- if (tc == null)
- {
- try
- {
- connector.close();
- }
- catch (Throwable t)
- {
- }
-
- connector = null;
- }
- }
- }
- catch (Exception e)
- {
- // Sanity catch for badly behaved remoting plugins
-
- FailoverManagerImpl.log.warn("connector.create or
connectorFactory.createConnector should never throw an exception, implementation is badly
behaved, but we'll deal with it anyway.",
- e);
-
- if (tc != null)
- {
- try
- {
- tc.close();
- }
- catch (Throwable t)
- {
- }
- }
-
- if (connector != null)
- {
- try
- {
- connector.close();
- }
- catch (Throwable t)
- {
- }
- }
-
- tc = null;
-
- connector = null;
- }
-
- if (tc == null)
- {
- return connection;
- }
-
- connection = new RemotingConnectionImpl(tc, callTimeout, interceptors);
-
- connection.addFailureListener(new
DelegatingFailureListener(connection.getID()));
-
- connection.getChannel(0, -1).setHandler(new Channel0Handler(connection));
-
- if (clientFailureCheckPeriod != -1)
- {
- if (pingerFuture == null)
- {
- pingRunnable = new PingRunnable();
-
- pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new
ActualScheduledPinger(pingRunnable),
- 0,
-
clientFailureCheckPeriod,
-
TimeUnit.MILLISECONDS);
- }
- // send a ping every time we create a new remoting connection
- // to set up its TTL on the server side
- else
- {
- pingRunnable.run();
- }
- }
- }
-
- return connection;
- }
-
- private ConnectorFactory instantiateConnectorFactory(final String
connectorFactoryClassName)
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectorFactoryClassName);
- return (ConnectorFactory)clazz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating connector factory
\"" + connectorFactoryClassName +
- "\"", e);
- }
- }
-
- private void lockChannel1()
- {
- Channel channel1 = connection.getChannel(1, -1);
-
- channel1.getLock().lock();
- }
-
- private void unlockChannel1()
- {
- Channel channel1 = connection.getChannel(1, -1);
-
- channel1.getLock().unlock();
- }
-
- private void forceReturnChannel1()
- {
- Channel channel1 = connection.getChannel(1, -1);
-
- channel1.returnBlocking();
- }
-
- private void checkTransportKeys(final ConnectorFactory factory, final Map<String,
Object> params)
- {
- if (params != null)
- {
- Set<String> invalid =
ConfigurationHelper.checkKeys(factory.getAllowableProperties(), params.keySet());
-
- if (!invalid.isEmpty())
- {
- String msg = ConfigurationHelper.stringSetToCommaListString("The
following keys are invalid for configuring a connector: ",
- invalid);
-
- throw new IllegalStateException(msg);
-
- }
- }
- }
-
- private class Channel0Handler implements ChannelHandler
- {
- private final CoreRemotingConnection conn;
-
- private Channel0Handler(final CoreRemotingConnection conn)
- {
- this.conn = conn;
- }
-
- public void handlePacket(final Packet packet)
- {
- final byte type = packet.getType();
-
- if (type == PacketImpl.DISCONNECT)
- {
- closeExecutor.execute(new Runnable()
- {
- // Must be executed on new thread since cannot block the netty thread for
a long time and fail can
- // cause reconnect loop
- public void run()
- {
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was disconnected
because of server shutdown"));
- }
- });
- }
- }
- }
-
- private class DelegatingBufferHandler implements BufferHandler
- {
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
- {
- CoreRemotingConnection theConn = connection;
-
- if (theConn != null && connectionID == theConn.getID())
- {
- theConn.bufferReceived(connectionID, buffer);
- }
- }
- }
-
- private class DelegatingFailureListener implements FailureListener
- {
- private final Object connectionID;
-
- DelegatingFailureListener(final Object connectionID)
- {
- this.connectionID = connectionID;
- }
-
- public void connectionFailed(final HornetQException me)
- {
- handleConnectionFailure(connectionID, me);
- }
- }
-
- private static final class ActualScheduledPinger implements Runnable
- {
- private final WeakReference<PingRunnable> pingRunnable;
-
- ActualScheduledPinger(final PingRunnable runnable)
- {
- pingRunnable = new WeakReference<PingRunnable>(runnable);
- }
-
- public void run()
- {
- PingRunnable runnable = pingRunnable.get();
-
- if (runnable != null)
- {
- runnable.run();
- }
- }
-
- }
-
- private final class PingRunnable implements Runnable
- {
- private boolean cancelled;
-
- private boolean first;
-
- private long lastCheck = System.currentTimeMillis();
-
- public synchronized void run()
- {
- if (cancelled || stopPingingAfterOne && !first)
- {
- return;
- }
-
- first = false;
-
- long now = System.currentTimeMillis();
-
- if (clientFailureCheckPeriod != -1 && now >= lastCheck +
clientFailureCheckPeriod)
- {
- if (!connection.checkDataReceived())
- {
- final HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive
data from server for " + connection.getTransportConnection());
-
- cancelled = true;
-
- threadPool.execute(new Runnable()
- {
- // Must be executed on different thread
- public void run()
- {
- connection.fail(me);
- }
- });
-
- return;
- }
- else
- {
- lastCheck = now;
- }
- }
-
- // Send a ping
-
- Ping ping = new Ping(connectionTTL);
-
- Channel channel0 = connection.getChannel(0, -1);
-
- channel0.send(ping);
-
- connection.flush();
- }
-
- public synchronized void cancel()
- {
- cancelled = true;
- }
- }
-
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -35,6 +35,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
import org.hornetq.core.cluster.DiscoveryEntry;
@@ -58,18 +59,22 @@
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
- private final boolean useHA;
+ private final boolean ha;
private final String discoveryAddress;
private final int discoveryPort;
- private final TransportConfiguration[] transportConfigs;
+ private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
- private Pair<TransportConfiguration, TransportConfiguration>[] topology;
+ private TransportConfiguration[] initialConnectors;
+ private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology;
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
private Map<TransportConfiguration, TransportConfiguration> pairs = new
HashMap<TransportConfiguration, TransportConfiguration>();
private boolean receivedTopology;
@@ -289,10 +294,7 @@
discoveryGroup.start();
}
- else
- {
- setTopologyFromStaticList();
- }
+
readOnly = true;
}
}
@@ -302,13 +304,13 @@
final int discoveryPort,
final TransportConfiguration[] transportConfigs)
{
- this.useHA = useHA;
+ this.ha = useHA;
this.discoveryAddress = discoveryAddress;
this.discoveryPort = discoveryPort;
- this.transportConfigs = transportConfigs;
+ this.initialConnectors = transportConfigs;
discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
@@ -390,6 +392,62 @@
this(useHA, null, -1, transportConfigs);
}
+ /**
+ * Chooses the connector for the next connection attempt using the load balancing policy.
+ * While no topology has been received the choice is made among the initial connectors;
+ * afterwards the live connector (pair.a) of the selected topology entry is returned.
+ */
+ private TransportConfiguration selectConnector()
+ {
+ if (!receivedTopology)
+ {
+ // Get from initialconnectors
+ final int index = loadBalancingPolicy.select(initialConnectors.length);
+
+ return initialConnectors[index];
+ }
+
+ final int index = loadBalancingPolicy.select(topologyArray.length);
+
+ return topologyArray[index].a;
+ }
+
+ /**
+ * Creates a session factory that uses the given connector instead of one picked by the
+ * load balancing policy, and registers it with this locator.
+ *
+ * @param transportConfiguration the connector the new factory is created with
+ * @return the newly created session factory
+ * @throws IllegalStateException if this locator has already been closed
+ * @throws HornetQException with code INTERNAL_ERROR if the locator cannot be initialised
+ */
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+
+ // factories is a plain HashSet that factoryClosed() modifies while holding this locator's
+ // monitor, so the add must take the same monitor
+ synchronized (this)
+ {
+ factories.add(factory);
+ }
+
+ return factory;
+ }
+
public ClientSessionFactory createSessionFactory() throws Exception
{
if (closed)
@@ -406,7 +464,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to
initialise session factory", e);
}
- if (topology == null && discoveryGroup != null)
+ if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
@@ -420,7 +478,7 @@
}
ClientSessionFactory factory = null;
-
+
synchronized (this)
{
boolean retry;
@@ -428,17 +486,15 @@
do
{
retry = false;
-
- int pos = loadBalancingPolicy.select(topology.length);
-
- Pair<TransportConfiguration, TransportConfiguration> pair =
topology[pos];
-
+
+ TransportConfiguration tc = selectConnector();
+
// try each factory in the list until we find one which works
-
+
try
{
factory = new ClientSessionFactoryImpl(this,
- pair.a,
+ tc,
failoverOnServerShutdown,
callTimeout,
clientFailureCheckPeriod,
@@ -457,12 +513,13 @@
if (e.getCode() == HornetQException.NOT_CONNECTED)
{
attempts++;
-
- if (attempts == topology.length)
+
+ if (attempts == topologyArray.length)
{
- throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried
with all available servers.");
}
-
+
retry = true;
}
else
@@ -473,7 +530,7 @@
}
while (retry);
- if (useHA)
+ if (ha)
{
long toWait = 30000;
long start = System.currentTimeMillis();
@@ -483,7 +540,7 @@
try
{
- this.wait(toWait);
+ wait(toWait);
}
catch (InterruptedException ignore)
{
@@ -509,6 +566,11 @@
}
}
+ public synchronized boolean isHA()
+ {
+ return ha;
+ }
+
public synchronized boolean isCacheLargeMessagesClient()
{
return cacheLargeMessagesClient;
@@ -828,7 +890,7 @@
public TransportConfiguration[] getStaticTransportConfigurations()
{
- return transportConfigs;
+ return this.initialConnectors;
}
public synchronized long getDiscoveryRefreshTimeout()
@@ -954,85 +1016,120 @@
closed = true;
}
- public synchronized void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> topology)
+ /**
+ * Removes a node from the known cluster topology, refreshes the derived array and
+ * live-to-backup pairs, and forwards the event to the registered topology listeners.
+ * Does nothing when this locator is not HA.
+ *
+ * @param nodeID identifier of the node that went down
+ */
+ public synchronized void nodeDown(final String nodeID)
{
- if (!useHA)
+ if (!ha)
{
return;
}
- this.topology = topology.toArray(this.topology);
+ // The topology field has no initialiser and factoryClosed() resets it to null,
+ // so it may legitimately be absent here
+ if (topology != null)
+ {
+ topology.remove(nodeID);
+ }
- this.pairs.clear();
+ // Forget all live-to-backup mappings first so the departed node's backup cannot linger
+ pairs.clear();
+
+ if (topology != null && !topology.isEmpty())
+ {
+ updateArraysAndPairs();
+ }
+ else
+ {
- for (Pair<TransportConfiguration, TransportConfiguration> pair : topology)
+ topologyArray = null;
+
+ receivedTopology = false;
+ }
+
+ for (ClusterTopologyListener listener : topologyListeners)
{
- if (pair.b != null)
- {
- pairs.put(pair.a, pair.b);
- }
+ listener.nodeDown(nodeID);
}
-
- receivedTopology = true;
}
- private void createTopologyArray(final int size)
+ /**
+ * Records a node in the cluster topology, refreshes the derived array and live-to-backup
+ * pairs, forwards the event to the registered topology listeners and wakes threads waiting
+ * on this locator. Does nothing when this locator is not HA.
+ *
+ * @param nodeID identifier of the node that came up
+ * @param connectorPair the node's connectors; a is used as the live connector, b (may be null) as its backup
+ * @param last when true the topology is marked as received
+ */
+ public synchronized void nodeUP(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
- topology = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class, size);
+ if (!ha)
+ {
+ return;
+ }
+
+ // The topology field has no initialiser and factoryClosed() resets it to null,
+ // so create the map on demand instead of failing with a NullPointerException
+ if (topology == null)
+ {
+ topology = new HashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
+ }
+
+ topology.put(nodeID, connectorPair);
+
+ updateArraysAndPairs();
+
+ if (last)
+ {
+ receivedTopology = true;
+ }
+
+ for (ClusterTopologyListener listener : topologyListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last);
+ }
+
+ // Notify if waiting on getting topology - several threads may be waiting on this
+ // monitor at once, so wake all of them rather than a single arbitrary one
+
+ notifyAll();
}
- public synchronized void connectorsChanged()
+ /**
+ * Rebuilds topologyArray and the live-to-backup pairs map from the current contents of
+ * the topology map. Callers must ensure the topology map is not null.
+ */
+ private void updateArraysAndPairs()
{
- if (receivedTopology)
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, topology.size());
+
+ // Rebuild from scratch: without this, mappings of nodes that were removed or whose
+ // backup changed would still be returned by getBackup()
+ pairs.clear();
+
+ int count = 0;
+ for (Pair<TransportConfiguration, TransportConfiguration> pair : topology.values())
{
- return;
+ if (pair.b != null)
+ {
+ pairs.put(pair.a, pair.b);
+ }
+
+ topologyArray[count++] = pair;
}
+ }
- Map<String, DiscoveryEntry> newConnectors =
discoveryGroup.getDiscoveryEntryMap();
+ /**
+ * Replaces the initial connectors with the connectors currently known to the discovery group.
+ */
+ public synchronized void connectorsChanged()
+ {
+ List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- createTopologyArray(newConnectors.size());
+ // Must really be a TransportConfiguration[]: an array created reflectively from
+ // Pair.class is a Pair[] and casting it fails with ClassCastException
+ this.initialConnectors = new TransportConfiguration[newConnectors.size()];
- int i = 0;
- for (DiscoveryEntry entry : newConnectors.values())
+ int count = 0;
+ for (DiscoveryEntry entry : newConnectors)
{
- topology[i++] = new Pair<TransportConfiguration,
TransportConfiguration>(entry.getConnector(), null);
+ this.initialConnectors[count++] = entry.getConnector();
}
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
{
- this.factories.remove(factory);
+ factories.remove(factory);
- if (this.factories.isEmpty())
+ if (factories.isEmpty())
{
// Go back to using the broadcast or static list
receivedTopology = false;
- if (transportConfigs != null)
- {
- setTopologyFromStaticList();
- }
- else
- {
- topology = null;
- }
+ topology = null;
+
}
}
- private void setTopologyFromStaticList()
+ /**
+ * Adds a listener that is called back from nodeUP and nodeDown.
+ * Synchronized because those methods iterate the same HashSet while holding this monitor.
+ *
+ * @param listener the listener to add
+ */
+ public synchronized void registerTopologyListener(final ClusterTopologyListener listener)
{
- createTopologyArray(transportConfigs.length);
+ topologyListeners.add(listener);
+ }
- int i = 0;
- for (TransportConfiguration config : transportConfigs)
- {
- topology[i++] = new Pair<TransportConfiguration,
TransportConfiguration>(config, null);
- }
+ /**
+ * Removes a previously registered topology listener.
+ * Synchronized because nodeUP and nodeDown iterate the same HashSet while holding this monitor.
+ *
+ * @param listener the listener to remove
+ */
+ public synchronized void unregisterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topologyListeners.remove(listener);
}
public synchronized TransportConfiguration getBackup(final TransportConfiguration
live)
{
return pairs.get(live);
}
+
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,11 +13,9 @@
package org.hornetq.core.client.impl;
-import java.util.List;
-
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
@@ -27,11 +25,9 @@
*
*
*/
-public interface ServerLocatorInternal extends ServerLocator
+public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
{
void factoryClosed(final ClientSessionFactory factory);
TransportConfiguration getBackup( TransportConfiguration live);
-
- void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryGroup.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryGroup.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,7 +13,7 @@
package org.hornetq.core.cluster;
-import java.util.Map;
+import java.util.List;
import org.hornetq.core.server.management.NotificationService;
@@ -32,7 +32,7 @@
String getName();
- Map<String, DiscoveryEntry> getDiscoveryEntryMap();
+ List<DiscoveryEntry> getDiscoveryEntries();
void start() throws Exception;
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -205,9 +205,13 @@
return name;
}
- public synchronized Map<String, DiscoveryEntry> getDiscoveryEntryMap()
+ /**
+ * Returns the discovery entries currently held in the connectors map.
+ *
+ * @return a new list, independent of the internal map
+ */
+ public synchronized List<DiscoveryEntry> getDiscoveryEntries()
{
- return new HashMap<String, DiscoveryEntry>(connectors);
+ // The copy constructor takes the snapshot in one step
+ return new ArrayList<DiscoveryEntry>(connectors.values());
}
public boolean waitForBroadcast(final long timeout)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BridgeConfiguration.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BridgeConfiguration.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -14,9 +14,8 @@
package org.hornetq.core.config;
import java.io.Serializable;
+import java.util.List;
-import org.hornetq.api.core.Pair;
-
/**
* A BridgeConfiguration
*
@@ -38,9 +37,11 @@
private String filterString;
- private Pair<String, String> connectorPair;
+ private List<String> staticConnectors;
private String discoveryGroupName;
+
+ private boolean ha;
private String transformerClassName;
@@ -74,7 +75,8 @@
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final long clientFailureCheckPeriod,
- final Pair<String, String> connectorPair,
+ final List<String> staticConnectors,
+ final boolean ha,
final String user,
final String password)
{
@@ -90,7 +92,7 @@
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.connectorPair = connectorPair;
+ this.staticConnectors = staticConnectors;
+ // Keep the ha flag passed by the caller, as the discovery-group constructor does;
+ // without this assignment isHA() always reports false for statically configured bridges
+ this.ha = ha;
this.user = user;
this.password = password;
discoveryGroupName = null;
@@ -109,6 +111,7 @@
final int confirmationWindowSize,
final long clientFailureCheckPeriod,
final String discoveryGroupName,
+ final boolean ha,
final String user,
final String password)
{
@@ -124,8 +127,9 @@
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- connectorPair = null;
+ this.staticConnectors = null;
this.discoveryGroupName = discoveryGroupName;
+ this.ha = ha;
this.user = user;
this.password = password;
}
@@ -155,15 +159,20 @@
return transformerClassName;
}
- public Pair<String, String> getConnectorPair()
+ public List<String> getStaticConnectors()
{
- return connectorPair;
+ return staticConnectors;
}
public String getDiscoveryGroupName()
{
return discoveryGroupName;
}
+
+ public boolean isHA()
+ {
+ return ha;
+ }
public long getRetryInterval()
{
@@ -233,11 +242,11 @@
}
/**
- * @param connectorPair the connectorPair to set
+ * @param staticConnectors the staticConnectors to set
*/
- public void setConnectorPair(final Pair<String, String> connectorPair)
+ public void setStaticConnectors(final List<String> staticConnectors)
{
- this.connectorPair = connectorPair;
+ this.staticConnectors = staticConnectors;
}
/**
@@ -247,6 +256,15 @@
{
this.discoveryGroupName = discoveryGroupName;
}
+
+ /**
+ *
+ * @param ha is the bridge supporting HA?
+ */
+ public void setHA(final boolean ha)
+ {
+ this.ha = ha;
+ }
/**
* @param transformerClassName the transformerClassName to set
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -16,8 +16,6 @@
import java.io.Serializable;
import java.util.List;
-import org.hornetq.api.core.Pair;
-
/**
* A ClusterConnectionConfiguration
*
@@ -41,7 +39,7 @@
private final boolean forwardWhenNoConsumers;
- private final List<Pair<String, String>> staticConnectorNamePairs;
+ private final List<String> staticConnectors;
private final String discoveryGroupName;
@@ -56,12 +54,12 @@
final boolean forwardWhenNoConsumers,
final int maxHops,
final int confirmationWindowSize,
- final List<Pair<String, String>>
staticConnectorNamePairs)
+ final List<String> staticConnectors)
{
this.name = name;
this.address = address;
this.retryInterval = retryInterval;
- this.staticConnectorNamePairs = staticConnectorNamePairs;
+ this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
discoveryGroupName = null;
@@ -84,7 +82,7 @@
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
- staticConnectorNamePairs = null;
+ this.staticConnectors = null;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
}
@@ -119,9 +117,9 @@
return confirmationWindowSize;
}
- public List<Pair<String, String>> getStaticConnectorNamePairs()
+ public List<String> getStaticConnectors()
{
- return staticConnectorNamePairs;
+ return staticConnectors;
}
public String getDiscoveryGroupName()
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -961,8 +961,8 @@
String discoveryGroupName = null;
- List<Pair<String, String>> connectorPairs = new
ArrayList<Pair<String, String>>();
-
+ List<String> staticConnectorNames = new ArrayList<String>();
+
NodeList children = e.getChildNodes();
for (int j = 0; j < children.getLength(); j++)
@@ -973,22 +973,21 @@
{
discoveryGroupName =
child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
}
- else if (child.getNodeName().equals("connector-ref"))
+ else if (child.getNodeName().equals("static-connectors"))
{
- String connectorName =
child.getAttributes().getNamedItem("connector-name").getNodeValue();
+ // Collect the connector names listed under the static-connectors element
+ NodeList children2 = child.getChildNodes();
+
+ for (int k = 0; k < children2.getLength(); k++)
+ {
+ // Index the nested list that bounds this loop, not the enclosing element's child list
+ Node child2 = children2.item(k);
+
+ if (child2.getNodeName().equals("connector-ref"))
+ {
+ // Read the attribute from the connector-ref node just matched, not from its static-connectors parent
+ String connectorName = child2.getAttributes().getNamedItem("connector-name").getNodeValue();
- Node backupNode =
child.getAttributes().getNamedItem("backup-connector-name");
-
- String backupConnectorName = null;
-
- if (backupNode != null)
- {
- backupConnectorName = backupNode.getNodeValue();
+ staticConnectorNames.add(connectorName);
+ }
}
-
- Pair<String, String> connectorPair = new Pair<String,
String>(connectorName, backupConnectorName);
-
- connectorPairs.add(connectorPair);
}
}
@@ -1003,7 +1002,7 @@
forwardWhenNoConsumers,
maxHops,
confirmationWindowSize,
- connectorPairs);
+ staticConnectorNames);
}
else
{
@@ -1087,11 +1086,13 @@
"password",
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD,
Validators.NO_CHECK);
+
+ boolean ha = XMLConfigurationUtil.getBoolean(brNode, "ha", false);
String filterString = null;
+
+ List<String> staticConnectorNames = new ArrayList<String>();
- Pair<String, String> connectorPair = null;
-
String discoveryGroupName = null;
NodeList children = brNode.getChildNodes();
@@ -1108,26 +1109,27 @@
{
discoveryGroupName =
child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
}
- else if (child.getNodeName().equals("connector-ref"))
+ else if (child.getNodeName().equals("static-connectors"))
{
- String connectorName =
child.getAttributes().getNamedItem("connector-name").getNodeValue();
+ NodeList children2 = child.getChildNodes();
+
+ for (int k = 0; k < children2.getLength(); k++)
+ {
+ Node child2 = children2.item(k); // index the nested list the loop is bounded by, not the outer children
+
+ if (child2.getNodeName().equals("connector-ref"))
+ {
+ String connectorName =
child2.getAttributes().getNamedItem("connector-name").getNodeValue(); // attribute lives on the connector-ref element
- Node backupNode =
child.getAttributes().getNamedItem("backup-connector-name");
-
- String backupConnectorName = null;
-
- if (backupNode != null)
- {
- backupConnectorName = backupNode.getNodeValue();
+ staticConnectorNames.add(connectorName);
+ }
}
-
- connectorPair = new Pair<String, String>(connectorName,
backupConnectorName);
}
}
BridgeConfiguration config;
- if (connectorPair != null)
+ if (!staticConnectorNames.isEmpty())
{
config = new BridgeConfiguration(name,
queueName,
@@ -1141,7 +1143,8 @@
useDuplicateDetection,
confirmationWindowSize,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- connectorPair,
+ staticConnectorNames,
+ ha,
user,
password);
}
@@ -1160,6 +1163,7 @@
confirmationWindowSize,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
discoveryGroupName,
+ ha,
user,
password);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -15,7 +15,6 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.BridgeControl;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.persistence.StorageManager;
@@ -54,17 +53,12 @@
// BridgeControlMBean implementation ---------------------------
- public String[] getConnectorPair() throws Exception
+ public String[] getStaticConnectors() throws Exception
{
clearIO();
try
{
- String[] pair = new String[2];
-
- pair[0] = configuration.getConnectorPair().a;
- pair[1] = configuration.getConnectorPair().b != null ?
configuration.getConnectorPair().b : null;
-
- return pair;
+ return configuration.getStaticConnectors().toArray(new String[0]);
}
finally
{
@@ -227,6 +221,19 @@
blockOnIO();
}
}
+
+ public boolean isHA()
+ {
+ clearIO();
+ try
+ {
+ return configuration.isHA();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
public void start() throws Exception
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -18,14 +18,11 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
/**
* A ClusterConnectionControl
@@ -141,32 +138,12 @@
}
}
- public Object[] getStaticConnectorNamePairs()
+ public String[] getStaticConnectors()
{
clearIO();
try
{
- List<Pair<String, String>> pairs =
configuration.getStaticConnectorNamePairs();
-
- if (pairs == null)
- {
- return null;
- }
-
- Object[] ret = new Object[pairs.size()];
-
- int i = 0;
- for (Pair<String, String> pair :
configuration.getStaticConnectorNamePairs())
- {
- String[] opair = new String[2];
-
- opair[0] = pair.a;
- opair[1] = pair.b != null ? pair.b : null;
-
- ret[i++] = opair;
- }
-
- return ret;
+ return configuration.getStaticConnectors() == null ? null : configuration.getStaticConnectors().toArray(new String[0]); // null-safe, matching the null check in getStaticConnectorsAsJSON
}
finally
{
@@ -174,26 +151,23 @@
}
}
- public String getStaticConnectorNamePairsAsJSON() throws Exception
+ public String getStaticConnectorsAsJSON() throws Exception
{
clearIO();
try
{
- List<Pair<String, String>> pairs =
configuration.getStaticConnectorNamePairs();
+ List<String> connectors = configuration.getStaticConnectors();
- if (pairs == null)
+ if (connectors == null)
{
return null;
}
JSONArray array = new JSONArray();
-
- for (Pair<String, String> pair : pairs)
- {
- JSONObject p = new JSONObject();
- p.put("a", pair.a);
- p.put("b", pair.b);
- array.put(p);
+
+ for (String connector : connectors)
+ {
+ array.put(connector);
}
return array.toString();
}
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,171 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management.impl;
-
-import javax.management.MBeanOperationInfo;
-
-import org.hornetq.api.core.management.AddressControl;
-import org.hornetq.api.core.management.DiscoveryGroupControl;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
-import org.hornetq.core.persistence.StorageManager;
-
-/**
- * A AcceptorControl
- *
- * @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
- * Created 11 dec. 2008 17:09:04
- */
-public class DiscoveryGroupControlImpl extends AbstractControl implements
DiscoveryGroupControl
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final DiscoveryGroup discoveryGroup;
-
- private final DiscoveryGroupConfiguration configuration;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public DiscoveryGroupControlImpl(final DiscoveryGroup acceptor,
- final StorageManager storageManager,
- final DiscoveryGroupConfiguration configuration)
throws Exception
- {
- super(DiscoveryGroupControl.class, storageManager);
- discoveryGroup = acceptor;
- this.configuration = configuration;
- }
-
- // DiscoveryGroupControlMBean implementation ---------------------------
-
- public String getName()
- {
- clearIO();
- try
- {
- return configuration.getName();
- }
- finally
- {
- blockOnIO();
- }
- }
-
- public String getGroupAddress()
- {
- clearIO();
- try
- {
- return configuration.getGroupAddress();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- public int getGroupPort()
- {
- clearIO();
- try
- {
- return configuration.getGroupPort();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- public long getRefreshTimeout()
- {
- clearIO();
- try
- {
- return configuration.getRefreshTimeout();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- public boolean isStarted()
- {
- clearIO();
- try
- {
- return discoveryGroup.isStarted();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- public void start() throws Exception
- {
- clearIO();
- try
- {
- discoveryGroup.start();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- public void stop() throws Exception
- {
- clearIO();
- try
- {
- discoveryGroup.stop();
- }
- finally
- {
- blockOnIO();
- }
-
- }
-
- @Override
- MBeanOperationInfo[] fillMBeanOperationInfo()
- {
- return MBeanInfoHelper.getMBeanOperationsInfo(DiscoveryGroupControl.class);
- }
-
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -19,6 +19,9 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.protocol.core.Channel;
@@ -26,7 +29,11 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
+import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
@@ -97,8 +104,45 @@
// Just send a ping back
channel0.send(packet);
}
+ else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
+ {
+ SubscribeClusterTopologyUpdatesMessage msg =
(SubscribeClusterTopologyUpdatesMessage)packet;
+
+ final ClusterTopologyListener listener = new ClusterTopologyListener()
+ {
+ public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last)
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ }
+ };
+
+ final boolean isCC = msg.isClusterConnection();
+
+ server.getClusterManager().registerTopologyListener(listener, isCC);
+
+ rc.addCloseListener(new CloseListener()
+ {
+ public void connectionClosed()
+ {
+ server.getClusterManager().unregisterTopologyListener(listener,
isCC);
+ }
+ });
+ }
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ {
+ NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+
+ server.getClusterManager().announceNode(msg.getNodeID(), msg.isBackup(),
msg.getConnector());
+ }
}
});
+
+
return entry;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -21,6 +21,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -79,16 +80,18 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -143,6 +146,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
/**
* A PacketDecoder
@@ -490,9 +494,19 @@
}
case CLUSTER_TOPOLOGY:
{
- packet = new ClusterTopologyMessage();
+ packet = new ClusterTopologyChangeMessage();
break;
}
+ case NODE_ANNOUNCE:
+ {
+ packet = new NodeAnnounceMessage();
+ break;
+ }
+ case SUBSCRIBE_TOPOLOGY:
+ {
+ packet = new SubscribeClusterTopologyUpdatesMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -185,6 +185,10 @@
// HA
public static final byte CLUSTER_TOPOLOGY = 110;
+
+ public static final byte NODE_ANNOUNCE = 111;
+
+ public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Copied:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
(from rev 9308,
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java)
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class ClusterTopologyChangeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log =
Logger.getLogger(ClusterTopologyChangeMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean exit;
+
+ private String nodeID;
+
+ private Pair<TransportConfiguration, TransportConfiguration> pair;
+
+ private boolean last;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClusterTopologyChangeMessage(final String nodeID, final
Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY);
+
+ this.nodeID = nodeID;
+
+ this.pair = pair;
+
+ this.last = last;
+
+ this.exit = false;
+ }
+
+ public ClusterTopologyChangeMessage(final String nodeID) // builds the "exit" form of the message (exit = true, no connector pair)
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY);
+ this.nodeID = nodeID; // previously never stored, so encodeRest wrote a null node ID
+ this.exit = true;
+ }
+
+ public ClusterTopologyChangeMessage()
+ {
+ super(PacketImpl.CLUSTER_TOPOLOGY);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getPair()
+ {
+ return pair;
+ }
+
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ public boolean isExit()
+ {
+ return exit;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeBoolean(exit);
+ buffer.writeString(nodeID);
+ if (!exit)
+ {
+ pair.a.encode(buffer);
+ if (pair.b != null)
+ {
+ buffer.writeBoolean(true);
+ pair.b.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
+ buffer.writeBoolean(last);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ exit = buffer.readBoolean();
+ nodeID = buffer.readString();
+ if (!exit)
+ {
+ TransportConfiguration a = new TransportConfiguration();
+ a.decode(buffer);
+ boolean hasBackup = buffer.readBoolean();
+ TransportConfiguration b;
+ if (hasBackup)
+ {
+ b = new TransportConfiguration();
+ b.decode(buffer);
+ }
+ else
+ {
+ b = null;
+ }
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+ last = buffer.readBoolean();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyMessage.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,116 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class ClusterTopologyMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ClusterTopologyMessage.class);
-
- // Attributes ----------------------------------------------------
-
- private List<Pair<TransportConfiguration, TransportConfiguration>>
topology;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ClusterTopologyMessage(final List<Pair<TransportConfiguration,
TransportConfiguration>> topology)
- {
- super(PacketImpl.CLUSTER_TOPOLOGY);
-
- this.topology = topology;
- }
-
- public ClusterTopologyMessage()
- {
- super(PacketImpl.CLUSTER_TOPOLOGY);
- }
-
- // Public --------------------------------------------------------
-
-
- public List<Pair<TransportConfiguration, TransportConfiguration>>
getTopology()
- {
- return topology;
- }
-
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeInt(topology.size());
- for (Pair<TransportConfiguration, TransportConfiguration> pair: topology)
- {
- pair.a.encode(buffer);
- if (pair.b != null)
- {
- buffer.writeBoolean(true);
- pair.b.encode(buffer);
- }
- else
- {
- buffer.writeBoolean(false);
- }
- }
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- int size = buffer.readInt();
- topology = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
- for (int i = 0; i < size; i++)
- {
- TransportConfiguration a = new TransportConfiguration();
- a.decode(buffer);
- boolean hasBackup = buffer.readBoolean();
- TransportConfiguration b;
- if (hasBackup)
- {
- b = new TransportConfiguration();
- b.decode(buffer);
- }
- else
- {
- b = null;
- }
- Pair<TransportConfiguration, TransportConfiguration> pair = new
Pair<TransportConfiguration, TransportConfiguration>(a, b);
- topology.add(pair);
- }
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -30,7 +30,9 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
@@ -371,9 +373,9 @@
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
ConnectionEntry conn = connections.get(connectionID);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -22,7 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
@@ -67,7 +67,7 @@
private final ResponseHandler responseHandler = new ResponseHandler();
- private final FailoverManager failoverManager;
+ private final ClientSessionFactoryInternal sessionFactory;
private CoreRemotingConnection replicatingConnection;
@@ -89,10 +89,10 @@
// Constructors --------------------------------------------------
- public ReplicationManagerImpl(final FailoverManager failoverManager, final
ExecutorFactory executorFactory)
+ public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final
ExecutorFactory executorFactory)
{
super();
- this.failoverManager = failoverManager;
+ this.sessionFactory = sessionFactory;
this.executorFactory = executorFactory;
}
@@ -304,7 +304,7 @@
throw new IllegalStateException("ReplicationManager is already
started");
}
- replicatingConnection = failoverManager.getConnection();
+ replicatingConnection = sessionFactory.getConnection();
if (replicatingConnection == null)
{
@@ -353,7 +353,7 @@
{
}
};
- failoverManager.addFailureListener(failureListener);
+ sessionFactory.addFailureListener(failureListener);
started = true;
@@ -392,8 +392,8 @@
replicatingChannel.close();
}
- failoverManager.causeExit();
- failoverManager.removeFailureListener(failureListener);
+ sessionFactory.causeExit();
+ sessionFactory.removeFailureListener(failureListener);
if (replicatingConnection != null)
{
replicatingConnection.destroy();
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -15,7 +15,9 @@
import java.util.Map;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -47,4 +49,6 @@
int distance) throws Exception;
void activate();
+
+ Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,13 +13,12 @@
package org.hornetq.core.server.cluster;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -30,7 +29,7 @@
* Created 18 Nov 2008 09:23:26
*
*/
-public interface ClusterManager extends HornetQComponent
+public interface ClusterManager extends HornetQComponent, ClusterTopologyListener
{
Map<String, Bridge> getBridges();
@@ -46,9 +45,9 @@
void stopAnnouncement();
- List<Pair<TransportConfiguration, TransportConfiguration>>
getClusterTopology();
+ void registerTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void registerTopologyListener(ClusterTopologyListener listener);
+ void unregisterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void unregisterTopologyListener(ClusterTopologyListener listener);
+ void announceNode(String nodeID, boolean backup, TransportConfiguration connector);
}
Deleted:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterTopologyListener.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -1,32 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.cluster;
-
-import java.util.List;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-
-/**
- * A ClusterTopologyListener
- *
- * @author tim
- *
- *
- */
-public interface ClusterTopologyListener
-{
- void onTopologyChanged(List<Pair<TransportConfiguration,
TransportConfiguration>> clusterTopology);
-
-}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -14,41 +14,30 @@
package org.hornetq.core.server.cluster.impl;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.LinkedList;
-import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.Bridge;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -56,7 +45,6 @@
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
-import org.hornetq.utils.UUIDGenerator;
/**
* A Core BridgeImpl
@@ -68,6 +56,7 @@
*
*
*/
+
public class BridgeImpl implements Bridge, SessionFailureListener,
SendAcknowledgementHandler
{
// Constants -----------------------------------------------------
@@ -76,6 +65,8 @@
// Attributes ----------------------------------------------------
+ protected final ServerLocator serverLocator;
+
private final UUID nodeUUID;
private final SimpleString name;
@@ -94,7 +85,7 @@
private volatile ClientSessionFactory csf;
- private volatile ClientSessionInternal session;
+ protected volatile ClientSessionInternal session;
private volatile ClientProducer producer;
@@ -104,30 +95,6 @@
private volatile boolean active;
- private final Pair<TransportConfiguration, TransportConfiguration>
connectorPair;
-
- private final String discoveryAddress;
-
- private final int discoveryPort;
-
- private final long retryInterval;
-
- private final double retryIntervalMultiplier;
-
- private final int reconnectAttempts;
-
- private final boolean failoverOnServerShutdown;
-
- private final int confirmationWindowSize;
-
- private final SimpleString idsHeaderName;
-
- private final MessageFlowRecord flowRecord;
-
- private final SimpleString managementAddress;
-
- private final SimpleString managementNotificationAddress;
-
private final String user;
private final String password;
@@ -136,44 +103,29 @@
private NotificationService notificationService;
- private ClientConsumer notifConsumer;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- /**
- * discoveryAddress (+ discoveryPort) and connectorPair are mutually exclusive.
- * If discoveryAddress is != null, it will be used to create the bridge's client
session factory.
- * Otherwise, the connectorPair will be used
- */
- public BridgeImpl(final UUID nodeUUID,
+ public BridgeImpl(final ServerLocator serverLocator,
+ final UUID nodeUUID,
final SimpleString name,
final Queue queue,
- final String discoveryAddress,
- final int discoveryPort,
- final Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
final Executor executor,
final SimpleString filterString,
final SimpleString forwardingAddress,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final int reconnectAttempts,
- final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
- final int confirmationWindowSize,
- final SimpleString managementAddress,
- final SimpleString managementNotificationAddress,
final String user,
final String password,
- final MessageFlowRecord flowRecord,
final boolean activated,
final StorageManager storageManager) throws Exception
{
+ this.serverLocator = serverLocator;
+
this.nodeUUID = nodeUUID;
this.name = name;
@@ -190,39 +142,10 @@
this.useDuplicateDetection = useDuplicateDetection;
- if (!(confirmationWindowSize > 0))
- {
- throw new IllegalStateException("confirmation-window-size must be > 0 for a bridge");
- }
-
- this.confirmationWindowSize = confirmationWindowSize;
-
- this.discoveryAddress = discoveryAddress;
-
- this.discoveryPort = discoveryPort;
-
- this.connectorPair = connectorPair;
-
- this.retryInterval = retryInterval;
-
- this.retryIntervalMultiplier = retryIntervalMultiplier;
-
- this.reconnectAttempts = reconnectAttempts;
-
- this.failoverOnServerShutdown = failoverOnServerShutdown;
-
- idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
-
- this.managementAddress = managementAddress;
-
- this.managementNotificationAddress = managementNotificationAddress;
-
this.user = user;
this.password = password;
- this.flowRecord = flowRecord;
-
this.activated = activated;
}
@@ -242,7 +165,7 @@
if (activated)
{
- executor.execute(new CreateObjectsRunnable());
+ activate();
}
if (notificationService != null)
@@ -389,7 +312,39 @@
}
// Consumer implementation ---------------------------------------
+
+ /* Hook for processing message before forwarding */
+ protected ServerMessage beforeForward(ServerMessage message)
+ {
+ if (useDuplicateDetection && !message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
+ {
+ // If we are using duplicate detection and there's not already a duplicate detection header, then
+ // we add a header composed of the persistent node id and the message id, which makes it globally unique
+ // between restarts.
+ // If you use a cluster connection then a guid based duplicate id will be used since it is added *before*
+ // the
+ // message goes into the store and forward queue.
+ // But with this technique it also works when the messages don't already have such a header in them in the
+ // queue.
+ byte[] bytes = new byte[24];
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.put(nodeUUID.asBytes());
+
+ bb.putLong(message.getMessageID());
+
+ message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
+ }
+
+ if (transformer != null)
+ {
+ message = transformer.transform(message);
+ }
+
+ return message;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (filter != null && !filter.match(ref.getMessage()))
@@ -410,58 +365,8 @@
refs.add(ref);
- if (flowRecord != null)
- {
- // We make a copy of the message, then we strip out the unwanted routing id
headers and leave
- // only
- // the one pertinent for the address node - this is important since different
queues on different
- // nodes could have same queue ids
- // Note we must copy since same message may get routed to other nodes which
require different headers
- message = message.copy();
-
- // TODO - we can optimise this
-
- Set<SimpleString> propNames = new
HashSet<SimpleString>(message.getPropertyNames());
-
- byte[] queueIds = message.getBytesProperty(idsHeaderName);
-
- for (SimpleString propName : propNames)
- {
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
- {
- message.removeProperty(propName);
- }
- }
-
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- }
-
- if (useDuplicateDetection &&
!message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
- {
- // If we are using duplicate detection and there's not already a
duplicate detection header, then
- // we add a header composed of the persistent node id and the message id,
which makes it globally unique
- // between restarts.
- // If you use a cluster connection then a guid based duplicate id will be
used since it is added *before*
- // the
- // message goes into the store and forward queue.
- // But with this technique it also works when the messages don't already
have such a header in them in the
- // queue.
- byte[] bytes = new byte[24];
-
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.put(nodeUUID.asBytes());
-
- bb.putLong(message.getMessageID());
-
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
- }
-
- if (transformer != null)
- {
- message = transformer.transform(message);
- }
-
+ message = beforeForward(message);
+
SimpleString dest;
if (forwardingAddress != null)
@@ -539,7 +444,7 @@
}
else
{
- setupNotificationConsumer();
+ afterConnect();
active = true;
@@ -555,85 +460,21 @@
BridgeImpl.log.error("Failed to cancel refs", e);
}
}
-
- // TODO - we should move this code to the ClusterConnectorImpl - and just execute it
when the bridge
- // connection is opened and closed - we can use
- // a callback to tell us that
- private void setupNotificationConsumer() throws Exception
+
+ /* Hook for doing extra stuff after connection */
+ protected void afterConnect() throws Exception
{
- if (flowRecord != null)
- {
- flowRecord.reset();
+ //NOOP
+ }
- if (notifConsumer != null)
- {
- try
- {
- notifConsumer.close();
-
- notifConsumer = null;
- }
- catch (HornetQException e)
- {
- BridgeImpl.log.error("Failed to close consumer", e);
- }
- }
-
- // Get the queue data
-
- String qName = "notif." +
UUIDGenerator.getInstance().generateStringUUID();
-
- SimpleString notifQueueName = new SimpleString(qName);
-
- SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE +
"<>" +
- BindingType.DIVERT.toInt() +
- " AND " +
- ManagementHelper.HDR_NOTIFICATION_TYPE +
- " IN ('" +
- NotificationType.BINDING_ADDED +
- "','" +
- NotificationType.BINDING_REMOVED +
- "','" +
- NotificationType.CONSUMER_CREATED +
- "','" +
- NotificationType.CONSUMER_CLOSED +
- "','" +
- NotificationType.PROPOSAL +
- "','" +
- NotificationType.PROPOSAL_RESPONSE +
- "') AND " +
- ManagementHelper.HDR_DISTANCE +
- "<" +
- flowRecord.getMaxHops() +
- " AND (" +
- ManagementHelper.HDR_ADDRESS +
- " LIKE '" +
- flowRecord.getAddress() +
- "%')");
-
- session.createQueue(managementNotificationAddress, notifQueueName, filter,
false);
-
- notifConsumer = session.createConsumer(notifQueueName);
-
- notifConsumer.setMessageHandler(flowRecord);
-
- session.start();
-
- ClientMessage message = session.createMessage(false);
-
- ManagementHelper.putOperationInvocation(message,
- ResourceNames.CORE_SERVER,
- "sendQueueInfoToQueue",
- notifQueueName.toString(),
- flowRecord.getAddress());
-
- ClientProducer prod = session.createProducer(managementAddress);
-
- prod.send(message);
- }
+ /* Hook for creating session factory */
+ protected ClientSessionFactory createSessionFactory() throws Exception
+ {
+ return serverLocator.createSessionFactory();
}
- private synchronized boolean createObjects()
+ /* This is called only when the bridge is activated */
+ protected synchronized boolean createObjects()
{
if (!started)
{
@@ -647,26 +488,9 @@
BridgeImpl.log.info("Connecting bridge " + name + " to its destination");
try
- {
- if (discoveryAddress != null)
- {
- csf = HornetQClient.createClientSessionFactory(discoveryAddress,
discoveryPort);
- }
- else
- {
- csf = HornetQClient.createClientSessionFactory(connectorPair.a,
connectorPair.b);
- }
+ {
+ csf = createSessionFactory();
- csf.setFailoverOnServerShutdown(failoverOnServerShutdown);
- csf.setRetryInterval(retryInterval);
- csf.setRetryIntervalMultiplier(retryIntervalMultiplier);
- csf.setReconnectAttempts(reconnectAttempts);
- csf.setBlockOnDurableSend(false);
-
- // Must have confirmations enabled so we get send acks
-
- csf.setConfirmationWindowSize(confirmationWindowSize);
-
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
@@ -682,7 +506,7 @@
session.setSendAcknowledgementHandler(BridgeImpl.this);
- setupNotificationConsumer();
+ afterConnect();
active = true;
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-06-30 12:54:13 UTC (rev 9375)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-06-30 17:22:16 UTC (rev 9376)
@@ -18,8 +18,6 @@
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
@@ -28,11 +26,10 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -48,7 +45,6 @@
import org.hornetq.core.server.group.impl.Response;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -62,7 +58,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection, DiscoveryListener
+public class ClusterConnectionImpl implements ClusterConnection, ClusterTopologyListener
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -78,38 +74,32 @@
private final SimpleString address;
- private final long retryInterval;
-
private final boolean useDuplicateDetection;
- private final int confirmationWindowSize;
-
private final boolean routeWhenNoConsumers;
private final Map<String, MessageFlowRecord> records = new HashMap<String,
MessageFlowRecord>();
- private final DiscoveryGroup discoveryGroup;
-
private final ScheduledExecutorService scheduledExecutor;
private final int maxHops;
private final UUID nodeUUID;
- private final List<Pair<TransportConfiguration, TransportConfiguration>>
staticConnectors;
-
private boolean backup;
private volatile boolean started;
-
+
private final String clusterUser;
-
+
private final String clusterPassword;
- /*
- * Constructor using static list of connectors
- */
- public ClusterConnectionImpl(final SimpleString name,
+ private Pair<TransportConfiguration, TransportConfiguration>[] topology;
+
+ private final ServerLocator serverLocator;
+
+ public ClusterConnectionImpl(final ServerLocator serverLocator,
+ final SimpleString name,
final SimpleString address,
final long retryInterval,
final boolean useDuplicateDetection,
@@ -120,25 +110,22 @@
final PostOffice postOffice,
final ManagementService managementService,
final ScheduledExecutorService scheduledExecutor,
- final List<Pair<TransportConfiguration,
TransportConfiguration>> connectors,
final int maxHops,
final UUID nodeUUID,
final boolean backup,
final String clusterUser,
final String clusterPassword) throws Exception
{
+ this.serverLocator = serverLocator;
+
this.name = name;
this.address = address;
- this.retryInterval = retryInterval;
-
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
- this.confirmationWindowSize = confirmationWindowSize;
-
this.executorFactory = executorFactory;
this.server = server;
@@ -147,8 +134,6 @@
this.managementService = managementService;
- discoveryGroup = null;
-
this.scheduledExecutor = scheduledExecutor;
this.maxHops = maxHops;
@@ -162,74 +147,9 @@
this.backup = backup;
- staticConnectors = connectors;
-
this.clusterUser = clusterUser;
-
- this.clusterPassword = clusterPassword;
- if (!backup)
- {
- updateFromStaticConnectors(connectors);
- }
- }
-
- /*
- * Constructor using discovery to get connectors
- */
- public ClusterConnectionImpl(final SimpleString name,
- final SimpleString address,
- final long retryInterval,
- final boolean useDuplicateDetection,
- final boolean routeWhenNoConsumers,
- final int confirmationWindowSize,
- final ExecutorFactory executorFactory,
- final HornetQServer server,
- final PostOffice postOffice,
- final ManagementService managementService,
- final ScheduledExecutorService scheduledExecutor,
- final DiscoveryGroup discoveryGroup,
- final int maxHops,
- final UUID nodeUUID,
- final boolean backup,
- final String clusterUser,
- final String clusterPassword) throws Exception
- {
- this.name = name;
-
- this.address = address;
-
- this.retryInterval = retryInterval;
-
- this.executorFactory = executorFactory;
-
- this.server = server;
-
- this.postOffice = postOffice;
-
- this.managementService = managementService;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.discoveryGroup = discoveryGroup;
-
- this.useDuplicateDetection = useDuplicateDetection;
-
- this.routeWhenNoConsumers = routeWhenNoConsumers;
-
- this.confirmationWindowSize = confirmationWindowSize;
-
- this.maxHops = maxHops;
-
- this.nodeUUID = nodeUUID;
-
- this.backup = backup;
-
- this.clusterUser = clusterUser;
-
this.clusterPassword = clusterPassword;
-
- staticConnectors = null;
}
public synchronized void start() throws Exception
@@ -239,10 +159,7 @@
return;
}
- if (discoveryGroup != null)
- {
- discoveryGroup.registerListener(this);
- }
+ serverLocator.registerTopologyListener(this);
started = true;
@@ -264,10 +181,7 @@
return;
}
- if (discoveryGroup != null)
- {
- discoveryGroup.unregisterListener(this);
- }
+ serverLocator.unregisterTopologyListener(this);
for (MessageFlowRecord record : records.values())
{
@@ -293,6 +207,11 @@
started = false;
}
+ public Pair<TransportConfiguration, TransportConfiguration>[] getTopology()
+ {
+ return topology;
+ }
+
public boolean isStarted()
{
return started;
@@ -329,86 +248,45 @@
}
backup = false;
+ }
- if (discoveryGroup != null)
+ // ClusterTopologyListener implementation ------------------------------------------------------------------
+
+ public synchronized void nodeDown(final String nodeID)
+ {
+ server.getClusterManager().nodeDown(nodeID);
+
+ MessageFlowRecord record = records.remove(nodeID);
+
+ if (record != null)
{
- connectorsChanged();
- }
- else
- {
try
{
- updateFromStaticConnectors(staticConnectors);
+ record.close();
}
catch (Exception e)
{
- ClusterConnectionImpl.log.error("Failed to update connectors", e);
+ log.error("Failed to close flow record", e);
}
}
}
- // DiscoveryListener implementation
------------------------------------------------------------------
-
- public synchronized void connectorsChanged()
+ public synchronized void nodeUP(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
- if (backup)
- {
- return;
- }
-
try
{
- Map<String, DiscoveryEntry> connectors =
discoveryGroup.getDiscoveryEntryMap();
+ server.getClusterManager().nodeUP(nodeID, connectorPair, false);
- updateConnectors(connectors);
- }
- catch (Exception e)
- {
- ClusterConnectionImpl.log.error("Failed to update connectors", e);
- }
- }
+ MessageFlowRecord record = records.get(nodeID);
- private void updateFromStaticConnectors(final List<Pair<TransportConfiguration,
TransportConfiguration>> connectors) throws Exception
- {
- Map<String, DiscoveryEntry> map = new HashMap<String,
DiscoveryEntry>();
-
- // TODO - we fudge the node id - it's never updated anyway
- int i = 0;
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair :
connectors)
- {
- map.put(String.valueOf(i++), new DiscoveryEntry(connectorPair, 0));
- }
-
- updateConnectors(map);
- }
-
- private void updateConnectors(final Map<String, DiscoveryEntry> connectors)
throws Exception
- {
- Iterator<Map.Entry<String, MessageFlowRecord>> iter =
records.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry<String, MessageFlowRecord> entry = iter.next();
-
- if (!connectors.containsKey(entry.getKey()))
+ if (record == null)
{
- // Connector no longer there - we should remove and close it - we don't
delete the queue though - it may
- // have messages - this is up to the administrator to do this
+ // New node - create a new flow record
- entry.getValue().close();
+ final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
- iter.remove();
- }
- }
-
- for (final Map.Entry<String, DiscoveryEntry> entry : connectors.entrySet())
- {
- if (!records.containsKey(entry.getKey()))
- {
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
entry.getValue().getConnectorPair();
-
- final SimpleString queueName = new SimpleString("sf." + name +
"." + entry.getKey());
-
Binding queueBinding = postOffice.getBinding(queueName);
Queue queue;
@@ -417,7 +295,7 @@
{
queue = (Queue)queueBinding.getBindable();
- createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
@@ -426,44 +304,49 @@
queue = server.createQueue(queueName, queueName, null, true, false);
- createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
}
+ else
+ {
+ if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+ {
+ // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+ }
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to update topology", e);
+ }
}
private void createNewRecord(final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
final boolean start) throws Exception
{
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
- Bridge bridge = new BridgeImpl(nodeUUID,
- queueName,
- queue,
- null,
- -1,
- connectorPair,
- executorFactory.getExecutor(),
- null,
- null,
- scheduledExecutor,
- null,
- retryInterval,
- 1d,
- -1,
- false,
- useDuplicateDetection,
- confirmationWindowSize,
- managementService.getManagementAddress(),
-
managementService.getManagementNotificationAddress(),
- clusterUser,
- clusterPassword,
- record,
- !backup,
- server.getStorageManager());
+ Bridge bridge = new ClusterConnectionBridge(serverLocator,
+ nodeUUID,
+ queueName,
+ queue,
+ executorFactory.getExecutor(),
+ null,
+ null,
+ scheduledExecutor,
+ null,
+ useDuplicateDetection,
+ clusterUser,
+ clusterPassword,
+ !backup,
+ server.getStorageManager(),
+
managementService.getManagementAddress(),
+
managementService.getManagementNotificationAddress(),
+ record,
+ connector);
record.setBridge(bridge);
@@ -576,11 +459,17 @@
break;
}
case PROPOSAL:
+ {
doProposalReceived(message);
+
break;
+ }
case PROPOSAL_RESPONSE:
+ {
doProposalResponseReceived(message);
+
break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type " + ntype);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-06-30 12:54:13 UTC (rev 9375)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-06-30 17:22:16 UTC (rev 9376)
@@ -15,8 +15,8 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -28,8 +28,9 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -44,7 +45,6 @@
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
-import org.hornetq.core.server.cluster.ClusterTopologyListener;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.utils.ConcurrentHashSet;
@@ -66,8 +66,6 @@
private final Map<String, BroadcastGroup> broadcastGroups = new
HashMap<String, BroadcastGroup>();
- private final Map<String, DiscoveryGroup> discoveryGroups = new
HashMap<String, DiscoveryGroup>();
-
private final Map<String, Bridge> bridges = new HashMap<String,
Bridge>();
private final Map<String, ClusterConnection> clusterConnections = new
HashMap<String, ClusterConnection>();
@@ -140,15 +138,11 @@
deployBroadcastGroup(config);
}
- for (DiscoveryGroupConfiguration config :
configuration.getDiscoveryGroupConfigurations().values())
- {
- deployDiscoveryGroup(config);
- }
-
for (ClusterConnectionConfiguration config :
configuration.getClusterConfigurations())
{
deployClusterConnection(config);
}
+
}
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -174,12 +168,6 @@
managementService.unregisterBroadcastGroup(group.getName());
}
- for (DiscoveryGroup group : discoveryGroups.values())
- {
- group.stop();
- managementService.unregisterDiscoveryGroup(group.getName());
- }
-
for (ClusterConnection clusterConnection : clusterConnections.values())
{
clusterConnection.stop();
@@ -187,8 +175,6 @@
}
broadcastGroups.clear();
-
- discoveryGroups.clear();
}
for (Bridge bridge : bridges.values())
@@ -246,36 +232,124 @@
backup = false;
}
-
+
public void startAnnouncement()
{
-
+
}
-
+
public void stopAnnouncement()
{
-
+
}
-
- private Set<ClusterTopologyListener> listeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
- private List<Pair<TransportConfiguration, TransportConfiguration>>
topology;
-
- public List<Pair<TransportConfiguration, TransportConfiguration>>
getClusterTopology()
+
+ private Set<ClusterTopologyListener> clientListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
+
+ private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
+
+ private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology;
+
+ public synchronized void registerTopologyListener(final ClusterTopologyListener listener,
+ final boolean clusterConnection)
{
- return topology;
+ if (clusterConnection)
+ {
+ this.clusterConnectionListeners.add(listener);
+ }
+ else
+ {
+ this.clientListeners.add(listener);
+ }
+
+ // We now need to send the current topology to the client
+
+ int count = 0;
+ for (Map.Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : topology.entrySet())
+ {
+ listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
+ }
}
-
- public void registerTopologyListener(final ClusterTopologyListener listener)
+
+ public synchronized void unregisterTopologyListener(final ClusterTopologyListener
listener,
+ final boolean clusterConnection)
{
- listeners.add(listener);
+ if (clusterConnection)
+ {
+ this.clusterConnectionListeners.remove(listener);
+ }
+ else
+ {
+ this.clientListeners.remove(listener);
+ }
}
-
- public void unregisterTopologyListener(final ClusterTopologyListener listener)
+
+ public synchronized void announceNode(final String nodeID,
+ final boolean backup,
+ final TransportConfiguration connector)
{
- listeners.remove(listener);
+ Pair<TransportConfiguration, TransportConfiguration> pair =
topology.get(nodeID);
+
+ if (pair == null)
+ {
+ if (backup)
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null,
connector);
+ }
+ else
+ {
+ pair = new Pair<TransportConfiguration,
TransportConfiguration>(connector, null);
+ }
+
+ topology.put(nodeID, pair);
+ }
+ else
+ {
+ if (backup)
+ {
+ pair.b = connector;
+ }
+ else
+ {
+ pair.a = connector;
+ }
+ }
+
+ // Propagate the announcement
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeUP(nodeID, pair, false);
+ }
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, pair, false);
+ }
+
}
+ public synchronized void nodeDown(final String nodeID)
+ {
+ topology.remove(nodeID);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+ }
+
+ public synchronized void nodeUP(final String nodeID,
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final boolean last)
+ {
+ topology.put(nodeID, connectorPair);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, false);
+ }
+ }
+
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
{
if (broadcastGroups.containsKey(config.getName()))
@@ -338,41 +412,27 @@
"' will not be deployed.");
}
- private synchronized void deployDiscoveryGroup(final DiscoveryGroupConfiguration
config) throws Exception
+ private TransportConfiguration[] connectorNameListToArray(final List<String>
connectorNames)
{
- if (discoveryGroups.containsKey(config.getName()))
+ TransportConfiguration[] tcConfigs =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
connectorNames.size());
+ int count = 0;
+ for (String connectorName : connectorNames)
{
- ClusterManagerImpl.log.warn("There is already a discovery-group with name
" + config.getName() +
- " deployed. This one will not be
deployed.");
+ TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorName);
- return;
- }
+ if (connector == null)
+ {
+ ClusterManagerImpl.log.warn("No connector defined with name '"
+ connectorName +
+ "'. The bridge will not be
deployed.");
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
- InetAddress localBindAddress;
-
- if (config.getLocalBindAddress() != null)
- {
- localBindAddress = InetAddress.getByName(config.getLocalBindAddress());
+ return null;
+ }
+
+ tcConfigs[count++] = connector;
}
- else
- {
- localBindAddress = null;
- }
- DiscoveryGroup group = new DiscoveryGroupImpl(nodeUUID.toString(),
- config.getName(),
- localBindAddress,
- groupAddress,
- config.getGroupPort(),
- config.getRefreshTimeout());
-
- discoveryGroups.put(config.getName(), group);
-
- managementService.registerDiscoveryGroup(group, config);
-
- group.start();
+ return tcConfigs;
}
private synchronized void deployBridge(final BridgeConfiguration config) throws
Exception
@@ -406,8 +466,6 @@
Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
- Pair<String, String> connectorNamePair = config.getConnectorPair();
-
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
if (binding == null)
@@ -420,7 +478,7 @@
Queue queue = (Queue)binding.getBindable();
- Bridge bridge = null;
+ ServerLocator serverLocator;
if (config.getDiscoveryGroupName() != null)
{
@@ -434,86 +492,60 @@
return;
}
- bridge = new BridgeImpl(nodeUUID,
- new SimpleString(config.getName()),
- queue,
- discoveryGroupConfiguration.getGroupAddress(),
- discoveryGroupConfiguration.getGroupPort(),
- null,
- executorFactory.getExecutor(),
- SimpleString.toSimpleString(config.getFilterString()),
- new SimpleString(config.getForwardingAddress()),
- scheduledExecutor,
- transformer,
- config.getRetryInterval(),
- config.getRetryIntervalMultiplier(),
- config.getReconnectAttempts(),
- config.isFailoverOnServerShutdown(),
- config.isUseDuplicateDetection(),
- config.getConfirmationWindowSize(),
- managementService.getManagementAddress(),
- managementService.getManagementNotificationAddress(),
- config.getUser(),
- config.getPassword(),
- null,
- !backup,
- server.getStorageManager());
+ if (config.isHA())
+ {
+ serverLocator =
HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
+
discoveryGroupConfiguration.getGroupPort());
+ }
+ else
+ {
+ serverLocator =
HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
+
discoveryGroupConfiguration.getGroupPort());
+ }
+
}
else
{
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorNamePair.a);
+ TransportConfiguration[] tcConfigs =
connectorNameListToArray(config.getStaticConnectors());
- if (connector == null)
+ if (tcConfigs == null)
{
- ClusterManagerImpl.log.warn("No connector defined with name '"
+ connectorNamePair.a +
- "'. The bridge will not be
deployed.");
-
return;
}
- TransportConfiguration backupConnector = null;
-
- if (connectorNamePair.b != null)
+ if (config.isHA())
{
- backupConnector =
configuration.getConnectorConfigurations().get(connectorNamePair.b);
-
- if (backupConnector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name
'" + connectorNamePair.b +
- "'. The bridge will not be
deployed.");
-
- return;
- }
+ serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
}
+ else
+ {
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(tcConfigs);
+ }
- Pair<TransportConfiguration, TransportConfiguration> pair = new
Pair<TransportConfiguration, TransportConfiguration>(connector,
-
backupConnector);
- bridge = new BridgeImpl(nodeUUID,
- new SimpleString(config.getName()),
- queue,
- null,
- -1,
- pair,
- executorFactory.getExecutor(),
- SimpleString.toSimpleString(config.getFilterString()),
-
SimpleString.toSimpleString(config.getForwardingAddress()),
- scheduledExecutor,
- transformer,
- config.getRetryInterval(),
- config.getRetryIntervalMultiplier(),
- config.getReconnectAttempts(),
- config.isFailoverOnServerShutdown(),
- config.isUseDuplicateDetection(),
- config.getConfirmationWindowSize(),
- managementService.getManagementAddress(),
- managementService.getManagementNotificationAddress(),
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- null,
- !backup,
- server.getStorageManager());
}
+ serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
+ serverLocator.setFailoverOnServerShutdown(config.isFailoverOnServerShutdown());
+ serverLocator.setReconnectAttempts(config.getReconnectAttempts());
+ serverLocator.setRetryInterval(config.getRetryInterval());
+ serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
+ serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
+
+ Bridge bridge = new BridgeImpl(serverLocator,
+ nodeUUID,
+ new SimpleString(config.getName()),
+ queue,
+ executorFactory.getExecutor(),
+
SimpleString.toSimpleString(config.getFilterString()),
+ new SimpleString(config.getForwardingAddress()),
+ scheduledExecutor,
+ transformer,
+ config.isUseDuplicateDetection(),
+ config.getUser(),
+ config.getPassword(),
+ !backup,
+ server.getStorageManager());
+
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
@@ -537,66 +569,18 @@
return;
}
- ClusterConnection clusterConnection;
+ ServerLocator serverLocator;
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors =
new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-
- if (config.getStaticConnectorNamePairs() != null)
+ if (config.getStaticConnectors() != null)
{
- for (Pair<String, String> connectorNamePair :
config.getStaticConnectorNamePairs())
- {
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorNamePair.a);
+ TransportConfiguration[] tcConfigs =
connectorNameListToArray(config.getStaticConnectors());
- if (connector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name
'" + connectorNamePair.a +
- "'. The cluster connection will not
be deployed.");
-
- return;
- }
-
- TransportConfiguration backupConnector = null;
-
- if (connectorNamePair.b != null)
- {
- backupConnector =
configuration.getConnectorConfigurations().get(connectorNamePair.b);
-
- if (backupConnector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name
'" + connectorNamePair.b +
- "'. The cluster connection will
not be deployed.");
-
- return;
- }
- }
-
- Pair<TransportConfiguration, TransportConfiguration> pair = new
Pair<TransportConfiguration, TransportConfiguration>(connector,
-
backupConnector);
-
- connectors.add(pair);
- }
-
- clusterConnection = new ClusterConnectionImpl(new
SimpleString(config.getName()),
- new
SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
-
config.isForwardWhenNoConsumers(),
-
config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- connectors,
- config.getMaxHops(),
- nodeUUID,
- backup,
-
server.getConfiguration().getClusterUser(),
-
server.getConfiguration().getClusterPassword());
+ serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
}
else
{
- DiscoveryGroup dg = discoveryGroups.get(config.getDiscoveryGroupName());
+ DiscoveryGroupConfiguration dg =
configuration.getDiscoveryGroupConfigurations()
+
.get(config.getDiscoveryGroupName());
if (dg == null)
{
@@ -604,25 +588,27 @@
"'. The cluster connection will not be
deployed.");
}
- clusterConnection = new ClusterConnectionImpl(new
SimpleString(config.getName()),
- new
SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
-
config.isForwardWhenNoConsumers(),
-
config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- dg,
- config.getMaxHops(),
- nodeUUID,
- backup,
-
server.getConfiguration().getClusterUser(),
-
server.getConfiguration().getClusterPassword());
+ serverLocator = HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(),
dg.getGroupPort());
}
+ ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
+ new
SimpleString(config.getName()),
+ new
SimpleString(config.getAddress()),
+
config.getRetryInterval(),
+
config.isDuplicateDetection(),
+
config.isForwardWhenNoConsumers(),
+
config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+
config.getMaxHops(),
+ nodeUUID,
+ backup,
+
server.getConfiguration().getClusterUser(),
+
server.getConfiguration().getClusterPassword());
+
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/ManagementService.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/ManagementService.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/ManagementService.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -114,9 +114,9 @@
void unregisterBroadcastGroup(String name) throws Exception;
- void registerDiscoveryGroup(DiscoveryGroup discoveryGroup, DiscoveryGroupConfiguration
configuration) throws Exception;
+ // void registerDiscoveryGroup(DiscoveryGroup discoveryGroup,
DiscoveryGroupConfiguration configuration) throws Exception;
- void unregisterDiscoveryGroup(String name) throws Exception;
+ //void unregisterDiscoveryGroup(String name) throws Exception;
void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws
Exception;
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -34,17 +34,14 @@
import org.hornetq.api.core.management.BridgeControl;
import org.hornetq.api.core.management.BroadcastGroupControl;
import org.hornetq.api.core.management.ClusterConnectionControl;
-import org.hornetq.api.core.management.DiscoveryGroupControl;
import org.hornetq.api.core.management.DivertControl;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.AcceptorControlImpl;
@@ -52,7 +49,6 @@
import org.hornetq.core.management.impl.BridgeControlImpl;
import org.hornetq.core.management.impl.BroadcastGroupControlImpl;
import org.hornetq.core.management.impl.ClusterConnectionControlImpl;
-import org.hornetq.core.management.impl.DiscoveryGroupControlImpl;
import org.hornetq.core.management.impl.DivertControlImpl;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.management.impl.QueueControlImpl;
@@ -357,23 +353,6 @@
unregisterFromRegistry(ResourceNames.CORE_BROADCAST_GROUP + name);
}
- public synchronized void registerDiscoveryGroup(final DiscoveryGroup discoveryGroup,
- final DiscoveryGroupConfiguration
configuration) throws Exception
- {
- discoveryGroup.setNotificationService(this);
- ObjectName objectName =
objectNameBuilder.getDiscoveryGroupObjectName(configuration.getName());
- DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup,
storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control,
DiscoveryGroupControl.class));
- registerInRegistry(ResourceNames.CORE_DISCOVERY_GROUP + configuration.getName(),
control);
- }
-
- public synchronized void unregisterDiscoveryGroup(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_DISCOVERY_GROUP + name);
- }
-
public synchronized void registerBridge(final Bridge bridge, final BridgeConfiguration
configuration) throws Exception
{
bridge.setNotificationService(this);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -483,7 +483,7 @@
isXA,
false,
false,
- sessionFactory.isPreAcknowledge(),
+
sessionFactory.getServerLocator().isPreAcknowledge(),
transactionBatchSize);
}
else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
@@ -493,7 +493,7 @@
isXA,
true,
true,
- sessionFactory.isPreAcknowledge(),
+
sessionFactory.getServerLocator().isPreAcknowledge(),
0);
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
@@ -503,7 +503,7 @@
isXA,
true,
true,
- sessionFactory.isPreAcknowledge(),
+
sessionFactory.getServerLocator().isPreAcknowledge(),
dupsOKBatchSize);
}
else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
@@ -513,7 +513,7 @@
isXA,
true,
false,
- sessionFactory.isPreAcknowledge(),
+
sessionFactory.getServerLocator().isPreAcknowledge(),
transactionBatchSize);
}
else if (acknowledgeMode == HornetQJMSConstants.PRE_ACKNOWLEDGE)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -14,7 +14,6 @@
package org.hornetq.jms.client;
import java.io.Serializable;
-import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -33,10 +32,10 @@
import javax.naming.Reference;
import javax.naming.Referenceable;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.hornetq.jms.referenceable.SerializableObjectRefAddr;
@@ -61,7 +60,7 @@
// Attributes
-----------------------------------------------------------------------------------
- private final ClientSessionFactory sessionFactory;
+ private final ServerLocator serverLocator;
private String clientID;
@@ -75,35 +74,33 @@
public HornetQConnectionFactory()
{
- sessionFactory = HornetQClient.createClientSessionFactory();
+ serverLocator = null;
}
- public HornetQConnectionFactory(final ClientSessionFactory sessionFactory)
+ public HornetQConnectionFactory(final boolean ha, final String discoveryAddress, final
int discoveryPort)
{
- this.sessionFactory = sessionFactory;
+ if (ha)
+ {
+ serverLocator = HornetQClient.createServerLocatorWithHA(discoveryAddress,
discoveryPort);
+ }
+ else
+ {
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(discoveryAddress,
discoveryPort);
+ }
}
- public HornetQConnectionFactory(final String discoveryAddress, final int
discoveryPort)
+ public HornetQConnectionFactory(final boolean ha, final TransportConfiguration...
initialConnectors)
{
- sessionFactory = HornetQClient.createClientSessionFactory(discoveryAddress,
discoveryPort);
+ if (ha)
+ {
+ serverLocator = HornetQClient.createServerLocatorWithHA(initialConnectors);
+ }
+ else
+ {
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors);
+ }
}
- public HornetQConnectionFactory(final List<Pair<TransportConfiguration,
TransportConfiguration>> staticConnectors)
- {
- sessionFactory = HornetQClient.createClientSessionFactory(staticConnectors);
- }
-
- public HornetQConnectionFactory(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
- {
- sessionFactory = HornetQClient.createClientSessionFactory(connectorConfig,
backupConnectorConfig);
- }
-
- public HornetQConnectionFactory(final TransportConfiguration connectorConfig)
- {
- this(connectorConfig, null);
- }
-
// ConnectionFactory implementation
-------------------------------------------------------------
public Connection createConnection() throws JMSException
@@ -190,79 +187,61 @@
public synchronized String getConnectionLoadBalancingPolicyClassName()
{
- return sessionFactory.getConnectionLoadBalancingPolicyClassName();
+ return serverLocator.getConnectionLoadBalancingPolicyClassName();
}
public synchronized void setConnectionLoadBalancingPolicyClassName(final String
connectionLoadBalancingPolicyClassName)
{
checkWrite();
-
sessionFactory.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
+
serverLocator.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
}
- public synchronized List<Pair<TransportConfiguration,
TransportConfiguration>> getStaticConnectors()
+ public synchronized TransportConfiguration[] getStaticConnectors()
{
- return sessionFactory.getStaticConnectors();
+ return serverLocator.getStaticTransportConfigurations();
}
- public synchronized void setStaticConnectors(final
List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
- {
- checkWrite();
- sessionFactory.setStaticConnectors(staticConnectors);
- }
-
public synchronized String getLocalBindAddress()
{
- return sessionFactory.getLocalBindAddress();
+ return serverLocator.getLocalBindAddress();
}
public synchronized void setLocalBindAddress(final String localBindAddress)
{
checkWrite();
- sessionFactory.setLocalBindAddress(localBindAddress);
+ serverLocator.setLocalBindAddress(localBindAddress);
}
public synchronized String getDiscoveryAddress()
{
- return sessionFactory.getDiscoveryAddress();
+ return serverLocator.getDiscoveryAddress();
}
- public synchronized void setDiscoveryAddress(final String discoveryAddress)
- {
- checkWrite();
- sessionFactory.setDiscoveryAddress(discoveryAddress);
- }
-
public synchronized int getDiscoveryPort()
{
- return sessionFactory.getDiscoveryPort();
+ return serverLocator.getDiscoveryPort();
}
- public synchronized void setDiscoveryPort(final int discoveryPort)
- {
- checkWrite();
- sessionFactory.setDiscoveryPort(discoveryPort);
- }
-
public synchronized long getDiscoveryRefreshTimeout()
{
- return sessionFactory.getDiscoveryRefreshTimeout();
+ return serverLocator.getDiscoveryRefreshTimeout();
}
public synchronized void setDiscoveryRefreshTimeout(final long
discoveryRefreshTimeout)
{
checkWrite();
- sessionFactory.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
+ serverLocator.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
}
public synchronized long getDiscoveryInitialWaitTimeout()
{
- return sessionFactory.getDiscoveryInitialWaitTimeout();
+ return serverLocator.getDiscoveryInitialWaitTimeout();
}
public synchronized void setDiscoveryInitialWaitTimeout(final long
discoveryInitialWaitTimeout)
{
checkWrite();
- sessionFactory.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout);
+ serverLocator.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout);
}
public synchronized String getClientID()
@@ -300,90 +279,90 @@
public synchronized long getClientFailureCheckPeriod()
{
- return sessionFactory.getClientFailureCheckPeriod();
+ return serverLocator.getClientFailureCheckPeriod();
}
public synchronized void setClientFailureCheckPeriod(final long
clientFailureCheckPeriod)
{
checkWrite();
- sessionFactory.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
}
public synchronized long getConnectionTTL()
{
- return sessionFactory.getConnectionTTL();
+ return serverLocator.getConnectionTTL();
}
public synchronized void setConnectionTTL(final long connectionTTL)
{
checkWrite();
- sessionFactory.setConnectionTTL(connectionTTL);
+ serverLocator.setConnectionTTL(connectionTTL);
}
public synchronized long getCallTimeout()
{
- return sessionFactory.getCallTimeout();
+ return serverLocator.getCallTimeout();
}
public synchronized void setCallTimeout(final long callTimeout)
{
checkWrite();
- sessionFactory.setCallTimeout(callTimeout);
+ serverLocator.setCallTimeout(callTimeout);
}
public synchronized int getConsumerWindowSize()
{
- return sessionFactory.getConsumerWindowSize();
+ return serverLocator.getConsumerWindowSize();
}
public synchronized void setConsumerWindowSize(final int consumerWindowSize)
{
checkWrite();
- sessionFactory.setConsumerWindowSize(consumerWindowSize);
+ serverLocator.setConsumerWindowSize(consumerWindowSize);
}
public synchronized int getConsumerMaxRate()
{
- return sessionFactory.getConsumerMaxRate();
+ return serverLocator.getConsumerMaxRate();
}
public synchronized void setConsumerMaxRate(final int consumerMaxRate)
{
checkWrite();
- sessionFactory.setConsumerMaxRate(consumerMaxRate);
+ serverLocator.setConsumerMaxRate(consumerMaxRate);
}
public synchronized int getConfirmationWindowSize()
{
- return sessionFactory.getConfirmationWindowSize();
+ return serverLocator.getConfirmationWindowSize();
}
public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
{
checkWrite();
- sessionFactory.setConfirmationWindowSize(confirmationWindowSize);
+ serverLocator.setConfirmationWindowSize(confirmationWindowSize);
}
public synchronized int getProducerMaxRate()
{
- return sessionFactory.getProducerMaxRate();
+ return serverLocator.getProducerMaxRate();
}
public synchronized void setProducerMaxRate(final int producerMaxRate)
{
checkWrite();
- sessionFactory.setProducerMaxRate(producerMaxRate);
+ serverLocator.setProducerMaxRate(producerMaxRate);
}
public synchronized int getProducerWindowSize()
{
- return sessionFactory.getProducerWindowSize();
+ return serverLocator.getProducerWindowSize();
}
public synchronized void setProducerWindowSize(final int producerWindowSize)
{
checkWrite();
- sessionFactory.setProducerWindowSize(producerWindowSize);
+ serverLocator.setProducerWindowSize(producerWindowSize);
}
/**
@@ -392,208 +371,203 @@
public synchronized void setCacheLargeMessagesClient(final boolean
cacheLargeMessagesClient)
{
checkWrite();
- sessionFactory.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+ serverLocator.setCacheLargeMessagesClient(cacheLargeMessagesClient);
}
public synchronized boolean isCacheLargeMessagesClient()
{
- return sessionFactory.isCacheLargeMessagesClient();
+ return serverLocator.isCacheLargeMessagesClient();
}
public synchronized int getMinLargeMessageSize()
{
- return sessionFactory.getMinLargeMessageSize();
+ return serverLocator.getMinLargeMessageSize();
}
public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
{
checkWrite();
- sessionFactory.setMinLargeMessageSize(minLargeMessageSize);
+ serverLocator.setMinLargeMessageSize(minLargeMessageSize);
}
public synchronized boolean isBlockOnAcknowledge()
{
- return sessionFactory.isBlockOnAcknowledge();
+ return serverLocator.isBlockOnAcknowledge();
}
public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
{
checkWrite();
- sessionFactory.setBlockOnAcknowledge(blockOnAcknowledge);
+ serverLocator.setBlockOnAcknowledge(blockOnAcknowledge);
}
public synchronized boolean isBlockOnNonDurableSend()
{
- return sessionFactory.isBlockOnNonDurableSend();
+ return serverLocator.isBlockOnNonDurableSend();
}
public synchronized void setBlockOnNonDurableSend(final boolean
blockOnNonDurableSend)
{
checkWrite();
- sessionFactory.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ serverLocator.setBlockOnNonDurableSend(blockOnNonDurableSend);
}
public synchronized boolean isBlockOnDurableSend()
{
- return sessionFactory.isBlockOnDurableSend();
+ return serverLocator.isBlockOnDurableSend();
}
public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
{
checkWrite();
- sessionFactory.setBlockOnDurableSend(blockOnDurableSend);
+ serverLocator.setBlockOnDurableSend(blockOnDurableSend);
}
public synchronized boolean isAutoGroup()
{
- return sessionFactory.isAutoGroup();
+ return serverLocator.isAutoGroup();
}
public synchronized void setAutoGroup(final boolean autoGroup)
{
checkWrite();
- sessionFactory.setAutoGroup(autoGroup);
+ serverLocator.setAutoGroup(autoGroup);
}
public synchronized boolean isPreAcknowledge()
{
- return sessionFactory.isPreAcknowledge();
+ return serverLocator.isPreAcknowledge();
}
public synchronized void setPreAcknowledge(final boolean preAcknowledge)
{
checkWrite();
- sessionFactory.setPreAcknowledge(preAcknowledge);
+ serverLocator.setPreAcknowledge(preAcknowledge);
}
public synchronized long getRetryInterval()
{
- return sessionFactory.getRetryInterval();
+ return serverLocator.getRetryInterval();
}
public synchronized void setRetryInterval(final long retryInterval)
{
checkWrite();
- sessionFactory.setRetryInterval(retryInterval);
+ serverLocator.setRetryInterval(retryInterval);
}
public synchronized long getMaxRetryInterval()
{
- return sessionFactory.getMaxRetryInterval();
+ return serverLocator.getMaxRetryInterval();
}
public synchronized void setMaxRetryInterval(final long retryInterval)
{
checkWrite();
- sessionFactory.setMaxRetryInterval(retryInterval);
+ serverLocator.setMaxRetryInterval(retryInterval);
}
public synchronized double getRetryIntervalMultiplier()
{
- return sessionFactory.getRetryIntervalMultiplier();
+ return serverLocator.getRetryIntervalMultiplier();
}
public synchronized void setRetryIntervalMultiplier(final double
retryIntervalMultiplier)
{
checkWrite();
- sessionFactory.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ serverLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
}
public synchronized int getReconnectAttempts()
{
- return sessionFactory.getReconnectAttempts();
+ return serverLocator.getReconnectAttempts();
}
public synchronized void setReconnectAttempts(final int reconnectAttempts)
{
checkWrite();
- sessionFactory.setReconnectAttempts(reconnectAttempts);
+ serverLocator.setReconnectAttempts(reconnectAttempts);
}
public synchronized boolean isFailoverOnInitialConnection()
{
- return sessionFactory.isFailoverOnInitialConnection();
+ return serverLocator.isFailoverOnInitialConnection();
}
public synchronized void setFailoverOnInitialConnection(final boolean failover)
{
checkWrite();
- sessionFactory.setFailoverOnInitialConnection(failover);
+ serverLocator.setFailoverOnInitialConnection(failover);
}
public synchronized boolean isFailoverOnServerShutdown()
{
- return sessionFactory.isFailoverOnServerShutdown();
+ return serverLocator.isFailoverOnServerShutdown();
}
public synchronized void setFailoverOnServerShutdown(final boolean
failoverOnServerShutdown)
{
checkWrite();
- sessionFactory.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ serverLocator.setFailoverOnServerShutdown(failoverOnServerShutdown);
}
public synchronized boolean isUseGlobalPools()
{
- return sessionFactory.isUseGlobalPools();
+ return serverLocator.isUseGlobalPools();
}
public synchronized void setUseGlobalPools(final boolean useGlobalPools)
{
checkWrite();
- sessionFactory.setUseGlobalPools(useGlobalPools);
+ serverLocator.setUseGlobalPools(useGlobalPools);
}
public synchronized int getScheduledThreadPoolMaxSize()
{
- return sessionFactory.getScheduledThreadPoolMaxSize();
+ return serverLocator.getScheduledThreadPoolMaxSize();
}
public synchronized void setScheduledThreadPoolMaxSize(final int
scheduledThreadPoolMaxSize)
{
checkWrite();
- sessionFactory.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ serverLocator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
}
public synchronized int getThreadPoolMaxSize()
{
- return sessionFactory.getThreadPoolMaxSize();
+ return serverLocator.getThreadPoolMaxSize();
}
public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
{
checkWrite();
- sessionFactory.setThreadPoolMaxSize(threadPoolMaxSize);
+ serverLocator.setThreadPoolMaxSize(threadPoolMaxSize);
}
public synchronized int getInitialMessagePacketSize()
{
- return sessionFactory.getInitialMessagePacketSize();
+ return serverLocator.getInitialMessagePacketSize();
}
public synchronized void setInitialMessagePacketSize(final int size)
{
checkWrite();
- sessionFactory.setInitialMessagePacketSize(size);
+ serverLocator.setInitialMessagePacketSize(size);
}
- public ClientSessionFactory getCoreFactory()
- {
- return sessionFactory;
- }
-
public void setGroupID(final String groupID)
{
- sessionFactory.setGroupID(groupID);
+ serverLocator.setGroupID(groupID);
}
public String getGroupID()
{
- return sessionFactory.getGroupID();
+ return serverLocator.getGroupID();
}
public void close()
{
- sessionFactory.close();
+ serverLocator.close();
}
// Package protected
----------------------------------------------------------------------------
@@ -607,9 +581,20 @@
{
readOnly = true;
- // Note that each JMS connection gets it's own copy of the connection factory
- // This means there is one underlying remoting connection per jms connection (if
not load balanced)
- ClientSessionFactory factory = sessionFactory.copy();
+ ClientSessionFactory factory;
+
+ try
+ {
+ factory = serverLocator.createSessionFactory();
+ }
+ catch (Exception e)
+ {
+ JMSException jmse = new JMSException("Failed to create session
factory");
+
+ jmse.setLinkedException(e);
+
+ throw jmse;
+ }
HornetQConnection connection = new HornetQConnection(username,
password,
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-06-30
12:54:13 UTC (rev 9375)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
@@ -13,13 +13,10 @@
package org.hornetq.jms.management.impl;
-import java.util.List;
-
import javax.management.MBeanInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
@@ -293,16 +290,11 @@
cf.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
}
- public List<Pair<TransportConfiguration, TransportConfiguration>>
getStaticConnectors()
+ public TransportConfiguration[] getStaticConnectors()
{
return cf.getStaticConnectors();
}
- public void setStaticConnectors(List<Pair<TransportConfiguration,
TransportConfiguration>> staticConnectors)
- {
- cf.setStaticConnectors(staticConnectors);
- }
-
public String getLocalBindAddress()
{
return cf.getLocalBindAddress();
@@ -318,21 +310,11 @@
return cf.getDiscoveryAddress();
}
- public void setDiscoveryAddress(String discoveryAddress)
- {
- cf.setDiscoveryAddress(discoveryAddress);
- }
-
public int getDiscoveryPort()
{
return cf.getDiscoveryPort();
}
- public void setDiscoveryPort(int discoveryPort)
- {
- cf.setDiscoveryPort(discoveryPort);
- }
-
public void addJNDI(@Parameter(name = "jndiBinding", desc = "the name
of the binding for JNDI") String jndi) throws Exception
{
jmsManager.addConnectionFactoryToJNDI(name, jndi);