[hornetq-commits] JBoss hornetq SVN: r10150 - in branches/HORNETQ-316: src/main/org/hornetq/api/core/client and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Jan 27 09:06:21 EST 2011
Author: igarashitm
Date: 2011-01-27 09:06:20 -0500 (Thu, 27 Jan 2011)
New Revision: 10150
Added:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
Removed:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Log:
https://issues.jboss.org/browse/HORNETQ-316
First commit: implemented the pluggable discovery strategy and resolved the resulting compile errors. Not yet debugged.
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+ * Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -14,169 +14,45 @@
package org.hornetq.api.core;
import java.io.Serializable;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.UUIDGenerator;
-
+import java.util.Map;
+import java.util.Properties;
/**
* A DiscoveryGroupConfiguration
*
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 18 Nov 2008 08:47:30
- *
- *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
*/
public class DiscoveryGroupConfiguration implements Serializable
{
- private static final long serialVersionUID = 8657206421727863400L;
-
- private static final Logger log = Logger.getLogger(DiscoveryGroupConfiguration.class);
+ private static final long serialVersionUID = 2877108926493109407L;
-
- private String name;
+ private final String serverLocatorClassName;
- private String localBindAddress;
-
- private String groupAddress;
-
- private int groupPort;
-
- private long refreshTimeout;
+ private final String name;
- private long discoveryInitialWaitTimeout;
-
- public DiscoveryGroupConfiguration(final String name,
- final String localBindAddress,
- final String groupAddress,
- final int groupPort,
- final long refreshTimeout,
- final long discoveryInitialWaitTimeout)
- {
- this.name = name;
- this.groupAddress = groupAddress;
- this.localBindAddress = localBindAddress;
- this.groupPort = groupPort;
- this.refreshTimeout = refreshTimeout;
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
- public DiscoveryGroupConfiguration(final String groupAddress,
- final int groupPort)
- {
- this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
- }
-
- public String getName()
- {
- return name;
- }
+ private final Map<String, Object> params;
- public String getLocalBindAddress()
+ public DiscoveryGroupConfiguration(final String clazz, final Map<String, Object> params, final String name)
{
- return localBindAddress;
- }
-
- public String getGroupAddress()
- {
- return groupAddress;
- }
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getRefreshTimeout()
- {
- return refreshTimeout;
- }
-
- /**
- * @param name the name to set
- */
- public void setName(final String name)
- {
+ this.serverLocatorClassName = clazz;
+
+ this.params = params;
+
this.name = name;
}
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAdress(final String localBindAddress)
+ public String getServerLocatorClassName()
{
- this.localBindAddress = localBindAddress;
+ return this.serverLocatorClassName;
}
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
+ public Map<String, Object> getParams()
{
- this.groupAddress = groupAddress;
+ return this.params;
}
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
+ public String getName()
{
- this.groupPort = groupPort;
+ return this.name;
}
-
- /**
- * @param refreshTimeout the refreshTimeout to set
- */
- public void setRefreshTimeout(final long refreshTimeout)
- {
- this.refreshTimeout = refreshTimeout;
- }
-
- /**
- * @return the discoveryInitialWaitTimeout
- */
- public long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- /**
- * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
- */
- public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
- {
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
-
- if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
- if (groupPort != that.groupPort) return false;
- if (refreshTimeout != that.refreshTimeout) return false;
- if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
- if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
- return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
- result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
- result = 31 * result + groupPort;
- result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
- result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
- return result;
- }
+
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -13,13 +13,15 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
-import java.util.List;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -107,21 +109,34 @@
*/
public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
{
- return new ServerLocatorImpl(false, transportConfigurations);
+ Map<String,Object> params = new HashMap<String,Object>(); // discovery parameters handed to the locator implementation
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, transportConfigurations); // the varargs array itself is stored (as a TransportConfiguration[]) under the static-connectors key
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null); // locator class = StaticServerLocatorImpl; the configuration name is left null
+ return createServerLocatorWithoutHA(config); // delegate to the configuration-based overload, which instantiates the locator reflectively
}
/**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
- * The UDP address and port are used to listen for live servers in the cluster
- *
- * @param discoveryAddress The UDP group address to listen for updates
- * @param discoveryPort the UDP port to listen for updates
+ * @param groupConfiguration The configuration for server discovery
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(false, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName(); // name of the pluggable locator implementation to instantiate
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader(); // resolved through the thread context class loader
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class); // NOTE(review): matches only a public (java.lang.Boolean, DiscoveryGroupConfiguration) constructor, not one declared with primitive boolean - confirm against the locator implementations
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration); // FALSE = no HA
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Could not instantiate ServerLocator implementation class: " + className, e); // previously an exception object was created here but never thrown, so failures silently returned null
+ }
+ return serverLocator;
}
/**
@@ -135,9 +150,12 @@
*/
public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
{
- return new ServerLocatorImpl(true, initialServers);
+ Map<String,Object> params = new HashMap<String,Object>(); // discovery parameters handed to the locator implementation
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, initialServers); // the varargs array itself is stored (as a TransportConfiguration[]) under the static-connectors key
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null); // locator class = StaticServerLocatorImpl; the configuration name is left null
+ return createServerLocatorWithHA(config); // delegate to the configuration-based overload, which instantiates the locator reflectively
}
-
+
/**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The discoveryAddress and discoveryPort parameters in this method are used to listen for UDP broadcasts which contain connection information for members of the cluster.
@@ -150,7 +168,20 @@
*/
public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(true, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName(); // name of the pluggable locator implementation to instantiate
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader(); // resolved through the thread context class loader
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class); // NOTE(review): matches only a public (java.lang.Boolean, DiscoveryGroupConfiguration) constructor, not one declared with primitive boolean - confirm against the locator implementations
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration); // TRUE = HA enabled
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Could not instantiate ServerLocator implementation class: " + className, e); // previously an exception object was created here but never thrown, so failures silently returned null
+ }
+ return serverLocator;
}
Copied: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (from rev 10124, branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java)
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,1026 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A AbstractServerLocator
+ *
+ * @author Tim Fox
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, Serializable
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+ private final boolean ha;
+
+ private boolean finalizeCheck = true;
+
+ private boolean clusterConnection;
+
+ private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
+ private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+
+ private TransportConfiguration[] initialConnectors;
+
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+ private Topology topology = new Topology();
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+ private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialConnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private String groupID;
+
+ private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ private static synchronized ExecutorService getGlobalThreadPool() // Returns the shared client thread pool, creating it on first call; static synchronized, so creation is guarded by the class monitor.
+ {
+ if (globalThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+ // The shared pool is a cached thread pool built on the factory above.
+ globalThreadPool = Executors.newCachedThreadPool(factory);
+ }
+ // Every caller receives the same static instance.
+ return globalThreadPool;
+ }
+
+ public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() // Returns the shared scheduled pool, creating it on first call; static synchronized, so creation is guarded by the class monitor.
+ {
+ if (globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+ true,
+ getThisClassLoader());
+ // Sized by the HornetQClient default constant, not by this instance's scheduledThreadPoolMaxSize (the method is static).
+ globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ factory);
+ }
+ // Every caller receives the same static instance.
+ return globalScheduledThreadPool;
+ }
+
+ protected void setThreadPools() // Assigns threadPool and scheduledThreadPool: either the shared static pools or pools private to this locator.
+ {
+ if (useGlobalPools)
+ {
+ threadPool = getGlobalThreadPool();
+ // Both pools come from the lazily created static instances.
+ scheduledThreadPool = getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+ // A threadPoolMaxSize of -1 selects a cached pool; any other value is used as the fixed pool size.
+ if (threadPoolMaxSize == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+ }
+ // The scheduled pool gets its own factory with a distinct thread-name prefix.
+ factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+ // Private pools created here are the ones close() shuts down when useGlobalPools is false.
+ scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ }
+ }
+
+ private static ClassLoader getThisClassLoader() // Class loader passed to the HornetQThreadFactory instances created in this class.
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() // the getClassLoader() call runs inside a privileged action
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader(); // loader of ClientSessionFactoryImpl, not of this class
+ }
+ });
+
+ }
+
+ protected void instantiateLoadBalancingPolicy() // Reflectively creates loadBalancingPolicy from connectionLoadBalancingPolicyClassName.
+ {
+ if (connectionLoadBalancingPolicyClassName == null)
+ {
+ throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+ }
+ // Class loading and instantiation run inside a privileged action.
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader(); // the policy class is resolved through the thread context class loader
+ try
+ {
+ Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance(); // requires an accessible no-argument constructor
+ return null; // the action's result is unused; the policy is stored in the field
+ }
+ catch (Exception e) // any failure (class not found, bad cast, instantiation error) is reported as an IllegalArgumentException with the cause attached
+ {
+ throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+ "\"",
+ e);
+ }
+ }
+ });
+ }
+
+ public AbstractServerLocator(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ e.fillInStackTrace();
+ this.ha = useHA;
+
+ this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+
+ this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+ clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+ connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+ minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+ consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+ confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+ producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+ producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+ blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+ blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+ blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+ autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+ preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+ ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+ connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+ useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+ scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+ threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+ retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+ retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+ reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+ initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
+ failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+ cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+ cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+ clusterConnection = false;
+ }
+
+ protected TransportConfiguration selectConnector() // Picks the connector to use next, via the load balancing policy.
+ {
+ if (receivedTopology)
+ {
+ int pos = loadBalancingPolicy.select(topologyArray.length); // policy chooses an index into the topology array
+
+ Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
+ // Only the first element of the pair is returned; presumably the live connector - verify against Topology.
+ return pair.a;
+ }
+ else
+ {
+ // Get from initialconnectors
+ // NOTE(review): initialConnectors is dereferenced without a null check, although getConnectorLength() treats null as a possible state.
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
+
+ return initialConnectors[pos];
+ }
+ }
+
+ protected int getConnectorLength() // Number of connectors selectConnector() would currently choose from, or -1 when none are configured.
+ {
+ if(receivedTopology) // once a topology has been received its array is the source of connectors
+ {
+ return topologyArray.length;
+ }
+ else if(initialConnectors == null)
+ {
+ return -1; // sentinel: no topology yet and no initial connectors set
+ }
+ else
+ {
+ return initialConnectors.length;
+ }
+ }
+
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
+ public synchronized boolean isHA()
+ {
+ return ha;
+ }
+
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ public synchronized long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ public synchronized long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ public synchronized void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ public synchronized long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ public synchronized void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ public synchronized int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ public synchronized int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public synchronized int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public synchronized int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ public synchronized int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public synchronized void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public synchronized int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public synchronized void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ public synchronized boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ public synchronized boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ public synchronized boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ public synchronized boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ public synchronized void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ public synchronized boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ public synchronized int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ public synchronized void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ public synchronized boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ public synchronized int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ public synchronized long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public synchronized void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ public synchronized long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ public synchronized void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ public synchronized double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ public synchronized int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ public synchronized void setInitialConnectAttempts(int initialConnectAttempts) // Sets the initial connect attempts; synchronized like the other attribute setters of this class so the write happens under the instance monitor.
+ {
+ checkWrite(); // rejected with IllegalStateException once the locator is read-only
+ this.initialConnectAttempts = initialConnectAttempts;
+ }
+
+ public synchronized int getInitialConnectAttempts() // Returns the configured initial connect attempts; synchronized like the other attribute getters of this class so the read happens under the instance monitor.
+ {
+ return initialConnectAttempts;
+ }
+
+ public synchronized boolean isFailoverOnInitialConnection()
+ {
+ return this.failoverOnInitialConnection;
+ }
+
+ public synchronized void setFailoverOnInitialConnection(final boolean failover)
+ {
+ checkWrite();
+ this.failoverOnInitialConnection = failover;
+ }
+
+ public synchronized String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ public TransportConfiguration[] getStaticTransportConfigurations()
+ {
+ return this.initialConnectors;
+ }
+
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+ {
+ return discoveryGroupConfiguration;
+ }
+
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ public synchronized int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ public synchronized void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+ public void setGroupID(final String groupID)
+ {
+ checkWrite();
+ this.groupID = groupID;
+ }
+
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ public boolean isCompressLargeMessage()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessage(boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
+ private void checkWrite() // Guard called by attribute setters: rejects changes while the readOnly flag is set.
+ {
+ if (readOnly)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used"); // NOTE(review): the message says "SessionFactory" although this class is a server locator
+ }
+ }
+
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public void setClusterConnection(boolean clusterConnection)
+ {
+ this.clusterConnection = clusterConnection;
+ }
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
+ protected boolean isReceivedTopology()
+ {
+ return this.receivedTopology;
+ }
+
+ protected boolean doFinalizeCheck()
+ {
+ return this.finalizeCheck;
+ }
+
+ protected ConnectionLoadBalancingPolicy getLoadBalancingPolicy()
+ {
+ return this.loadBalancingPolicy;
+ }
+
+ protected ExecutorService getThreadPool()
+ {
+ return threadPool;
+ }
+
+ protected ScheduledExecutorService getScheduledThreadPool()
+ {
+ return scheduledThreadPool;
+ }
+
+ protected List<Interceptor> getInterceptors()
+ {
+ return this.interceptors;
+ }
+
+ protected TransportConfiguration[] getInitialConnectors()
+ {
+ return this.initialConnectors;
+ }
+
+ protected void setInitialConnectors(TransportConfiguration[] initialConnectors)
+ {
+ this.initialConnectors = initialConnectors;
+ }
+
+ protected boolean isReadOnly()
+ {
+ return this.readOnly;
+ }
+
+ protected void setReadOnly(boolean readOnly)
+ {
+ this.readOnly = readOnly;
+ }
+
+ protected boolean isClosed()
+ {
+ return this.closed;
+ }
+
+ @Override
+ protected void finalize() throws Throwable // Closes the locator on finalization unless finalizeCheck was cleared via disableFinalizeCheck().
+ {
+ if (finalizeCheck)
+ {
+ close();
+ }
+ // NOTE(review): if close() throws, super.finalize() is skipped; a try/finally would guarantee the call.
+ super.finalize();
+ }
+
+ public void close() // Closes every session factory created by this locator and, when private pools are in use, shuts them down; a no-op if already closed.
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ // Remembers an interrupt received while waiting so it can be restored at the end instead of being lost.
+ boolean interrupted = false;
+
+ for (ClientSessionFactory factory : factories)
+ {
+ factory.close();
+ }
+
+ factories.clear();
+
+ // The global pools are shared and static, so only pools private to this locator are shut down.
+ if (!useGlobalPools)
+ {
+ if (threadPool != null)
+ {
+ threadPool.shutdown();
+
+ try
+ {
+ if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Keep going so the scheduled pool is still shut down; the interrupt is re-asserted below.
+ interrupted = true;
+ }
+ }
+
+ if (scheduledThreadPool != null)
+ {
+ scheduledThreadPool.shutdown();
+
+ try
+ {
+ if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for scheduled pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ }
+ readOnly = false;
+
+ closed = true;
+
+ if (interrupted)
+ {
+ // Restore the interrupt status only after cleanup, so the waits above are not cut short by it.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+  * Handles the departure of a node. Does nothing unless this locator is HA.
+  * Otherwise the node is removed from the topology and the connector array
+  * is rebuilt (or dropped when the topology became empty). The
+  * {@code receivedTopology} flag is cleared when the topology is empty, or
+  * when its single remaining node is registered under this locator's own
+  * node ID. Topology listeners are told only if the node was actually removed.
+  *
+  * @param nodeID ID of the node that went down
+  */
+ public synchronized void notifyNodeDown(final String nodeID)
+ {
+    if (!ha)
+    {
+       return;
+    }
+
+    final boolean wasMember = topology.removeMember(nodeID);
+
+    if (topology.isEmpty())
+    {
+       topologyArray = null;
+
+       receivedTopology = false;
+    }
+    else
+    {
+       updateArraysAndPairs();
+
+       if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+       {
+          receivedTopology = false;
+       }
+    }
+
+    if (!wasMember)
+    {
+       return;
+    }
+
+    for (ClusterTopologyListener topologyListener : topologyListeners)
+    {
+       topologyListener.nodeDown(nodeID);
+    }
+ }
+
+ /**
+  * Handles the arrival (or update) of a node. Does nothing unless this
+  * locator is HA. Otherwise the node is added to the topology. If the member
+  * then registered for that node has both connectors of its pair set, the
+  * pair is pushed to every registered session factory as its backup
+  * connector information. The connector array is rebuilt when the first
+  * connector of {@code connectorPair} is non-null, topology listeners are
+  * informed, and all threads waiting on this locator's monitor are woken.
+  *
+  * @param nodeID        ID of the node that came up
+  * @param connectorPair connector pair announced for the node
+  * @param last          when true, {@code receivedTopology} is set to true
+  */
+ public synchronized void notifyNodeUp(final String nodeID,
+                                       final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                                       final boolean last)
+ {
+    if (!ha)
+    {
+       return;
+    }
+
+    topology.addMember(nodeID, new TopologyMember(connectorPair));
+
+    TopologyMember actMember = topology.getMember(nodeID);
+
+    if (actMember.getConnector().a != null && actMember.getConnector().b != null)
+    {
+       for (ClientSessionFactory factory : factories)
+       {
+          ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+                                                                     actMember.getConnector().b);
+       }
+    }
+
+    if (connectorPair.a != null)
+    {
+       updateArraysAndPairs();
+    }
+
+    if (last)
+    {
+       receivedTopology = true;
+    }
+
+    for (ClusterTopologyListener listener : topologyListeners)
+    {
+       listener.nodeUP(nodeID, connectorPair, last);
+    }
+
+    // Wake every thread waiting for the topology, not just one: with
+    // notify() any additional waiter would sleep on until its timeout.
+    notifyAll();
+ }
+
+ private void updateArraysAndPairs()
+ {
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+ topology.members());
+
+ int count = 0;
+ for (TopologyMember pair : topology.getMembers())
+ {
+ topologyArray[count++] = pair.getConnector();
+ }
+ }
+
+ /**
+  * Unregisters a session factory. Once no factory is left the locator
+  * forgets the topology it has learned, so that it goes back to using the
+  * broadcast or static list.
+  *
+  * @param factory the factory that has been closed
+  */
+ public synchronized void factoryClosed(final ClientSessionFactory factory)
+ {
+    factories.remove(factory);
+
+    if (factories.isEmpty())
+    {
+       // Go back to using the broadcast or static list
+
+       receivedTopology = false;
+
+       // Reset to an empty topology rather than null: the other methods of
+       // this class dereference 'topology' without a null check.
+       topology = new Topology();
+
+       // The connector array was derived from the topology just discarded.
+       topologyArray = null;
+    }
+ }
+
+ /**
+  * Returns the topology object currently held by this locator.
+  *
+  * @return the topology field, as is
+  */
+ public Topology getTopology()
+ {
+    return this.topology;
+ }
+
+ /**
+  * Registers a listener to be informed of nodes going up and down.
+  *
+  * @param listener the listener to add
+  */
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+    // The leftover debug print to standard output (and the topology lookup
+    // that only guarded it) has been removed; registering is all that is needed.
+    topologyListeners.add(listener);
+ }
+
+ /**
+ * Removes the given listener from the set of topology listeners.
+ *
+ * @param listener the listener to remove
+ */
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topologyListeners.remove(listener);
+ }
+
+ /**
+  * Registers a session factory with this locator. Before it is added, the
+  * factory is given the backup connector that the topology reports for the
+  * factory's own connector configuration. A null factory is ignored.
+  *
+  * @param factory the factory to register; may be null
+  */
+ public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ {
+    if (factory == null)
+    {
+       return;
+    }
+
+    final TransportConfiguration backupConnector =
+       topology.getBackupForConnector(factory.getConnectorConfiguration());
+
+    factory.setBackupConnector(factory.getConnectorConfiguration(), backupConnector);
+
+    factories.add(factory);
+ }
+
+ /**
+  * Shuts down the global scheduled thread pool and the global thread pool,
+  * in that order, and clears the static reference to each. A pool that was
+  * never created is skipped.
+  */
+ public static void shutdown()
+ {
+    final ScheduledExecutorService scheduledPool = globalScheduledThreadPool;
+    if (scheduledPool != null)
+    {
+       scheduledPool.shutdown();
+       globalScheduledThreadPool = null;
+    }
+
+    final ExecutorService pool = globalThreadPool;
+    if (pool != null)
+    {
+       pool.shutdown();
+       globalThreadPool = null;
+    }
+ }
+
+ /**
+ * This implementation ignores its argument and always returns false.
+ *
+ * @param conf the transport configuration asked about (unused here)
+ * @return always false
+ */
+ public boolean isStaticDirectConnection(TransportConfiguration conf)
+ {
+ return false;
+ }
+}
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * Holds the string constants that name discovery group settings, grouped
+ * into those for static discovery and those for simple UDP discovery.
+ * NOTE(review): presumably these are the parameter keys looked up in a
+ * discovery group configuration; confirm against the code that reads them.
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ // Setting names for static discovery
+ public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = "static-connector-ref-list";
+ public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+
+ // Setting names for simple UDP discovery
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,1456 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
- private static final long serialVersionUID = -1615857864410205260L;
-
- private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
- private final boolean ha;
-
- private boolean finalizeCheck = true;
-
- private boolean clusterConnection;
-
- private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
- private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
-
- private TransportConfiguration[] initialConnectors;
-
- private DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
- private StaticConnector staticConnector = new StaticConnector();
-
- private Topology topology = new Topology();
-
- private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
- private boolean receivedTopology;
-
- private boolean compressLargeMessage;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- private boolean readOnly;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialConnectAttempts;
-
- private boolean failoverOnInitialConnection;
-
- private int initialMessagePacketSize;
-
- private volatile boolean closed;
-
- private volatile boolean closing;
-
- private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private String groupID;
-
- private String nodeID;
-
- private TransportConfiguration clusterTransportConfiguration;
-
- private boolean backup;
-
- private final Exception e = new Exception();
-
- // To be called when there are ServerLocator being finalized.
- // To be used on test assertions
- public static Runnable finalizeCallback = null;
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (globalThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
- globalThreadPool = Executors.newCachedThreadPool(factory);
- }
-
- return globalThreadPool;
- }
-
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
-
- globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
- factory);
- }
-
- return globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (useGlobalPools)
- {
- threadPool = getGlobalThreadPool();
-
- scheduledThreadPool = getGlobalScheduledThreadPool();
- }
- else
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- return null;
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
- });
- }
-
- private synchronized void initialise() throws Exception
- {
- if (!readOnly)
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryGroupConfiguration != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
- InetAddress lbAddress;
-
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
-
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- readOnly = true;
- }
- }
-
- private ServerLocatorImpl(final boolean useHA,
- final DiscoveryGroupConfiguration discoveryGroupConfiguration,
- final TransportConfiguration[] transportConfigs)
- {
- e.fillInStackTrace();
- this.ha = useHA;
-
- this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
- this.initialConnectors = transportConfigs;
-
- this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
- failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
- clusterConnection = false;
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(useHA, groupConfiguration, null);
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
- {
- this(useHA, null, transportConfigs);
- }
-
- private TransportConfiguration selectConnector()
- {
- if (receivedTopology)
- {
- int pos = loadBalancingPolicy.select(topologyArray.length);
-
- Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
- return pair.a;
- }
- else
- {
- // Get from initialconnectors
-
- int pos = loadBalancingPolicy.select(initialConnectors.length);
-
- return initialConnectors[pos];
- }
- }
-
- public void start(Executor executor) throws Exception
- {
- initialise();
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- connect();
- }
- catch (Exception e)
- {
- if (!closing)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
- */
- public void disableFinalizeCheck()
- {
- finalizeCheck = false;
- }
-
- public ClientSessionFactory connect() throws Exception
- {
- ClientSessionFactoryInternal sf;
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
- {
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
- }
- // wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
- }
-
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
- addFactory(factory);
-
- return factory;
- }
-
- public ClientSessionFactory createSessionFactory() throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- if (initialConnectors == null && discoveryGroup != null)
- {
- // Wait for an initial broadcast to give us at least one node in the cluster
- long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
- boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
- }
- }
-
- ClientSessionFactoryInternal factory = null;
-
- synchronized (this)
- {
- boolean retry;
- int attempts = 0;
- do
- {
- retry = false;
-
- TransportConfiguration tc = selectConnector();
-
- // try each factory in the list until we find one which works
-
- try
- {
- factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- }
- catch (HornetQException e)
- {
- factory.close();
- factory = null;
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- attempts++;
-
- if (topologyArray != null && attempts == topologyArray.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- retry = true;
- }
- else
- {
- throw e;
- }
- }
- }
- while (retry);
-
- if (ha)
- {
- long toWait = 30000;
- long start = System.currentTimeMillis();
- while (!receivedTopology && toWait > 0)
- {
- // Now wait for the topology
-
- try
- {
- wait(toWait);
- }
- catch (InterruptedException ignore)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (toWait <= 0)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
- }
- }
-
- addFactory(factory);
-
- return factory;
- }
- }
-
- public synchronized boolean isHA()
- {
- return ha;
- }
-
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public synchronized long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public synchronized long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public synchronized void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public synchronized long getCallTimeout()
- {
- return callTimeout;
- }
-
- public synchronized void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public synchronized int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public synchronized int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public synchronized int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public synchronized int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public synchronized int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public synchronized void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public synchronized int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public synchronized void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public synchronized boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public synchronized boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public synchronized boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public synchronized boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public synchronized void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public synchronized boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public synchronized int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public synchronized void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public synchronized boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public synchronized int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public synchronized int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public synchronized long getRetryInterval()
- {
- return retryInterval;
- }
-
- public synchronized void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public synchronized long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public synchronized void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public synchronized double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public synchronized int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public void setInitialConnectAttempts(int initialConnectAttempts)
- {
- checkWrite();
- this.initialConnectAttempts = initialConnectAttempts;
- }
-
- public int getInitialConnectAttempts()
- {
- return initialConnectAttempts;
- }
-
- public synchronized boolean isFailoverOnInitialConnection()
- {
- return this.failoverOnInitialConnection;
- }
-
- public synchronized void setFailoverOnInitialConnection(final boolean failover)
- {
- checkWrite();
- this.failoverOnInitialConnection = failover;
- }
-
- public synchronized String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public TransportConfiguration[] getStaticTransportConfigurations()
- {
- return this.initialConnectors;
- }
-
- public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
- {
- return discoveryGroupConfiguration;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public synchronized int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public synchronized void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public void setGroupID(final String groupID)
- {
- checkWrite();
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
- public boolean isCompressLargeMessage()
- {
- return compressLargeMessage;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
- public void setCompressLargeMessage(boolean compress)
- {
- this.compressLargeMessage = compress;
- }
-
- private void checkWrite()
- {
- if (readOnly)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
- }
- }
-
- public void setNodeID(String nodeID)
- {
- this.nodeID = nodeID;
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public void setClusterConnection(boolean clusterConnection)
- {
- this.clusterConnection = clusterConnection;
- }
-
- public boolean isClusterConnection()
- {
- return clusterConnection;
- }
-
- public TransportConfiguration getClusterTransportConfiguration()
- {
- return clusterTransportConfiguration;
- }
-
- public void setClusterTransportConfiguration(TransportConfiguration tc)
- {
- this.clusterTransportConfiguration = tc;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public void setBackup(boolean backup)
- {
- this.backup = backup;
- }
-
- @Override
- protected void finalize() throws Throwable
- {
- if (finalizeCheck)
- {
- close();
- }
-
- super.finalize();
- }
-
- public void close()
- {
- if (closed)
- {
- return;
- }
-
- closing = true;
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- log.error("Failed to stop discovery group", e);
- }
- }
- else
- {
- staticConnector.disconnect();
- }
-
- for (ClientSessionFactory factory : factories)
- {
- factory.close();
- }
-
- factories.clear();
-
- if (!useGlobalPools)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for scheduled pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- readOnly = false;
-
- closed = true;
- }
-
- public synchronized void notifyNodeDown(final String nodeID)
- {
- boolean removed = false;
-
- if (!ha)
- {
- return;
- }
-
- removed = topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
- {
- updateArraysAndPairs();
-
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
- receivedTopology = false;
- }
- }
- else
- {
- topologyArray = null;
-
- receivedTopology = false;
- }
-
- if (removed)
- {
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
- }
-
- public synchronized void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
- {
- if (!ha)
- {
- return;
- }
-
- topology.addMember(nodeID, new TopologyMember(connectorPair));
-
- TopologyMember actMember = topology.getMember(nodeID);
-
- if (actMember.getConnector().a != null && actMember.getConnector().b != null)
- {
- for (ClientSessionFactory factory : factories)
- {
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
- actMember.getConnector().b);
- }
- }
-
- if (connectorPair.a != null)
- {
- updateArraysAndPairs();
- }
-
- if (last)
- {
- receivedTopology = true;
- }
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last);
- }
-
- // Notify if waiting on getting topology
- notify();
- }
-
- private void updateArraysAndPairs()
- {
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- topology.members());
-
- int count = 0;
- for (TopologyMember pair : topology.getMembers())
- {
- topologyArray[count++] = pair.getConnector();
- }
- }
-
- public synchronized void connectorsChanged()
- {
- List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- newConnectors.size());
-
- int count = 0;
- for (DiscoveryEntry entry : newConnectors)
- {
- this.initialConnectors[count++] = entry.getConnector();
- }
-
- if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
- {
- // FIXME the node is alone in the cluster. We create a connection to the new node
- // to trigger the node notification to form the cluster.
- try
- {
- connect();
- }
- catch (Exception e)
- {
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topology = null;
-
- }
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
- public void addClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topologyListeners.add(listener);
- if(topology.members() > 0)
- {
- System.out.println("ServerLocatorImpl.addClusterTopologyListener");
- }
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topologyListeners.remove(listener);
- }
-
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
- {
- if (factory != null)
- {
- TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
- }
- }
-
- public static void shutdown()
- {
- if (globalScheduledThreadPool != null)
- {
- globalScheduledThreadPool.shutdown();
- globalScheduledThreadPool = null;
- }
- if (globalThreadPool != null)
- {
- globalThreadPool.shutdown();
- globalThreadPool = null;
- }
- }
-
- class StaticConnector implements Serializable
- {
- private List<Connector> connectors;
-
- public ClientSessionFactory connect() throws HornetQException
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactory csf = null;
-
- createConnectors();
-
- try
- {
- List<Future<ClientSessionFactory>> futures = threadPool.invokeAll(connectors);
- for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
- {
- Future<ClientSessionFactory> future = futures.get(i);
- try
- {
- csf = future.get();
- if (csf != null)
- break;
- }
- catch (Exception e)
- {
- log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
- }
- }
- if (csf == null && !closed)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- }
- catch (InterruptedException e)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
- }
-
- if (csf == null && !closed)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- return csf;
- }
-
- private synchronized void createConnectors()
- {
- connectors = new ArrayList<Connector>();
- for (TransportConfiguration initialConnector : initialConnectors)
- {
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- connectors.add(new Connector(initialConnector, factory));
- }
- }
-
- public synchronized void disconnect()
- {
- if (connectors != null)
- {
- for (Connector connector : connectors)
- {
- connector.disconnect();
- }
- }
- }
-
- public void finalize() throws Throwable
- {
- if (!closed && finalizeCheck)
- {
- log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
- System.identityHashCode(this));
-
- log.warn("The ServerLocator you didn't close was created here:", e);
-
- if (ServerLocatorImpl.finalizeCallback != null)
- {
- ServerLocatorImpl.finalizeCallback.run();
- }
-
- close();
- }
-
- super.finalize();
- }
-
- class Connector implements Callable<ClientSessionFactory>
- {
- private TransportConfiguration initialConnector;
-
- private volatile ClientSessionFactoryInternal factory;
-
- private boolean isConnected = false;
-
- private boolean interrupted = false;
-
- private Exception e;
-
- public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
- {
- this.initialConnector = initialConnector;
- this.factory = factory;
- }
-
- public ClientSessionFactory call() throws HornetQException
- {
- try
- {
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
- }
- catch (HornetQException e)
- {
- if (!interrupted)
- {
- this.e = e;
- throw e;
- }
- /*if(factory != null)
- {
- factory.close();
- factory = null;
- }*/
- return null;
- }
- isConnected = true;
- for (Connector connector : connectors)
- {
- if (!connector.isConnected())
- {
- connector.disconnect();
- }
- }
- return factory;
- }
-
- public boolean isConnected()
- {
- return isConnected;
- }
-
- public void disconnect()
- {
- interrupted = true;
-
- if (factory != null)
- {
- factory.causeExit();
- factory.close();
- factory = null;
- }
- }
- }
- }
-}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -57,4 +57,6 @@
void setBackup(boolean backup);
Topology getTopology();
+
+ boolean isStaticDirectConnection(TransportConfiguration con);
}
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class SimpleUDPServerLocatorImpl extends AbstractServerLocator implements DiscoveryListener
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(SimpleUDPServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private InetAddress localBindAddress;
+
+ private InetAddress groupAddress;
+
+ private int groupPort;
+
+ private long refreshTimeout;
+
+ private long initialWaitTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private volatile boolean closing;
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ /**
+ * Performs the one-time setup of this locator: thread pools, load balancing policy and the
+ * discovery group built from the parameters of the discovery group configuration.
+ * Does nothing once the locator has been marked read-only.
+ *
+ * @throws IllegalArgumentException if the configuration carries no group address
+ * @throws Exception if an address cannot be resolved or the discovery group cannot be started
+ */
+ private synchronized void initialise() throws Exception
+ {
+ if (!isReadOnly())
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+ String lbStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+
+ // The local bind address is optional; null is passed on to the discovery group as is.
+ this.localBindAddress = lbStr != null ? InetAddress.getByName(lbStr) : null;
+
+ String gaStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params);
+
+ // InetAddress.getByName(null) silently resolves to the loopback address, so a missing
+ // group address has to be rejected here instead of being turned into a bogus address.
+ if (gaStr == null)
+ {
+ throw new IllegalArgumentException("No group address configured for discovery group " + this.discoveryGroupName);
+ }
+
+ this.groupAddress = InetAddress.getByName(gaStr);
+ this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
+ this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+ this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+
+ discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+ this.discoveryGroupName,
+ this.localBindAddress,
+ this.groupAddress,
+ this.groupPort,
+ this.refreshTimeout);
+
+ // Register before starting so that connectorsChanged() is not missed.
+ discoveryGroup.registerListener(this);
+
+ discoveryGroup.start();
+
+ setReadOnly(true);
+ }
+ }
+
+ /**
+ * Creates the locator; both arguments are handed unchanged to the AbstractServerLocator
+ * constructor. The discovery group itself is only created later, by initialise().
+ *
+ * @param useHA HA flag forwarded to the superclass
+ * @param discoveryGroupConfiguration configuration forwarded to the superclass
+ */
+ public SimpleUDPServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+ }
+
+ /**
+ * Initialises the locator and schedules a connection attempt on the given executor.
+ * A failure of that attempt is logged as a warning unless the locator is closing.
+ *
+ * @param executor runs the asynchronous call to connect()
+ * @throws Exception if initialisation fails
+ */
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ final Runnable connectTask = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception failure)
+ {
+ // A failure while shutting down is expected and not worth reporting.
+ if (closing)
+ {
+ return;
+ }
+
+ log.warn("did not connect the cluster connection to other nodes", failure);
+ }
+ }
+ };
+
+ executor.execute(connectTask);
+ }
+
+ /**
+ * Creates a session factory through createSessionFactory() and registers it with addFactory().
+ *
+ * @return the session factory that was created
+ * @throws Exception if no session factory could be created
+ */
+ public ClientSessionFactory connect() throws Exception
+ {
+ // createSessionFactory() waits for the discovery group to deliver the initial connectors
+ final ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)createSessionFactory();
+
+ addFactory(sessionFactory);
+
+ return sessionFactory;
+ }
+
+ /**
+ * Creates a session factory for one explicitly given connector, bypassing discovery-based
+ * connector selection, connects it and registers it with addFactory().
+ *
+ * @param transportConfiguration the connector the new factory is built for
+ * @return the connected session factory
+ * @throws IllegalStateException if the locator is already closed
+ * @throws HornetQException with INTERNAL_ERROR if initialise() fails
+ * @throws Exception whatever factory.connect() propagates
+ */
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ // initialise() is a no-op once the locator is read-only
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ // NOTE(review): the factory is not closed here if connect() throws - confirm whether that leaks
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+ addFactory(factory);
+
+ return factory;
+ }
+
+ /**
+ * Creates a session factory connected to one of the discovered servers.
+ * <p>
+ * When no initial connectors are known yet, waits for a discovery broadcast first. Then tries
+ * the connectors returned by selectConnector() until one connects. For an HA locator it also
+ * waits up to 30 seconds for the cluster topology before the factory is registered and returned.
+ *
+ * @return the connected session factory
+ * @throws IllegalStateException if the locator is already closed
+ * @throws HornetQException INTERNAL_ERROR if initialise() fails, CONNECTION_TIMEDOUT if no
+ * broadcast or no topology arrives in time, NOT_CONNECTED if every connector was tried
+ * without success
+ */
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ if (getInitialConnectors() == null)
+ {
+ // Wait for an initial broadcast to give us at least one node in the cluster
+ long timeout = isClusterConnection() ? 0 : this.initialWaitTimeout;
+ boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+ if (!ok)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive initial broadcast from cluster");
+ }
+ }
+
+ ClientSessionFactoryInternal factory = null;
+
+ synchronized (this)
+ {
+ boolean retry;
+ int attempts = 0;
+ do
+ {
+ retry = false;
+
+ TransportConfiguration tc = selectConnector();
+
+ // try each factory in the list until we find one which works
+
+ try
+ {
+ factory = new ClientSessionFactoryImpl(this,
+ tc,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ // factory is still null if the failure happened before the assignment completed
+ if (factory != null)
+ {
+ factory.close();
+ factory = null;
+ }
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ attempts++;
+
+ if (attempts == getConnectorLength())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
+ retry = true;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ while (retry);
+
+ if (isHA())
+ {
+ long toWait = 30000;
+ long start = System.currentTimeMillis();
+ while (!isReceivedTopology() && toWait > 0)
+ {
+ // Now wait for the topology
+
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ // Decide on the topology flag, not on the remaining time: the topology may have
+ // arrived during the very last wait, in which case toWait is already <= 0 although
+ // nothing timed out.
+ if (!isReceivedTopology())
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive cluster topology");
+ }
+ }
+
+ addFactory(factory);
+
+ return factory;
+ }
+ }
+
+ /**
+ * Closes this locator: stops the discovery group, if one was created, and then delegates to
+ * the superclass. Calling it on an already closed locator has no effect.
+ */
+ public void close()
+ {
+ if (isClosed())
+ {
+ return;
+ }
+
+ closing = true;
+
+ // The discovery group is only created by initialise(); a locator that was never used
+ // has none, and stopping it would only produce a NullPointerException logged as an error.
+ if (discoveryGroup != null)
+ {
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
+ }
+
+ super.close();
+ }
+
+ /**
+ * DiscoveryListener callback: replaces the initial connectors with the entries currently
+ * known to the discovery group and, for an HA cluster connection that has not received a
+ * topology yet, connects to the cluster to trigger the node notification.
+ */
+ public synchronized void connectorsChanged()
+ {
+ List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+ TransportConfiguration[] initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ newConnectors.size());
+ int count = 0;
+ for (DiscoveryEntry entry : newConnectors)
+ {
+ initialConnectors[count++] = entry.getConnector();
+ }
+
+ // Publish the new connectors before connecting: connect() selects its target from the
+ // locator's initial connectors, so it must not run against the previous (or missing) list.
+ setInitialConnectors(initialConnectors);
+
+ if (isHA() && isClusterConnection() && !isReceivedTopology() && initialConnectors.length > 0)
+ {
+ // FIXME the node is alone in the cluster. We create a connection to the new node
+ // to trigger the node notification to form the cluster.
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ // Best effort: a later discovery update triggers another attempt.
+ log.warn("Failed to connect to newly discovered node", e);
+ }
+ }
+ }
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,450 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A StaticServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+ private StaticConnector staticConnector = new StaticConnector();
+
+ private volatile boolean closing;
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ /**
+ * Performs the one-time setup of this locator: thread pools, load balancing policy and the
+ * initial connectors, which are read from the discovery group configuration parameters.
+ * Does nothing once the locator has been marked read-only.
+ *
+ * @throws Exception declared for the setup calls made here
+ */
+ private synchronized void initialise() throws Exception
+ {
+ if (!isReadOnly())
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+ // The connector array is stored in the params under STATIC_CONNECTORS_LIST_NAME;
+ // the result is null when that key is absent.
+ TransportConfiguration[] initialConnectors = (TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ setInitialConnectors(initialConnectors);
+
+ setReadOnly(true);
+ }
+ }
+
+ /**
+ * Creates the locator; both arguments are handed unchanged to the AbstractServerLocator
+ * constructor.
+ *
+ * @param useHA HA flag forwarded to the superclass
+ * @param discoveryGroupConfiguration configuration forwarded to the superclass
+ */
+ public StaticServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+
+ // Capture the construction stack trace; it is logged by StaticConnector.finalize()
+ // when the locator was left open.
+ e.fillInStackTrace();
+ }
+
+ /**
+ * Initialises the locator and schedules a connection attempt on the given executor.
+ * A failure of that attempt is logged as a warning unless the locator is closing.
+ *
+ * @param executor runs the asynchronous call to connect()
+ * @throws Exception if initialisation fails
+ */
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ final Runnable connectTask = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception failure)
+ {
+ // A failure while shutting down is expected and not worth reporting.
+ if (closing)
+ {
+ return;
+ }
+
+ log.warn("did not connect the cluster connection to other nodes", failure);
+ }
+ }
+ };
+
+ executor.execute(connectTask);
+ }
+
+ /**
+ * Connects through the static connector and registers the resulting session factory with
+ * addFactory().
+ *
+ * @return the session factory obtained from the static connector
+ * @throws Exception if the static connector could not connect
+ */
+ public ClientSessionFactory connect() throws Exception
+ {
+ final ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)staticConnector.connect();
+
+ addFactory(sessionFactory);
+
+ return sessionFactory;
+ }
+
+ /**
+ * Creates a session factory for one explicitly given connector, independent of the static
+ * connector list, connects it and registers it with addFactory().
+ *
+ * @param transportConfiguration the connector the new factory is built for
+ * @return the connected session factory
+ * @throws IllegalStateException if the locator is already closed
+ * @throws HornetQException with INTERNAL_ERROR if initialise() fails
+ * @throws Exception whatever factory.connect() propagates
+ */
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ // initialise() is a no-op once the locator is read-only
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ // NOTE(review): the factory is not closed here if connect() throws - confirm whether that leaks
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+ addFactory(factory);
+
+ return factory;
+ }
+
+ /**
+ * Creates a session factory connected to one of the statically configured servers.
+ * <p>
+ * Tries the connectors returned by selectConnector() until one connects. For an HA locator it
+ * also waits up to 30 seconds for the cluster topology before the factory is registered and
+ * returned.
+ *
+ * @return the connected session factory
+ * @throws IllegalStateException if the locator is already closed
+ * @throws HornetQException INTERNAL_ERROR if initialise() fails, CONNECTION_TIMEDOUT if no
+ * topology arrives in time, NOT_CONNECTED if every connector was tried without success
+ */
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = null;
+
+ synchronized (this)
+ {
+ boolean retry;
+ int attempts = 0;
+ do
+ {
+ retry = false;
+
+ TransportConfiguration tc = selectConnector();
+
+ // try each factory in the list until we find one which works
+
+ try
+ {
+ factory = new ClientSessionFactoryImpl(this,
+ tc,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ // factory is still null if the failure happened before the assignment completed
+ if (factory != null)
+ {
+ factory.close();
+ factory = null;
+ }
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ attempts++;
+
+ if (attempts == getConnectorLength())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
+ retry = true;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ while (retry);
+
+ if (isHA())
+ {
+ long toWait = 30000;
+ long start = System.currentTimeMillis();
+ while (!isReceivedTopology() && toWait > 0)
+ {
+ // Now wait for the topology
+
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ // Decide on the topology flag, not on the remaining time: the topology may have
+ // arrived during the very last wait, in which case toWait is already <= 0 although
+ // nothing timed out.
+ if (!isReceivedTopology())
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive cluster topology");
+ }
+ }
+
+ addFactory(factory);
+
+ return factory;
+ }
+ }
+
+ /**
+ * Closes this locator: disconnects the static connector and then delegates to the
+ * superclass. Calling it on an already closed locator has no effect.
+ */
+ public void close()
+ {
+ if (isClosed())
+ {
+ return;
+ }
+
+ // Read by the task started in start() to suppress its warning during shutdown
+ closing = true;
+
+ staticConnector.disconnect();
+
+ super.close();
+ }
+
+ /**
+ * Tells whether the given connector is one of this locator's initial (static) connectors.
+ *
+ * @param con the connector to look for
+ * @return true if an initial connector equals {@code con}; false otherwise, including when
+ * no initial connectors have been set yet
+ */
+ public boolean isStaticDirectConnection(TransportConfiguration con)
+ {
+ // The initial connectors are only set by initialise(); before that, or when the
+ // configuration holds no static list, there is nothing to match against.
+ if (getInitialConnectors() == null)
+ {
+ return false;
+ }
+
+ for(TransportConfiguration connector : getInitialConnectors())
+ {
+ if(connector.equals(con))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ class StaticConnector implements Serializable
+ {
+ private List<Connector> connectors;
+
+ /**
+ * Attempts to connect to all initial connectors in parallel on the locator's thread pool
+ * and returns the first session factory, in connector order, that was obtained.
+ *
+ * @return the connected session factory, or null if none connected because the locator
+ * was closed meanwhile
+ * @throws IllegalStateException if the locator is already closed on entry
+ * @throws HornetQException INTERNAL_ERROR if initialise() fails, NOT_CONNECTED if no
+ * connector could connect or the calling thread was interrupted
+ */
+ public ClientSessionFactory connect() throws HornetQException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactory csf = null;
+
+ createConnectors();
+
+ try
+ {
+ List<Future<ClientSessionFactory>> futures = getThreadPool().invokeAll(connectors);
+ for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+ {
+ Future<ClientSessionFactory> future = futures.get(i);
+ try
+ {
+ csf = future.get();
+ if (csf != null)
+ break;
+ }
+ catch (Exception e)
+ {
+ log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupt status so that callers further up can still observe it.
+ Thread.currentThread().interrupt();
+
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+ }
+
+ // A null result is only acceptable when the locator was closed while connecting.
+ if (csf == null && !isClosed())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
+ return csf;
+ }
+
+ /**
+ * Rebuilds the connector list: one Connector, wrapping a new not-yet-connected session
+ * factory, per initial connector of the enclosing locator. Any previous list is replaced.
+ */
+ private synchronized void createConnectors()
+ {
+ connectors = new ArrayList<Connector>();
+ for (TransportConfiguration initialConnector : getInitialConnectors())
+ {
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+ initialConnector,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ connectors.add(new Connector(initialConnector, factory));
+ }
+ }
+
+ /**
+ * Disconnects every connector created so far; does nothing if none were created yet.
+ */
+ public synchronized void disconnect()
+ {
+ if (connectors == null)
+ {
+ return;
+ }
+
+ for (Connector each : connectors)
+ {
+ each.disconnect();
+ }
+ }
+
+ /**
+ * Finalizer of the StaticConnector: if the enclosing locator was not closed and the finalize
+ * check is enabled, logs where the locator was created, runs the optional finalize callback
+ * and closes the locator.
+ */
+ public void finalize() throws Throwable
+ {
+ if (!isClosed() && doFinalizeCheck())
+ {
+ // "this" is the StaticConnector here, so the logged hash identifies the connector object
+ log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+
+ // e is the enclosing locator's exception, whose stack trace was filled in by its constructor
+ log.warn("The ServerLocator you didn't close was created here:", e);
+
+ if (StaticServerLocatorImpl.finalizeCallback != null)
+ {
+ StaticServerLocatorImpl.finalizeCallback.run();
+ }
+
+ // Resolves to the enclosing locator's close(); StaticConnector declares none itself
+ close();
+ }
+
+ super.finalize();
+ }
+
+ /**
+ * One connection attempt against a single initial connector, run as a task by
+ * StaticConnector.connect(). The first attempt that succeeds disconnects all attempts that
+ * are not connected.
+ */
+ class Connector implements Callable<ClientSessionFactory>
+ {
+ private TransportConfiguration initialConnector;
+
+ private volatile ClientSessionFactoryInternal factory;
+
+ // NOTE(review): isConnected and interrupted are read and written from different pool
+ // threads without volatile or synchronization - confirm visibility is not an issue
+ private boolean isConnected = false;
+
+ private boolean interrupted = false;
+
+ // Last connect failure that was rethrown by call()
+ private Exception e;
+
+ public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+ {
+ this.initialConnector = initialConnector;
+ this.factory = factory;
+ }
+
+ /**
+ * Connects the wrapped factory.
+ *
+ * @return the connected factory, or null if connecting failed after disconnect() was called
+ * @throws HornetQException if connecting failed while this connector was not disconnected
+ */
+ public ClientSessionFactory call() throws HornetQException
+ {
+ try
+ {
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ // A failure caused by our own disconnect() is not an error; report it as "no result"
+ if (!interrupted)
+ {
+ this.e = e;
+ throw e;
+ }
+ /*if(factory != null)
+ {
+ factory.close();
+ factory = null;
+ }*/
+ return null;
+ }
+ isConnected = true;
+ // This attempt won: stop every sibling attempt that has not connected
+ for (Connector connector : connectors)
+ {
+ if (!connector.isConnected())
+ {
+ connector.disconnect();
+ }
+ }
+ return factory;
+ }
+
+ public boolean isConnected()
+ {
+ return isConnected;
+ }
+
+ /**
+ * Marks this attempt as interrupted and, if the factory is still held, makes it exit,
+ * closes it and drops the reference.
+ */
+ public void disconnect()
+ {
+ interrupted = true;
+
+ if (factory != null)
+ {
+ factory.causeExit();
+ factory.close();
+ factory = null;
+ }
+ }
+ }
+ }
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -42,6 +42,8 @@
private final boolean forwardWhenNoConsumers;
private final List<String> staticConnectors;
+
+ private final List<String> allowableConnectors;
private final String discoveryGroupName;
@@ -49,7 +51,7 @@
private final int confirmationWindowSize;
- private final boolean allowDirectConnectionsOnly;
+ private final boolean allowableConnectionsOnly;
public ClusterConnectionConfiguration(final String name,
final String address,
@@ -60,42 +62,22 @@
final int maxHops,
final int confirmationWindowSize,
final List<String> staticConnectors,
- final boolean allowDirectConnectionsOnly)
+ final String discoveryGroupName,
+ final boolean allowableConnectionsOnly,
+ final List<String> allowableConnectorNames)
{
this.name = name;
this.address = address;
this.connectorName = connectorName;
this.retryInterval = retryInterval;
- this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
- discoveryGroupName = null;
- this.maxHops = maxHops;
- this.confirmationWindowSize = confirmationWindowSize;
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- }
-
- public ClusterConnectionConfiguration(final String name,
- final String address,
- final String connectorName,
- final long retryInterval,
- final boolean duplicateDetection,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- final int confirmationWindowSize,
- final String discoveryGroupName)
- {
- this.name = name;
- this.address = address;
- this.connectorName = connectorName;
- this.retryInterval = retryInterval;
- this.duplicateDetection = duplicateDetection;
- this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
- this.staticConnectors = null;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
- allowDirectConnectionsOnly = false;
+ this.staticConnectors = staticConnectors;
+ this.allowableConnectors = allowableConnectorNames;
+ this.allowableConnectionsOnly = allowableConnectionsOnly;
}
public String getName()
@@ -137,6 +119,11 @@
{
return staticConnectors;
}
+
+ public List<String> getAllowableConnectors()
+ {
+ return allowableConnectors;
+ }
public String getDiscoveryGroupName()
{
@@ -148,8 +135,8 @@
return retryInterval;
}
- public boolean isAllowDirectConnectionsOnly()
+ public boolean isAllowableConnectionsOnly()
{
- return allowDirectConnectionsOnly;
+ return allowableConnectionsOnly;
}
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -22,12 +22,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.StringTokenizer;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
@@ -939,29 +941,42 @@
{
String name = e.getAttribute("name");
- String localBindAddress = XMLConfigurationUtil.getString(e, "local-bind-address", null, Validators.NO_CHECK);
+ String clazz = XMLConfigurationUtil.getString(e, "server-locator-class", null, Validators.NOT_NULL_OR_EMPTY);
- String groupAddress = XMLConfigurationUtil.getString(e, "group-address", null, Validators.NOT_NULL_OR_EMPTY);
+ Map<String, Object> params = new HashMap<String, Object>();
- int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1, Validators.MINUS_ONE_OR_GT_ZERO);
+ NodeList paramsNodes = e.getElementsByTagName("param");
- long discoveryInitialWaitTimeout = XMLConfigurationUtil.getLong(e,
- "initial-wait-timeout",
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- Validators.GT_ZERO);
+ for (int i = 0; i < paramsNodes.getLength(); i++)
+ {
+ Node paramNode = paramsNodes.item(i);
- long refreshTimeout = XMLConfigurationUtil.getLong(e,
- "refresh-timeout",
- ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
- Validators.GT_ZERO);
+ NamedNodeMap attributes = paramNode.getAttributes();
- DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(name,
- localBindAddress,
- groupAddress,
- groupPort,
- refreshTimeout,
- discoveryInitialWaitTimeout);
+ Node nkey = attributes.getNamedItem("key");
+ String key = nkey.getTextContent();
+
+ Node nValue = attributes.getNamedItem("value");
+
+ params.put(key, nValue.getTextContent());
+ }
+
+ // discovery-group configuration contains static connector list
+ String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+ if(connectorList != null)
+ {
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+ while(token.hasMoreElements())
+ {
+ connectors.add(mainConfig.getConnectorConfigurations().get(token.nextElement()));
+ }
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, connectors.toArray(new TransportConfiguration[0]));
+ }
+
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(clazz, params, name);
+
if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
{
FileConfigurationParser.log.warn("There is already a discovery group with name " + name +
@@ -1008,8 +1023,6 @@
String discoveryGroupName = null;
- List<String> staticConnectorNames = new ArrayList<String>();
-
boolean allowDirectConnectionsOnly = false;
NodeList children = e.getChildNodes();
@@ -1021,46 +1034,58 @@
if (child.getNodeName().equals("discovery-group-ref"))
{
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
- }
- else if (child.getNodeName().equals("static-connectors"))
- {
+
Node attr = child.getAttributes().getNamedItem("allow-direct-connections-only");
if (attr != null)
{
allowDirectConnectionsOnly = "true".equalsIgnoreCase(attr.getNodeValue()) || allowDirectConnectionsOnly;
}
- getStaticConnectors(staticConnectorNames, child);
}
}
- ClusterConnectionConfiguration config;
-
- if (discoveryGroupName == null)
+ List<String> staticConnectors = new ArrayList<String>();
+ DiscoveryGroupConfiguration discovery = mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
+ Map<String,Object> params = discovery.getParams();
+ String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+ if(connectorList != null)
{
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- retryInterval,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- staticConnectorNames,
- allowDirectConnectionsOnly);
+ StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+ while(token.hasMoreElements())
+ {
+ staticConnectors.add(token.nextToken());
+ }
}
- else
+
+ List<String> allowableConnectionNames = null;
+ if(allowDirectConnectionsOnly)
{
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- retryInterval,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- discoveryGroupName);
+ if(connectorList == null)
+ {
+ log.warn("allow-direct-connections-only was found, but "
+ + DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME
+ + " was not found in discovery-group. ignore.");
+ }
+ else
+ {
+ allowableConnectionNames = staticConnectors;
+ }
}
+
+ ClusterConnectionConfiguration config;
+ config = new ClusterConnectionConfiguration(name,
+ address,
+ connectorName,
+ retryInterval,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ staticConnectors,
+ discoveryGroupName,
+ allowDirectConnectionsOnly,
+ allowableConnectionNames);
+
mainConfig.getClusterConfigurations().add(config);
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -100,90 +100,16 @@
private final String clusterPassword;
- private final ClusterConnector clusterConnector;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
private ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
- private final boolean allowDirectConnectionsOnly;
+ private final boolean allowableConnectionsOnly;
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
- final TransportConfiguration connector,
- final SimpleString name,
- final SimpleString address,
- final long retryInterval,
- final boolean useDuplicateDetection,
- final boolean routeWhenNoConsumers,
- final int confirmationWindowSize,
- final ExecutorFactory executorFactory,
- final HornetQServer server,
- final PostOffice postOffice,
- final ManagementService managementService,
- final ScheduledExecutorService scheduledExecutor,
- final int maxHops,
- final UUID nodeUUID,
- final boolean backup,
- final String clusterUser,
- final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
- {
-
- if (nodeUUID == null)
- {
- throw new IllegalArgumentException("node id is null");
- }
-
- this.nodeUUID = nodeUUID;
-
- this.connector = connector;
-
- this.name = name;
-
- this.address = address;
-
- this.retryInterval = retryInterval;
-
- this.useDuplicateDetection = useDuplicateDetection;
-
- this.routeWhenNoConsumers = routeWhenNoConsumers;
-
- this.executorFactory = executorFactory;
-
- this.server = server;
-
- this.postOffice = postOffice;
-
- this.managementService = managementService;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.maxHops = maxHops;
-
- this.backup = backup;
-
- this.clusterUser = clusterUser;
-
- this.clusterPassword = clusterPassword;
-
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-
- clusterConnector = new StaticClusterConnector(tcConfigs);
-
- if (tcConfigs != null && tcConfigs.length > 0)
- {
- // a cluster connection will connect to other nodes only if they are directly connected
- // through a static list of connectors or broadcasting using UDP.
- if(allowDirectConnectionsOnly)
- {
- allowableConnections.addAll(Arrays.asList(tcConfigs));
- }
- }
-
- }
-
public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -202,7 +128,8 @@
final boolean backup,
final String clusterUser,
final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
+ final boolean allowableConnectionsOnly,
+ final TransportConfiguration[] allowableConnections) throws Exception
{
if (nodeUUID == null)
@@ -242,9 +169,9 @@
this.clusterPassword = clusterPassword;
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+ this.allowableConnectionsOnly = allowableConnectionsOnly;
- clusterConnector = new DiscoveryClusterConnector(dg);
+ this.discoveryGroupConfiguration = dg;
}
public synchronized void start() throws Exception
@@ -346,7 +273,7 @@
backup = false;
- serverLocator = clusterConnector.createServerLocator();
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(this.discoveryGroupConfiguration);
if (serverLocator != null)
@@ -433,7 +360,7 @@
server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+ if (allowableConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
return;
}
@@ -992,46 +919,4 @@
return out;
}
-
- interface ClusterConnector
- {
- ServerLocatorInternal createServerLocator();
- }
-
- private class StaticClusterConnector implements ClusterConnector
- {
- private final TransportConfiguration[] tcConfigs;
-
- public StaticClusterConnector(TransportConfiguration[] tcConfigs)
- {
- this.tcConfigs = tcConfigs;
- }
-
- public ServerLocatorInternal createServerLocator()
- {
- if(tcConfigs != null && tcConfigs.length > 0)
- {
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
- }
- else
- {
- return null;
- }
- }
- }
-
- private class DiscoveryClusterConnector implements ClusterConnector
- {
- private final DiscoveryGroupConfiguration dg;
-
- public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
- {
- this.dg = dg;
- }
-
- public ServerLocatorInternal createServerLocator()
- {
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
- }
- }
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -30,7 +30,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
@@ -621,46 +621,23 @@
ServerLocatorInternal serverLocator;
- if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
- if (discoveryGroupConfiguration == null)
- {
- ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
- "'. The bridge will not be deployed.");
+ if (discoveryGroupConfiguration == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+ "'. The bridge will not be deployed.");
- return;
- }
+ return;
+ }
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
- }
-
+ if (config.isHA())
+ {
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
}
else
{
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- if (tcConfigs == null)
- {
- return;
- }
-
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- }
-
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
@@ -739,61 +716,41 @@
ClusterConnectionImpl clusterConnection;
- if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
+ DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+ .get(config.getDiscoveryGroupName());
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ if (dg == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
"'. The cluster connection will not be deployed.");
- }
-
- clusterConnection = new ClusterConnectionImpl(dg,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
}
- else
- {
- TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
- clusterConnection = new ClusterConnectionImpl(tcConfigs,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
+ List<String> connectorNames = config.getAllowableConnectors();
+ TransportConfiguration[] allowableConnections = null;
+ if(connectorNames != null)
+ {
+ allowableConnections = connectorNameListToArray(connectorNames);
}
+ clusterConnection = new ClusterConnectionImpl(dg,
+ connector,
+ new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+ server.getConfiguration().getClusterUser(),
+ server.getConfiguration().getClusterPassword(),
+ config.isAllowableConnectionsOnly(),
+ allowableConnections);
managementService.registerCluster(clusterConnection, config);
@@ -809,31 +766,18 @@
private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
{
- if (config.getStaticConnectors() != null)
- {
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- backupServerLocator.setReconnectAttempts(-1);
- }
- else if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+ DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
- "'. The cluster connection will not be deployed.");
- }
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
- backupServerLocator.setReconnectAttempts(-1);
- }
- else
+ if (dg == null)
{
- return;
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ "'. The cluster connection will not be deployed.");
}
+
+ backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
+ backupServerLocator.setReconnectAttempts(-1);
+
log.info("announcing backup");
this.executorFactory.getExecutor().execute(new Runnable()
{
Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -37,6 +37,8 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.ra.inflow.HornetQActivation;
@@ -1403,20 +1405,24 @@
}
else if (discoveryAddress != null)
{
+ // FIXME make discovery strategy pluggable with configuration
+ Map<String,Object> params = new HashMap<String,Object>();
+
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
- DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
-
long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
: raProperties.getDiscoveryRefreshTimeout();
long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
: raProperties.getDiscoveryInitialWaitTimeout();
- groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+ params.put(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, discoveryAddress);
+ params.put(DiscoveryGroupConstants.GROUP_PORT_NAME, discoveryPort);
+ params.put(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, refreshTimeout); // refresh timeout, not the initial wait timeout
+ params.put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, initialTimeout);
- groupConfiguration.setRefreshTimeout(refreshTimeout);
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(SimpleUDPServerLocatorImpl.class.getName(), params, null);
if (ha)
{
Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -32,7 +32,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -189,7 +189,7 @@
{
configs[i] = createTransportConfiguration(isNetty(), false, generateParams(nodes[i], isNetty()));
}
- return new ServerLocatorImpl(true, configs);
+ return new AbstractServerLocator(true, configs);
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -27,7 +27,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.JMSTestBase;
@@ -61,7 +61,7 @@
final AtomicInteger valueGC = new AtomicInteger(0);
- ServerLocatorImpl.finalizeCallback = new Runnable()
+ AbstractServerLocator.finalizeCallback = new Runnable()
{
public void run()
{
@@ -85,7 +85,7 @@
}
finally
{
- ServerLocatorImpl.finalizeCallback = null;
+ AbstractServerLocator.finalizeCallback = null;
}
assertEquals("The code is throwing exceptions", 0, valueGC.get());
More information about the hornetq-commits
mailing list