JBoss hornetq SVN: r8760 - in trunk/src/main/org/hornetq/api: jms/management and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-07 04:59:41 -0500 (Thu, 07 Jan 2010)
New Revision: 8760
Modified:
trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java
trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
Log:
javadoc
Modified: trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java 2010-01-07 09:49:21 UTC (rev 8759)
+++ trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java 2010-01-07 09:59:41 UTC (rev 8760)
@@ -360,7 +360,7 @@
/**
* Returns the result of an operation invocation or an attribute value.
*
- * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
public static Object[] getResults(final Message message) throws Exception
@@ -384,7 +384,7 @@
/**
* Returns the result of an operation invocation or an attribute value.
*
- * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
public static Object getResult(final Message message) throws Exception
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-01-07 09:49:21 UTC (rev 8759)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-01-07 09:59:41 UTC (rev 8760)
@@ -153,7 +153,7 @@
/**
* Returns the result of an operation invocation or an attribute value.
*
- * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
public static Object[] getResults(final Message message) throws Exception
@@ -164,7 +164,7 @@
/**
* Returns the result of an operation invocation or an attribute value.
*
- * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
* and the result will be a String corresponding to the server exception.
*/
public static Object getResult(final Message message) throws Exception
14 years, 11 months
JBoss hornetq SVN: r8759 - in trunk/src/main/org/hornetq/api: jms/management and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-07 04:49:21 -0500 (Thu, 07 Jan 2010)
New Revision: 8759
Modified:
trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java
trunk/src/main/org/hornetq/api/core/management/ResourceNames.java
trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
Log:
javadoc
Modified: trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java 2010-01-07 09:38:09 UTC (rev 8758)
+++ trunk/src/main/org/hornetq/api/core/management/ManagementHelper.java 2010-01-07 09:49:21 UTC (rev 8759)
@@ -81,12 +81,30 @@
// Static --------------------------------------------------------
+ /**
+ * Stores a resource attribute in a message to retrieve the value from the server resource.
+ *
+ * @param message message
+ * @param resourceName the name of the resource
+ * @param attribute the name of the attribute
+ *
+ * @see ResourceNames
+ */
public static void putAttribute(final Message message, final String resourceName, final String attribute)
{
message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName));
message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute));
}
+ /**
+ * Stores an operation invocation in a message to invoke the corresponding operation on the server resource.
+ *
+ * @param message message
+ * @param resourceName the name of the resource
+ * @param operationName the name of the operation to invoke on the resource
+ *
+ * @see ResourceNames
+ */
public static void putOperationInvocation(final Message message,
final String resourceName,
final String operationName) throws Exception
@@ -94,6 +112,16 @@
ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[])null);
}
+ /**
+ * Stores an operation invocation in a message to invoke the corresponding operation on the server resource.
+ *
+ * @param message message
+ * @param resourceName the name of the server resource
+ * @param operationName the name of the operation to invoke on the server resource
+ * @param parameters the parameters to use to invoke the server resource
+ *
+ * @see ResourceNames
+ */
public static void putOperationInvocation(final Message message,
final String resourceName,
final String operationName,
@@ -271,6 +299,9 @@
}
}
+ /**
+ * Used by HornetQ management service.
+ */
public static Object[] retrieveOperationParameters(final Message message) throws Exception
{
String jsonString = message.getBodyBuffer().readNullableString();
@@ -287,16 +318,25 @@
}
}
+ /**
+ * Returns whether the message corresponds to the result of a management operation invocation.
+ */
public static boolean isOperationResult(final Message message)
{
return message.containsProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED);
}
+ /**
+ * Returns whether the message corresponds to the result of a management attribute value.
+ */
public static boolean isAttributesResult(final Message message)
{
return !ManagementHelper.isOperationResult(message);
}
+ /**
+ * Used by HornetQ management service.
+ */
public static void storeResult(final Message message, final Object result) throws Exception
{
String resultString;
@@ -317,6 +357,12 @@
message.getBodyBuffer().writeNullableString(resultString);
}
+ /**
+ * Returns the result of an operation invocation or an attribute value.
+ *
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * and the result will be a String corresponding to the server exception.
+ */
public static Object[] getResults(final Message message) throws Exception
{
String jsonString = message.getBodyBuffer().readNullableString();
@@ -335,6 +381,12 @@
}
}
+ /**
+ * Returns the result of an operation invocation or an attribute value.
+ *
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * and the result will be a String corresponding to the server exception.
+ */
public static Object getResult(final Message message) throws Exception
{
Object[] res = ManagementHelper.getResults(message);
@@ -349,6 +401,9 @@
}
}
+ /**
+ * Returns whether the invocation of the management operation on the server resource succeeded.
+ */
public static boolean hasOperationSucceeded(final Message message)
{
if (!ManagementHelper.isOperationResult(message))
@@ -362,6 +417,9 @@
return false;
}
+ /**
+ * Used by HornetQ management service.
+ */
public static Map<String, Object> fromCommaSeparatedKeyValues(final String str) throws Exception
{
if (str == null || str.trim().length() == 0)
@@ -375,6 +433,9 @@
return params;
}
+ /**
+ * Used by HornetQ management service.
+ */
public static Object[] fromCommaSeparatedArrayOfCommaSeparatedKeyValues(final String str) throws Exception
{
if (str == null || str.trim().length() == 0)
Modified: trunk/src/main/org/hornetq/api/core/management/ResourceNames.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/ResourceNames.java 2010-01-07 09:38:09 UTC (rev 8758)
+++ trunk/src/main/org/hornetq/api/core/management/ResourceNames.java 2010-01-07 09:49:21 UTC (rev 8759)
@@ -15,6 +15,9 @@
/**
* Helper class used to build resource names used by management messages.
+ *
+ * A resource's name is built by appending its <em>name</em> to its corresponding type.
+ * For example, the resource name of the "foo" queue is {@code CORE_QUEUE + "foo"}.
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-01-07 09:38:09 UTC (rev 8758)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-01-07 09:49:21 UTC (rev 8759)
@@ -17,9 +17,12 @@
import javax.jms.Message;
import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.jms.client.HornetQMessage;
-/*
+/**
+ * Helper class to use JMS messages to manage HornetQ server resources.
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -44,11 +47,31 @@
return ((HornetQMessage)jmsMessage).getCoreMessage();
}
+ /**
+ * Stores a resource attribute in a JMS message to retrieve the value from the server resource.
+ *
+ * @param message JMS message
+ * @param resourceName the name of the resource
+ * @param attribute the name of the attribute
+ * @throws JMSException if an exception occurs while putting the information in the message
+ *
+ * @see ResourceNames
+ */
public static void putAttribute(final Message message, final String resourceName, final String attribute) throws JMSException
{
ManagementHelper.putAttribute(JMSManagementHelper.getCoreMessage(message), resourceName, attribute);
}
+ /**
+ * Stores an operation invocation in a JMS message to invoke the corresponding operation on the server resource.
+ *
+ * @param message JMS message
+ * @param resourceName the name of the resource
+ * @param operationName the name of the operation to invoke on the resource
+ * @throws JMSException if an exception occurs while putting the information in the message
+ *
+ * @see ResourceNames
+ */
public static void putOperationInvocation(final Message message,
final String resourceName,
final String operationName) throws JMSException
@@ -74,6 +97,17 @@
return jmse;
}
+ /**
+ * Stores an operation invocation in a JMS message to invoke the corresponding operation on the server resource.
+ *
+ * @param message JMS message
+ * @param resourceName the name of the server resource
+ * @param operationName the name of the operation to invoke on the server resource
+ * @param parameters the parameters to use to invoke the server resource
+ * @throws JMSException if an exception occurs while putting the information in the message
+ *
+ * @see ResourceNames
+ */
public static void putOperationInvocation(final Message message,
final String resourceName,
final String operationName,
@@ -92,26 +126,47 @@
}
}
+ /**
+ * Returns whether the JMS message corresponds to the result of a management operation invocation.
+ */
public static boolean isOperationResult(final Message message) throws JMSException
{
return ManagementHelper.isOperationResult(JMSManagementHelper.getCoreMessage(message));
}
+ /**
+ * Returns whether the JMS message corresponds to the result of a management attribute value.
+ */
public static boolean isAttributesResult(final Message message) throws JMSException
{
return ManagementHelper.isAttributesResult(JMSManagementHelper.getCoreMessage(message));
}
+ /**
+ * Returns whether the invocation of the management operation on the server resource succeeded.
+ */
public static boolean hasOperationSucceeded(final Message message) throws JMSException
{
return ManagementHelper.hasOperationSucceeded(JMSManagementHelper.getCoreMessage(message));
}
+ /**
+ * Returns the result of an operation invocation or an attribute value.
+ *
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * and the result will be a String corresponding to the server exception.
+ */
public static Object[] getResults(final Message message) throws Exception
{
return ManagementHelper.getResults(JMSManagementHelper.getCoreMessage(message));
}
+ /**
+ * Returns the result of an operation invocation or an attribute value.
+ *
+ * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@ code false}.
+ * and the result will be a String corresponding to the server exception.
+ */
public static Object getResult(final Message message) throws Exception
{
return ManagementHelper.getResult(JMSManagementHelper.getCoreMessage(message));
14 years, 11 months
JBoss hornetq SVN: r8758 - in trunk: src/main/org/hornetq/core/client/impl and 10 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-07 04:38:09 -0500 (Thu, 07 Jan 2010)
New Revision: 8758
Added:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Removed:
trunk/src/main/org/hornetq/api/core/client/ClientSessionFactoryImpl.java
Modified:
trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/DivertControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
moved clientsessionfactoryimpl out of the api
Deleted: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactoryImpl.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactoryImpl.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -1,1116 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.api.core.client;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.FailoverManager;
-import org.hornetq.core.client.impl.FailoverManagerImpl;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
- * @version <tt>$Revision: 3602 $</tt>
- *
- */
-public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, DiscoveryListener, Serializable
-{
- // Constants
- // ------------------------------------------------------------------------------------
-
- private static final long serialVersionUID = 2512460695662741413L;
-
- private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
-
- // Attributes
- // -----------------------------------------------------------------------------------
-
- private final Map<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager> failoverManagerMap = new LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager>();
-
- private volatile boolean receivedBroadcast = false;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- private FailoverManager[] failoverManagerArray;
-
- private boolean readOnly;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- private List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
-
- private String discoveryAddress;
-
- private int discoveryPort;
-
- private long discoveryRefreshTimeout;
-
- private long discoveryInitialWaitTimeout;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialMessagePacketSize;
-
- private volatile boolean closed;
-
- private boolean failoverOnServerShutdown;
-
- private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private String groupID;
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (ClientSessionFactoryImpl.globalThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true);
-
- ClientSessionFactoryImpl.globalThreadPool = Executors.newCachedThreadPool(factory);
- }
-
- return ClientSessionFactoryImpl.globalThreadPool;
- }
-
- private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (ClientSessionFactoryImpl.globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads", true);
-
- ClientSessionFactoryImpl.globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- factory);
- }
-
- return ClientSessionFactoryImpl.globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (useGlobalPools)
- {
- threadPool = ClientSessionFactoryImpl.getGlobalThreadPool();
-
- scheduledThreadPool = ClientSessionFactoryImpl.getGlobalScheduledThreadPool();
- }
- else
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true);
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true);
-
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private synchronized void initialise() throws Exception
- {
- if (!readOnly)
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryAddress != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
-
- discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
- discoveryAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
- else if (staticConnectors != null)
- {
- for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
- {
- FailoverManager cm = new FailoverManagerImpl(this,
- pair.a,
- pair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- failoverManagerMap.put(pair, cm);
- }
-
- updatefailoverManagerArray();
- }
- else
- {
- throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
- }
- readOnly = true;
- }
- }
-
- // Static
- // ---------------------------------------------------------------------------------------
-
- // Constructors
- // ---------------------------------------------------------------------------------
-
- public ClientSessionFactoryImpl(final ClientSessionFactory other)
- {
- discoveryAddress = other.getDiscoveryAddress();
-
- discoveryPort = other.getDiscoveryPort();
-
- staticConnectors = other.getStaticConnectors();
-
- discoveryRefreshTimeout = other.getDiscoveryRefreshTimeout();
-
- clientFailureCheckPeriod = other.getClientFailureCheckPeriod();
-
- connectionTTL = other.getConnectionTTL();
-
- callTimeout = other.getCallTimeout();
-
- minLargeMessageSize = other.getMinLargeMessageSize();
-
- consumerWindowSize = other.getConsumerWindowSize();
-
- consumerMaxRate = other.getConsumerMaxRate();
-
- confirmationWindowSize = other.getConfirmationWindowSize();
-
- producerWindowSize = other.getProducerWindowSize();
-
- producerMaxRate = other.getProducerMaxRate();
-
- blockOnAcknowledge = other.isBlockOnAcknowledge();
-
- blockOnDurableSend = other.isBlockOnDurableSend();
-
- blockOnNonDurableSend = other.isBlockOnNonDurableSend();
-
- autoGroup = other.isAutoGroup();
-
- preAcknowledge = other.isPreAcknowledge();
-
- ackBatchSize = other.getAckBatchSize();
-
- connectionLoadBalancingPolicyClassName = other.getConnectionLoadBalancingPolicyClassName();
-
- discoveryInitialWaitTimeout = other.getDiscoveryInitialWaitTimeout();
-
- useGlobalPools = other.isUseGlobalPools();
-
- scheduledThreadPoolMaxSize = other.getScheduledThreadPoolMaxSize();
-
- threadPoolMaxSize = other.getThreadPoolMaxSize();
-
- retryInterval = other.getRetryInterval();
-
- retryIntervalMultiplier = other.getRetryIntervalMultiplier();
-
- maxRetryInterval = other.getMaxRetryInterval();
-
- reconnectAttempts = other.getReconnectAttempts();
-
- failoverOnServerShutdown = other.isFailoverOnServerShutdown();
-
- cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
-
- initialMessagePacketSize = other.getInitialMessagePacketSize();
-
- groupID = other.getGroupID();
- }
-
- public ClientSessionFactoryImpl()
- {
- discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- discoveryInitialWaitTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
- }
-
- public ClientSessionFactoryImpl(final String discoveryAddress, final int discoveryPort)
- {
- this();
-
- this.discoveryAddress = discoveryAddress;
-
- this.discoveryPort = discoveryPort;
- }
-
- public ClientSessionFactoryImpl(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
- {
- this();
-
- this.staticConnectors = staticConnectors;
- }
-
- public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
- {
- this();
-
- staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-
- staticConnectors.add(new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
- backupConnectorConfig));
- }
-
- public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig)
- {
- this(connectorConfig, null);
- }
-
- // ClientSessionFactory implementation------------------------------------------------------------
-
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
- {
- return staticConnectors;
- }
-
- public synchronized void setStaticConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
- {
- checkWrite();
-
- this.staticConnectors = staticConnectors;
- }
-
- public synchronized long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public synchronized long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public synchronized void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public synchronized long getCallTimeout()
- {
- return callTimeout;
- }
-
- public synchronized void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public synchronized int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public synchronized int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public synchronized int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public synchronized int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public synchronized int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public synchronized void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public synchronized int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public synchronized void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public synchronized boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public synchronized boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public synchronized boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public synchronized boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public synchronized void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public synchronized boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public synchronized int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public synchronized void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public synchronized long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- public synchronized void setDiscoveryInitialWaitTimeout(final long initialWaitTimeout)
- {
- checkWrite();
- discoveryInitialWaitTimeout = initialWaitTimeout;
- }
-
- public synchronized boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public synchronized int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public synchronized int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public synchronized long getRetryInterval()
- {
- return retryInterval;
- }
-
- public synchronized void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public synchronized long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public synchronized void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public synchronized double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public synchronized int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public synchronized boolean isFailoverOnServerShutdown()
- {
- return failoverOnServerShutdown;
- }
-
- public synchronized void setFailoverOnServerShutdown(final boolean failoverOnServerShutdown)
- {
- checkWrite();
- this.failoverOnServerShutdown = failoverOnServerShutdown;
- }
-
- public synchronized String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public synchronized String getDiscoveryAddress()
- {
- return discoveryAddress;
- }
-
- public synchronized void setDiscoveryAddress(final String discoveryAddress)
- {
- checkWrite();
- this.discoveryAddress = discoveryAddress;
- }
-
- public synchronized int getDiscoveryPort()
- {
- return discoveryPort;
- }
-
- public synchronized void setDiscoveryPort(final int discoveryPort)
- {
- checkWrite();
- this.discoveryPort = discoveryPort;
- }
-
- public synchronized long getDiscoveryRefreshTimeout()
- {
- return discoveryRefreshTimeout;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public synchronized void setDiscoveryRefreshTimeout(final long discoveryRefreshTimeout)
- {
- checkWrite();
- this.discoveryRefreshTimeout = discoveryRefreshTimeout;
- }
-
- public synchronized int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public synchronized void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public ClientSession createSession(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize) throws HornetQException
- {
- return createSessionInternal(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize);
- }
-
- public ClientSession createSession(final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final int ackBatchSize) throws HornetQException
- {
- return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createXASession() throws HornetQException
- {
- return createSessionInternal(null, null, true, false, false, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createTransactedSession() throws HornetQException
- {
- return createSessionInternal(null, null, false, false, false, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession() throws HornetQException
- {
- return createSessionInternal(null, null, false, true, true, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
- {
- return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
- {
- return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
- }
-
- public ClientSession createSession(final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge) throws HornetQException
- {
- return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
- }
-
- public int numSessions()
- {
- int num = 0;
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- num += failoverManager.numSessions();
- }
-
- return num;
- }
-
- public int numConnections()
- {
- int num = 0;
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- num += failoverManager.numConnections();
- }
-
- return num;
- }
-
- public void close()
- {
- if (closed)
- {
- return;
- }
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- ClientSessionFactoryImpl.log.error("Failed to stop discovery group", e);
- }
- }
-
- for (FailoverManager failoverManager : failoverManagerMap.values())
- {
- failoverManager.causeExit();
- }
-
- failoverManagerMap.clear();
-
- if (!useGlobalPools)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- ClientSessionFactoryImpl.log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- ClientSessionFactoryImpl.log.warn("Timed out waiting for scheduled pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
-
- closed = true;
- }
-
- public ClientSessionFactory copy()
- {
- return new ClientSessionFactoryImpl(this);
- }
-
- public void setGroupID(final String groupID)
- {
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- // DiscoveryListener implementation --------------------------------------------------------
-
- public synchronized void connectorsChanged()
- {
- receivedBroadcast = true;
-
- Map<String, DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntryMap();
-
- Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
-
- for (DiscoveryEntry entry : newConnectors.values())
- {
- connectorSet.add(entry.getConnectorPair());
- }
-
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager>> iter = failoverManagerMap.entrySet()
- .iterator();
- while (iter.hasNext())
- {
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager> entry = iter.next();
-
- if (!connectorSet.contains(entry.getKey()))
- {
- // failoverManager no longer there - we should remove it
-
- iter.remove();
- }
- }
-
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorSet)
- {
- if (!failoverManagerMap.containsKey(connectorPair))
- {
- // Create a new failoverManager
-
- FailoverManager failoverManager = new FailoverManagerImpl(this,
- connectorPair.a,
- connectorPair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- failoverManagerMap.put(connectorPair, failoverManager);
- }
- }
-
- updatefailoverManagerArray();
- }
-
- public FailoverManager[] getFailoverManagers()
- {
- return failoverManagerArray;
- }
-
- // Protected ------------------------------------------------------------------------------
-
- @Override
- protected void finalize() throws Throwable
- {
- close();
-
- super.finalize();
- }
-
- // Private --------------------------------------------------------------------------------
-
- private void checkWrite()
- {
- if (readOnly)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
- }
- }
-
- private ClientSession createSessionInternal(final String username,
- final String password,
- final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final int ackBatchSize) throws HornetQException
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session, factory is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- if (discoveryGroup != null && !receivedBroadcast)
- {
- boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from discovery group");
- }
- }
-
- synchronized (this)
- {
- int pos = loadBalancingPolicy.select(failoverManagerArray.length);
-
- FailoverManager failoverManager = failoverManagerArray[pos];
-
- ClientSession session = failoverManager.createSession(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize,
- cacheLargeMessagesClient,
- minLargeMessageSize,
- blockOnAcknowledge,
- autoGroup,
- confirmationWindowSize,
- producerWindowSize,
- consumerWindowSize,
- producerMaxRate,
- consumerMaxRate,
- blockOnNonDurableSend,
- blockOnDurableSend,
- initialMessagePacketSize,
- groupID);
-
- return session;
- }
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
- }
-
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
-
- private synchronized void updatefailoverManagerArray()
- {
- failoverManagerArray = new FailoverManager[failoverManagerMap.size()];
-
- failoverManagerMap.values().toArray(failoverManagerArray);
- }
-
-}
Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import java.util.List;
Copied: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java (from rev 8755, trunk/src/main/org/hornetq/api/core/client/ClientSessionFactoryImpl.java)
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -0,0 +1,1116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3602 $</tt>
+ *
+ */
+public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, DiscoveryListener, Serializable
+{
+ // Constants
+ // ------------------------------------------------------------------------------------
+
+ private static final long serialVersionUID = 2512460695662741413L;
+
+ private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
+
+ // Attributes
+ // -----------------------------------------------------------------------------------
+
+ private final Map<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager> failoverManagerMap = new LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>, FailoverManager>();
+
+ private volatile boolean receivedBroadcast = false;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private FailoverManager[] failoverManagerArray;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ private List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
+
+ private String discoveryAddress;
+
+ private int discoveryPort;
+
+ private long discoveryRefreshTimeout;
+
+ private long discoveryInitialWaitTimeout;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private boolean failoverOnServerShutdown;
+
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private String groupID;
+
+ private static synchronized ExecutorService getGlobalThreadPool()
+ {
+ if (ClientSessionFactoryImpl.globalThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true);
+
+ ClientSessionFactoryImpl.globalThreadPool = Executors.newCachedThreadPool(factory);
+ }
+
+ return ClientSessionFactoryImpl.globalThreadPool;
+ }
+
+ private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ {
+ if (ClientSessionFactoryImpl.globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads", true);
+
+ ClientSessionFactoryImpl.globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ factory);
+ }
+
+ return ClientSessionFactoryImpl.globalScheduledThreadPool;
+ }
+
+ private void setThreadPools()
+ {
+ if (useGlobalPools)
+ {
+ threadPool = ClientSessionFactoryImpl.getGlobalThreadPool();
+
+ scheduledThreadPool = ClientSessionFactoryImpl.getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+ true);
+
+ if (threadPoolMaxSize == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+ }
+
+ factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+ true);
+
+ scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ }
+ }
+
+ private synchronized void initialise() throws Exception
+ {
+ if (!readOnly)
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ if (discoveryAddress != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+
+ discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
+
+ discoveryGroup.registerListener(this);
+
+ discoveryGroup.start();
+ }
+ else if (staticConnectors != null)
+ {
+ for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
+ {
+ FailoverManager cm = new FailoverManagerImpl(this,
+ pair.a,
+ pair.b,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+
+ failoverManagerMap.put(pair, cm);
+ }
+
+ updatefailoverManagerArray();
+ }
+ else
+ {
+ throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
+ }
+ readOnly = true;
+ }
+ }
+
+ // Static
+ // ---------------------------------------------------------------------------------------
+
+ // Constructors
+ // ---------------------------------------------------------------------------------
+
+ public ClientSessionFactoryImpl(final ClientSessionFactory other)
+ {
+ discoveryAddress = other.getDiscoveryAddress();
+
+ discoveryPort = other.getDiscoveryPort();
+
+ staticConnectors = other.getStaticConnectors();
+
+ discoveryRefreshTimeout = other.getDiscoveryRefreshTimeout();
+
+ clientFailureCheckPeriod = other.getClientFailureCheckPeriod();
+
+ connectionTTL = other.getConnectionTTL();
+
+ callTimeout = other.getCallTimeout();
+
+ minLargeMessageSize = other.getMinLargeMessageSize();
+
+ consumerWindowSize = other.getConsumerWindowSize();
+
+ consumerMaxRate = other.getConsumerMaxRate();
+
+ confirmationWindowSize = other.getConfirmationWindowSize();
+
+ producerWindowSize = other.getProducerWindowSize();
+
+ producerMaxRate = other.getProducerMaxRate();
+
+ blockOnAcknowledge = other.isBlockOnAcknowledge();
+
+ blockOnDurableSend = other.isBlockOnDurableSend();
+
+ blockOnNonDurableSend = other.isBlockOnNonDurableSend();
+
+ autoGroup = other.isAutoGroup();
+
+ preAcknowledge = other.isPreAcknowledge();
+
+ ackBatchSize = other.getAckBatchSize();
+
+ connectionLoadBalancingPolicyClassName = other.getConnectionLoadBalancingPolicyClassName();
+
+ discoveryInitialWaitTimeout = other.getDiscoveryInitialWaitTimeout();
+
+ useGlobalPools = other.isUseGlobalPools();
+
+ scheduledThreadPoolMaxSize = other.getScheduledThreadPoolMaxSize();
+
+ threadPoolMaxSize = other.getThreadPoolMaxSize();
+
+ retryInterval = other.getRetryInterval();
+
+ retryIntervalMultiplier = other.getRetryIntervalMultiplier();
+
+ maxRetryInterval = other.getMaxRetryInterval();
+
+ reconnectAttempts = other.getReconnectAttempts();
+
+ failoverOnServerShutdown = other.isFailoverOnServerShutdown();
+
+ cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
+
+ initialMessagePacketSize = other.getInitialMessagePacketSize();
+
+ groupID = other.getGroupID();
+ }
+
+ public ClientSessionFactoryImpl()
+ {
+ discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+
+ clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+ connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+ minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+ consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+ confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+ producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+ producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+ blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+ blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+ blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+ autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+ preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+ ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+ connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+ discoveryInitialWaitTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+
+ useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+ scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+ threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+ retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+ retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+ reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+ failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+
+ cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+ }
+
+ public ClientSessionFactoryImpl(final String discoveryAddress, final int discoveryPort)
+ {
+ this();
+
+ this.discoveryAddress = discoveryAddress;
+
+ this.discoveryPort = discoveryPort;
+ }
+
+ public ClientSessionFactoryImpl(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
+ {
+ this();
+
+ this.staticConnectors = staticConnectors;
+ }
+
+ public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+ final TransportConfiguration backupConnectorConfig)
+ {
+ this();
+
+ staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+
+ staticConnectors.add(new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
+ backupConnectorConfig));
+ }
+
+ public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig)
+ {
+ this(connectorConfig, null);
+ }
+
+ // ClientSessionFactory implementation------------------------------------------------------------
+
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
+ {
+ return staticConnectors;
+ }
+
+ public synchronized void setStaticConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
+ {
+ checkWrite();
+
+ this.staticConnectors = staticConnectors;
+ }
+
+ public synchronized long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ public synchronized long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ public synchronized void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ public synchronized long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ public synchronized void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ public synchronized int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ public synchronized int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public synchronized int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public synchronized int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ public synchronized int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public synchronized void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public synchronized int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public synchronized void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ public synchronized boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ public synchronized boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ public synchronized boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ public synchronized boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ public synchronized void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ public synchronized boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ public synchronized int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ public synchronized void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ public synchronized long getDiscoveryInitialWaitTimeout()
+ {
+ return discoveryInitialWaitTimeout;
+ }
+
+ public synchronized void setDiscoveryInitialWaitTimeout(final long initialWaitTimeout)
+ {
+ checkWrite();
+ discoveryInitialWaitTimeout = initialWaitTimeout;
+ }
+
+ public synchronized boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ public synchronized int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ public synchronized long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public synchronized void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ /** Returns the current {@code maxRetryInterval} setting. */
+ public synchronized long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ /**
+ * Sets the {@code maxRetryInterval} setting.
+ *
+ * @param retryInterval the new maximum retry interval; despite its name, this value is stored in
+ *           {@code maxRetryInterval}, not in {@code retryInterval}
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ /** Returns the current {@code retryIntervalMultiplier} setting. */
+ public synchronized double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ /**
+ * Sets the {@code retryIntervalMultiplier} setting.
+ *
+ * @param retryIntervalMultiplier the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ /** Returns the current {@code reconnectAttempts} setting. */
+ public synchronized int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ /**
+ * Sets the {@code reconnectAttempts} setting.
+ *
+ * @param reconnectAttempts the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ /** Returns the current {@code failoverOnServerShutdown} setting. */
+ public synchronized boolean isFailoverOnServerShutdown()
+ {
+ return failoverOnServerShutdown;
+ }
+
+ /**
+ * Sets the {@code failoverOnServerShutdown} setting.
+ *
+ * @param failoverOnServerShutdown the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setFailoverOnServerShutdown(final boolean failoverOnServerShutdown)
+ {
+ checkWrite();
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
+ }
+
+ /** Returns the configured load balancing policy class name. */
+ public synchronized String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ /**
+ * Sets the class name stored in {@code connectionLoadBalancingPolicyClassName}.
+ *
+ * @param loadBalancingPolicyClassName the new class name
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ /** Returns the current {@code discoveryAddress} setting. */
+ public synchronized String getDiscoveryAddress()
+ {
+ return discoveryAddress;
+ }
+
+ /**
+ * Sets the {@code discoveryAddress} setting.
+ *
+ * @param discoveryAddress the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setDiscoveryAddress(final String discoveryAddress)
+ {
+ checkWrite();
+ this.discoveryAddress = discoveryAddress;
+ }
+
+ /** Returns the current {@code discoveryPort} setting. */
+ public synchronized int getDiscoveryPort()
+ {
+ return discoveryPort;
+ }
+
+ /**
+ * Sets the {@code discoveryPort} setting.
+ *
+ * @param discoveryPort the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setDiscoveryPort(final int discoveryPort)
+ {
+ checkWrite();
+ this.discoveryPort = discoveryPort;
+ }
+
+ /** Returns the current {@code discoveryRefreshTimeout} setting. */
+ public synchronized long getDiscoveryRefreshTimeout()
+ {
+ return discoveryRefreshTimeout;
+ }
+
+ /**
+ * Adds an interceptor to this factory's {@code interceptors} collection.
+ *
+ * Unlike the attribute setters of this class, this method is not synchronized and does not
+ * call {@code checkWrite()}.
+ *
+ * @param interceptor the interceptor to add
+ */
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ /**
+ * Removes an interceptor from this factory's {@code interceptors} collection.
+ *
+ * @param interceptor the interceptor to remove
+ * @return the result of {@code interceptors.remove(interceptor)}
+ */
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ /**
+ * Sets the {@code discoveryRefreshTimeout} setting.
+ *
+ * @param discoveryRefreshTimeout the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setDiscoveryRefreshTimeout(final long discoveryRefreshTimeout)
+ {
+ checkWrite();
+ this.discoveryRefreshTimeout = discoveryRefreshTimeout;
+ }
+
+ /** Returns the current {@code initialMessagePacketSize} setting. */
+ public synchronized int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ /**
+ * Sets the {@code initialMessagePacketSize} setting.
+ *
+ * @param size the new value
+ * @throws IllegalStateException if {@code checkWrite()} rejects the change because the factory is read-only
+ */
+ public synchronized void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+ /**
+ * Creates a session with explicit credentials and settings.
+ *
+ * All arguments are passed unchanged to {@code createSessionInternal}; none of the factory's
+ * own {@code preAcknowledge} / {@code ackBatchSize} values are used by this overload.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize) throws HornetQException
+ {
+ return createSessionInternal(username,
+ password,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ ackBatchSize);
+ }
+
+ /**
+ * Creates a non-XA session without credentials (null username and password), using the given
+ * auto-commit flags and ack batch size and the factory's {@code preAcknowledge} setting.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession(final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final int ackBatchSize) throws HornetQException
+ {
+ return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates an XA session without credentials: {@code xa} is true, both auto-commit flags are
+ * false, and the factory's {@code preAcknowledge} and {@code ackBatchSize} settings are used.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createXASession() throws HornetQException
+ {
+ return createSessionInternal(null, null, true, false, false, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates a non-XA session without credentials in which neither sends nor acks are
+ * auto-committed, using the factory's {@code preAcknowledge} and {@code ackBatchSize} settings.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createTransactedSession() throws HornetQException
+ {
+ return createSessionInternal(null, null, false, false, false, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates a non-XA session without credentials in which both sends and acks are
+ * auto-committed, using the factory's {@code preAcknowledge} and {@code ackBatchSize} settings.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession() throws HornetQException
+ {
+ return createSessionInternal(null, null, false, true, true, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates a non-XA session without credentials with the given auto-commit flags, using the
+ * factory's {@code preAcknowledge} and {@code ackBatchSize} settings.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
+ {
+ return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates a session without credentials with the given XA and auto-commit flags, using the
+ * factory's {@code preAcknowledge} and {@code ackBatchSize} settings.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
+ {
+ return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Creates a session without credentials with the given XA, auto-commit and pre-acknowledge
+ * flags. The {@code preAcknowledge} parameter shadows the factory field of the same name;
+ * only {@code ackBatchSize} is taken from the factory.
+ *
+ * @throws HornetQException if {@code createSessionInternal} fails
+ */
+ public ClientSession createSession(final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge) throws HornetQException
+ {
+ return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+ }
+
+ /**
+ * Returns the number of sessions, summed over every failover manager currently held in
+ * {@code failoverManagerMap}.
+ */
+ public int numSessions()
+ {
+ int total = 0;
+
+ Iterator<FailoverManager> managers = failoverManagerMap.values().iterator();
+
+ while (managers.hasNext())
+ {
+ total += managers.next().numSessions();
+ }
+
+ return total;
+ }
+
+ /**
+ * Returns the number of connections, summed over every failover manager currently held in
+ * {@code failoverManagerMap}.
+ */
+ public int numConnections()
+ {
+ int total = 0;
+
+ Iterator<FailoverManager> managers = failoverManagerMap.values().iterator();
+
+ while (managers.hasNext())
+ {
+ total += managers.next().numConnections();
+ }
+
+ return total;
+ }
+
+ /**
+ * Closes this factory. Calling it again after a completed close does nothing.
+ *
+ * Stops the discovery group (if there is one), asks every failover manager to exit, clears
+ * the failover manager map and, when {@code useGlobalPools} is false, shuts down both thread
+ * pools, waiting up to 10 seconds for each of them to terminate.
+ *
+ * If the calling thread is interrupted while waiting for a pool, the remaining shutdown work
+ * is still carried out and the thread's interrupt status is restored before returning, so
+ * the caller can still observe the interruption.
+ */
+ public void close()
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ if (discoveryGroup != null)
+ {
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ // Best effort: a failing discovery group must not prevent the rest of the shutdown
+ ClientSessionFactoryImpl.log.error("Failed to stop discovery group", e);
+ }
+ }
+
+ for (FailoverManager failoverManager : failoverManagerMap.values())
+ {
+ failoverManager.causeExit();
+ }
+
+ failoverManagerMap.clear();
+
+ // Remember an interrupt instead of swallowing it; it is re-asserted once shutdown is done
+ boolean interrupted = false;
+
+ if (!useGlobalPools)
+ {
+ if (threadPool != null)
+ {
+ threadPool.shutdown();
+
+ try
+ {
+ if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ ClientSessionFactoryImpl.log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+
+ if (scheduledThreadPool != null)
+ {
+ scheduledThreadPool.shutdown();
+
+ try
+ {
+ if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ ClientSessionFactoryImpl.log.warn("Timed out waiting for scheduled pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ }
+
+ closed = true;
+
+ if (interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Returns a new {@code ClientSessionFactoryImpl} built by passing this factory to the
+ * constructor that takes another factory.
+ */
+ public ClientSessionFactory copy()
+ {
+ return new ClientSessionFactoryImpl(this);
+ }
+
+ /**
+ * Sets the {@code groupID} passed to sessions created by this factory.
+ *
+ * Unlike the other attribute setters of this class, this method is not synchronized and does
+ * not call {@code checkWrite()}.
+ *
+ * @param groupID the new value
+ */
+ public void setGroupID(final String groupID)
+ {
+ this.groupID = groupID;
+ }
+
+ /** Returns the current {@code groupID} setting (read without synchronization). */
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ // DiscoveryListener implementation --------------------------------------------------------
+
+ /**
+ * Discovery callback: brings {@code failoverManagerMap} in line with the connector pairs
+ * currently reported by the discovery group and records that a broadcast has been received.
+ *
+ * Map entries whose connector pair is no longer reported are removed, a new
+ * {@code FailoverManagerImpl} is created (with this factory's current settings) for every
+ * reported pair that has no entry yet, and the failover manager array is rebuilt afterwards.
+ */
+ public synchronized void connectorsChanged()
+ {
+ receivedBroadcast = true;
+
+ // Collect the connector pairs the discovery group currently knows about
+ Set<Pair<TransportConfiguration, TransportConfiguration>> livePairs = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
+
+ Map<String, DiscoveryEntry> discovered = discoveryGroup.getDiscoveryEntryMap();
+
+ for (DiscoveryEntry discoveryEntry : discovered.values())
+ {
+ livePairs.add(discoveryEntry.getConnectorPair());
+ }
+
+ // Drop every failover manager whose connector pair is no longer there
+ failoverManagerMap.keySet().retainAll(livePairs);
+
+ for (Pair<TransportConfiguration, TransportConfiguration> pair : livePairs)
+ {
+ if (failoverManagerMap.containsKey(pair))
+ {
+ continue;
+ }
+
+ // Newly discovered pair - it needs its own failover manager
+ FailoverManager created = new FailoverManagerImpl(this,
+ pair.a,
+ pair.b,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+
+ failoverManagerMap.put(pair, created);
+ }
+
+ updatefailoverManagerArray();
+ }
+
+ /**
+ * Returns the current failover manager array. The field itself is returned (not a copy) and
+ * it is read without synchronization.
+ */
+ public FailoverManager[] getFailoverManagers()
+ {
+ return failoverManagerArray;
+ }
+
+ // Protected ------------------------------------------------------------------------------
+
+ /**
+ * Closes the factory when it is finalized.
+ *
+ * {@code super.finalize()} is invoked from a {@code finally} block so that it still runs
+ * when {@link #close()} throws.
+ */
+ @Override
+ protected void finalize() throws Throwable
+ {
+ try
+ {
+ close();
+ }
+ finally
+ {
+ super.finalize();
+ }
+ }
+
+ // Private --------------------------------------------------------------------------------
+
+ /**
+ * Guard used by the attribute setters: rejects the change once the factory is read-only.
+ *
+ * @throws IllegalStateException if {@code readOnly} is true
+ */
+ private void checkWrite()
+ {
+ if (readOnly)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+ }
+ }
+
+ /**
+ * Common implementation behind all public session-creation methods.
+ *
+ * Steps, in order: fail if the factory is closed; call {@code initialise()}; if a discovery
+ * group is in use and no broadcast has been received yet, wait for the first one; then, while
+ * holding this factory's monitor, let the load balancing policy pick a failover manager and
+ * ask it to create the session with the given arguments plus the factory's own settings.
+ *
+ * @return the session created by the selected failover manager
+ * @throws IllegalStateException if the factory has been closed
+ * @throws HornetQException with code {@code INTERNAL_ERROR} if {@code initialise()} throws,
+ *            with code {@code CONNECTION_TIMEDOUT} if no discovery broadcast arrives within
+ *            {@code discoveryInitialWaitTimeout}, or as thrown by the failover manager
+ */
+ private ClientSession createSessionInternal(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize) throws HornetQException
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Cannot create session, factory is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ // With discovery, the set of servers is unknown until the first broadcast has arrived
+ if (discoveryGroup != null && !receivedBroadcast)
+ {
+ boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
+
+ if (!ok)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive initial broadcast from discovery group");
+ }
+ }
+
+ synchronized (this)
+ {
+ // NOTE(review): an empty failoverManagerArray is not handled here; what select(0) and the
+ // array access below do in that case depends on the policy - confirm it cannot happen
+ int pos = loadBalancingPolicy.select(failoverManagerArray.length);
+
+ FailoverManager failoverManager = failoverManagerArray[pos];
+
+ ClientSession session = failoverManager.createSession(username,
+ password,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ ackBatchSize,
+ cacheLargeMessagesClient,
+ minLargeMessageSize,
+ blockOnAcknowledge,
+ autoGroup,
+ confirmationWindowSize,
+ producerWindowSize,
+ consumerWindowSize,
+ producerMaxRate,
+ consumerMaxRate,
+ blockOnNonDurableSend,
+ blockOnDurableSend,
+ initialMessagePacketSize,
+ groupID);
+
+ return session;
+ }
+ }
+
+ private void instantiateLoadBalancingPolicy()
+ {
+ if (connectionLoadBalancingPolicyClassName == null)
+ {
+ throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+ }
+
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+ "\"",
+ e);
+ }
+ }
+
+ /**
+ * Rebuilds {@code failoverManagerArray} from the current values of {@code failoverManagerMap}.
+ *
+ * The new array is populated first and only then assigned to the field, so the field is
+ * never pointed at a freshly allocated array that still contains only nulls. This matters
+ * because {@code getFailoverManagers()} reads the field without holding the lock.
+ */
+ private synchronized void updatefailoverManagerArray()
+ {
+ FailoverManager[] managers = new FailoverManager[failoverManagerMap.size()];
+
+ failoverManagerArray = failoverManagerMap.values().toArray(managers);
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -24,8 +24,8 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.cluster.BridgeConfiguration;
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -18,8 +18,8 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
/**
Modified: trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
===================================================================
--- trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -29,10 +29,10 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientRequestor;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.tests.util.SpawnedVMSupport;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -18,7 +18,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -28,8 +28,8 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -20,8 +20,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -14,8 +14,8 @@
package org.hornetq.tests.integration.cluster.reattach;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -21,10 +21,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AcceptorControl;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -18,10 +18,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AcceptorControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -22,10 +22,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -16,10 +16,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.BroadcastGroupControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -18,10 +18,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -16,10 +16,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.DiscoveryGroupControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/management/DivertControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/DivertControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/DivertControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -16,10 +16,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.DivertControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -16,10 +16,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -19,10 +19,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/NetworkAddressTestBase.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -29,8 +29,8 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/SynchronousCloseTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -18,8 +18,8 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
Modified: trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -20,8 +20,8 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Packet;
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-01-06 20:34:43 UTC (rev 8757)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-01-07 09:38:09 UTC (rev 8758)
@@ -25,8 +25,8 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactoryImpl;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
14 years, 11 months
JBoss hornetq SVN: r8757 - tags.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-06 15:34:43 -0500 (Wed, 06 Jan 2010)
New Revision: 8757
Added:
tags/HornetQ_2_0_0_CR3/
Log:
tag 2.0.0.CR3 release
Copied: tags/HornetQ_2_0_0_CR3 (from rev 8756, tags/HornetQ_2_0_0_CR3_pending)
14 years, 11 months
JBoss hornetq SVN: r8756 - tags.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-06 12:18:17 -0500 (Wed, 06 Jan 2010)
New Revision: 8756
Added:
tags/HornetQ_2_0_0_CR3_pending/
Log:
pending tag for 2.0.0.CR3 release
Copied: tags/HornetQ_2_0_0_CR3_pending (from rev 8755, trunk)
14 years, 11 months
JBoss hornetq SVN: r8755 - trunk/examples/javaee/xarecovery/server.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-06 11:58:05 -0500 (Wed, 06 Jan 2010)
New Revision: 8755
Modified:
trunk/examples/javaee/xarecovery/server/jbossts-properties.xml
Log:
fixed xarecovery example
* fixed package of HornetQXAResourceRecovery
Modified: trunk/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- trunk/examples/javaee/xarecovery/server/jbossts-properties.xml 2010-01-06 16:46:41 UTC (rev 8754)
+++ trunk/examples/javaee/xarecovery/server/jbossts-properties.xml 2010-01-06 16:58:05 UTC (rev 8755)
@@ -240,7 +240,7 @@
com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
-->
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
- value="org.hornetq.api.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
+ value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
</properties>
<properties depends="arjuna,txoj,jta" name="recoverymanager">
<!--
14 years, 11 months
JBoss hornetq SVN: r8754 - trunk/src/config/jboss-as.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-01-06 11:46:41 -0500 (Wed, 06 Jan 2010)
New Revision: 8754
Modified:
trunk/src/config/jboss-as/build-as4.xml
trunk/src/config/jboss-as/build-as5.xml
Log:
Remove ia64
Modified: trunk/src/config/jboss-as/build-as4.xml
===================================================================
--- trunk/src/config/jboss-as/build-as4.xml 2010-01-06 16:46:09 UTC (rev 8753)
+++ trunk/src/config/jboss-as/build-as4.xml 2010-01-06 16:46:41 UTC (rev 8754)
@@ -39,7 +39,6 @@
<mkdir dir="${dest.bin.dir}/native"/>
<copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO32.so"/>
<copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO64.so"/>
- <copy todir="${dest.bin.dir}/native" file="${src.bin.dir}/libHornetQAIO_ia64.so"/>
</target>
<target name="create-profile">
Modified: trunk/src/config/jboss-as/build-as5.xml
===================================================================
--- trunk/src/config/jboss-as/build-as5.xml 2010-01-06 16:46:09 UTC (rev 8753)
+++ trunk/src/config/jboss-as/build-as5.xml 2010-01-06 16:46:41 UTC (rev 8754)
@@ -37,10 +37,8 @@
</antcall>
<mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/x86"/>
<mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/x64"/>
- <mkdir dir="${dest.bin.dir}/META-INF/lib/linux2/i64"/>
<copy todir="${dest.bin.dir}/META-INF/lib/linux2/x86" file="${src.bin.dir}/libHornetQAIO32.so"/>
<copy todir="${dest.bin.dir}/META-INF/lib/linux2/x64" file="${src.bin.dir}/libHornetQAIO64.so"/>
- <copy todir="${dest.bin.dir}/META-INF/lib/linux2/i64" file="${src.bin.dir}/libHornetQAIO_ia64.so"/>
</target>
<target name="create-profile">
14 years, 11 months
JBoss hornetq SVN: r8753 - tags.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-06 11:46:09 -0500 (Wed, 06 Jan 2010)
New Revision: 8753
Removed:
tags/HornetQ_2_0_0_CR3_pending/
Log:
delete previous pending tag for 2.0.0.CR3 release
14 years, 11 months
JBoss hornetq SVN: r8752 - trunk/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-06 11:23:37 -0500 (Wed, 06 Jan 2010)
New Revision: 8752
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
fixed management tests
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-01-06 15:55:03 UTC (rev 8751)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-01-06 16:23:37 UTC (rev 8752)
@@ -86,7 +86,7 @@
@Override
protected JMSServerControl createManagementControl() throws Exception
{
- HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS.toString());
+ HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue("hornetq.management");
final JMSMessagingProxy proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_SERVER);
return new JMSServerControl()
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-01-06 15:55:03 UTC (rev 8751)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-01-06 16:23:37 UTC (rev 8752)
@@ -359,7 +359,7 @@
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
- HornetQQueue managementQueue = (HornetQQueue)HornetQJMSClient.createQueue(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS.toString());
+ HornetQQueue managementQueue = (HornetQQueue)HornetQJMSClient.createQueue("hornetq.management");
proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_TOPIC + topic.getTopicName());
}
14 years, 11 months
JBoss hornetq SVN: r8751 - tags.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-06 10:55:03 -0500 (Wed, 06 Jan 2010)
New Revision: 8751
Added:
tags/HornetQ_2_0_0_CR3_pending/
Log:
pending tag for 2.0.0.CR3 release
Copied: tags/HornetQ_2_0_0_CR3_pending (from rev 8750, trunk)
14 years, 11 months