From do-not-reply at jboss.org Thu Jan 27 09:06:22 2011 Content-Type: multipart/mixed; boundary="===============6609616134239106515==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r10150 - in branches/HORNETQ-316: src/main/org/hornetq/api/core/client and 7 other directories. Date: Thu, 27 Jan 2011 09:06:21 -0500 Message-ID: <201101271406.p0RE6LfR005012@svn01.web.mwc.hst.phx2.redhat.com> --===============6609616134239106515== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: igarashitm Date: 2011-01-27 09:06:20 -0500 (Thu, 27 Jan 2011) New Revision: 10150 Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServe= rLocator.java branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGrou= pConstants.java branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServ= erLocatorImpl.java branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerL= ocatorImpl.java Removed: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocator= Impl.java Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfigu= ration.java branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.= java branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocator= Internal.java branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionC= onfiguration.java branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfig= urationParser.java branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clust= erConnectionImpl.java branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clust= erManagerImpl.java branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/fai= lover/MultipleBackupsFailoverTestBase.java branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connect= ion/CloseConnectionFactoryOnGCest.java Log: https://issues.jboss.org/browse/HORNETQ-316 first commit. just implemented pluggable discovery strategy and removed com= pile errors. not yet debugged. Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroup= Configuration.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfig= uration.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfig= uration.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -1,5 +1,5 @@ /* - * Copyright 2009 Red Hat, Inc. + * Copyright 2010 Red Hat, Inc. * Red Hat licenses this file to you under the Apache License, version * 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -14,169 +14,45 @@ package org.hornetq.api.core; = import java.io.Serializable; - -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.core.logging.Logger; -import org.hornetq.utils.UUIDGenerator; - +import java.util.Map; +import java.util.Properties; /** * A DiscoveryGroupConfiguration * - * @author Tim Fox - * = - * Created 18 Nov 2008 08:47:30 - * - * + * @author Tim Fox */ public class DiscoveryGroupConfiguration implements Serializable { - private static final long serialVersionUID =3D 8657206421727863400L; - = - private static final Logger log =3D Logger.getLogger(DiscoveryGroupConf= iguration.class); + private static final long serialVersionUID =3D 2877108926493109407L; = - - private String name; + private final String serverLocatorClassName; = - private String localBindAddress; - - private String groupAddress; - - private int groupPort; - - private long refreshTimeout; + private final String name; = = - private long discoveryInitialWaitTimeout; - - public DiscoveryGroupConfiguration(final String name, - final String localBindAddress, - final String groupAddress, - final int groupPort, - final long refreshTimeout, - final long discoveryInitialWaitTimeo= ut) - { - this.name =3D name; - this.groupAddress =3D groupAddress; - this.localBindAddress =3D localBindAddress; - this.groupPort =3D groupPort; - this.refreshTimeout =3D refreshTimeout; - this.discoveryInitialWaitTimeout =3D discoveryInitialWaitTimeout; - } - - public DiscoveryGroupConfiguration(final String groupAddress, - final int groupPort) - { - this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAd= dress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, Hor= netQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT); - } - - public String getName() - { - return name; - } + private final Map params; = - public String getLocalBindAddress() + public DiscoveryGroupConfiguration(final String clazz, final Map params, final String name) = { - return localBindAddress; - } - - public String getGroupAddress() - { - return groupAddress; - } - - public int getGroupPort() - { - return groupPort; - } - - public long getRefreshTimeout() - { - return refreshTimeout; - } - - /** - * @param name the name to set - */ - public void setName(final String name) - { + this.serverLocatorClassName =3D clazz; + = + this.params =3D params; + = this.name =3D name; } = - /** - * @param localBindAddress the localBindAddress to set - */ - public void setLocalBindAdress(final String localBindAddress) + public String getServerLocatorClassName() { - this.localBindAddress =3D localBindAddress; + return this.serverLocatorClassName; } = - /** - * @param groupAddress the groupAddress to set - */ - public void setGroupAddress(final String groupAddress) + public Map getParams() { - this.groupAddress =3D groupAddress; + return this.params; } = - /** - * @param groupPort the groupPort to set - */ - public void setGroupPort(final int groupPort) + public String getName() { - this.groupPort =3D groupPort; + return this.name; } - - /** - * @param refreshTimeout the refreshTimeout to set - */ - public void setRefreshTimeout(final long refreshTimeout) - { - this.refreshTimeout =3D refreshTimeout; - } - - /** - * @return the discoveryInitialWaitTimeout - */ - public long getDiscoveryInitialWaitTimeout() - { - return discoveryInitialWaitTimeout; - } - - /** - * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout t= o set - */ - public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTim= eout) - { - this.discoveryInitialWaitTimeout =3D discoveryInitialWaitTimeout; - } - - @Override - public boolean equals(Object o) - { - if (this =3D=3D o) return true; - if (o =3D=3D null || getClass() !=3D o.getClass()) return false; - - DiscoveryGroupConfiguration that =3D (DiscoveryGroupConfiguration) o; - - if (discoveryInitialWaitTimeout !=3D that.discoveryInitialWaitTimeou= t) return false; - if (groupPort !=3D that.groupPort) return false; - if (refreshTimeout !=3D that.refreshTimeout) return false; - if (groupAddress !=3D null ? !groupAddress.equals(that.groupAddress)= : that.groupAddress !=3D null) return false; - if (localBindAddress !=3D null ? !localBindAddress.equals(that.local= BindAddress) : that.localBindAddress !=3D null) - return false; - if (name !=3D null ? !name.equals(that.name) : that.name !=3D null) = return false; - - return true; - } - - @Override - public int hashCode() - { - int result =3D name !=3D null ? name.hashCode() : 0; - result =3D 31 * result + (localBindAddress !=3D null ? localBindAddr= ess.hashCode() : 0); - result =3D 31 * result + (groupAddress !=3D null ? groupAddress.hash= Code() : 0); - result =3D 31 * result + groupPort; - result =3D 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>>= 32)); - result =3D 31 * result + (int) (discoveryInitialWaitTimeout ^ (disco= veryInitialWaitTimeout >>> 32)); - return result; - } + = } Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQ= Client.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient= .java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient= .java 2011-01-27 14:06:20 UTC (rev 10150) @@ -13,13 +13,15 @@ package org.hornetq.api.core.client; = import org.hornetq.api.core.DiscoveryGroupConfiguration; -import org.hornetq.api.core.Pair; +import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBal= ancingPolicy; -import org.hornetq.core.client.impl.ClientSessionFactoryImpl; -import org.hornetq.core.client.impl.ServerLocatorImpl; +import org.hornetq.core.client.impl.DiscoveryGroupConstants; +import org.hornetq.core.client.impl.StaticServerLocatorImpl; = -import java.util.List; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.Map; = /** * Utility class for creating HornetQ {@link ClientSessionFactory} objects. @@ -107,21 +109,34 @@ */ public static ServerLocator createServerLocatorWithoutHA(TransportConfi= guration... transportConfigurations) { - return new ServerLocatorImpl(false, transportConfigurations); + Map params =3D new HashMap(); + params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, tran= sportConfigurations); + DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati= on(StaticServerLocatorImpl.class.getName(), params, null); + return createServerLocatorWithoutHA(config); } = /** * Create a ServerLocator which creates session factories from a set of= live servers, no HA backup information is propagated to the client * = - * The UDP address and port are used to listen for live servers in the = cluster - * = - * @param discoveryAddress The UDP group address to listen for updates - * @param discoveryPort the UDP port to listen for updates + * @param groupConfiguration The configuration for server discovery * @return the ServerLocator */ public static ServerLocator createServerLocatorWithoutHA(final Discover= yGroupConfiguration groupConfiguration) { - return new ServerLocatorImpl(false, groupConfiguration); + ServerLocator serverLocator =3D null; + String className =3D groupConfiguration.getServerLocatorClassName(); + try + { + ClassLoader loader =3D Thread.currentThread().getContextClassLoad= er(); + Class clazz =3D loader.loadClass(className); + Constructor constructor =3D clazz.getConstructor(Boolean.class= , DiscoveryGroupConfiguration.class); + serverLocator =3D (ServerLocator)constructor.newInstance(Boolean.= FALSE, groupConfiguration); + } + catch(Exception e) + { + new HornetQException(HornetQException.INTERNAL_ERROR, "Could not= instantiate ServerLocator implementation class: " + className, e); + } + return serverLocator; } = /** @@ -135,9 +150,12 @@ */ public static ServerLocator createServerLocatorWithHA(TransportConfigur= ation... initialServers) { - return new ServerLocatorImpl(true, initialServers); + Map params =3D new HashMap(); + params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, init= ialServers); + DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati= on(StaticServerLocatorImpl.class.getName(), params, null); + return createServerLocatorWithHA(config); } - = + /** * Create a ServerLocator which will receive cluster topology updates f= rom the cluster as servers leave or join and new backups are appointed or r= emoved. * The discoveryAddress and discoveryPort parameters in this method are= used to listen for UDP broadcasts which contain connection information for= members of the cluster. @@ -150,7 +168,20 @@ */ public static ServerLocator createServerLocatorWithHA(final DiscoveryGr= oupConfiguration groupConfiguration) { - return new ServerLocatorImpl(true, groupConfiguration); + ServerLocator serverLocator =3D null; + String className =3D groupConfiguration.getServerLocatorClassName(); + try + { + ClassLoader loader =3D Thread.currentThread().getContextClassLoad= er(); + Class clazz =3D loader.loadClass(className); + Constructor constructor =3D clazz.getConstructor(Boolean.class= , DiscoveryGroupConfiguration.class); + serverLocator =3D (ServerLocator)constructor.newInstance(Boolean.= TRUE, groupConfiguration); + } + catch(Exception e) + { + new HornetQException(HornetQException.INTERNAL_ERROR, "Could not= instantiate ServerLocator implementation class: " + className, e); + } + return serverLocator; } = = Copied: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/Abstract= ServerLocator.java (from rev 10124, branches/HORNETQ-316/src/main/org/horne= tq/core/client/impl/ServerLocatorImpl.java) =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServ= erLocator.java (rev 0) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServ= erLocator.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -0,0 +1,1026 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.client.impl; + +import java.io.Serializable; +import java.lang.reflect.Array; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.*; +import java.util.concurrent.*; + +import org.hornetq.api.core.DiscoveryGroupConfiguration; +import org.hornetq.api.core.Interceptor; +import org.hornetq.api.core.Pair; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.ClusterTopologyListener; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPoli= cy; +import org.hornetq.core.logging.Logger; +import org.hornetq.utils.HornetQThreadFactory; +import org.hornetq.utils.UUIDGenerator; + +/** + * A AbstractServerLocator + * + * @author Tim Fox + */ +public abstract class AbstractServerLocator implements ServerLocatorIntern= al, Serializable +{ + private static final long serialVersionUID =3D -1615857864410205260L; + + private static final Logger log =3D Logger.getLogger(AbstractServerLoca= tor.class); + + private final boolean ha; + + private boolean finalizeCheck =3D true; + + private boolean clusterConnection; + + private final Set topologyListeners =3D new Ha= shSet(); + + private Set factories =3D new HashSet(); + + private TransportConfiguration[] initialConnectors; + + private DiscoveryGroupConfiguration discoveryGroupConfiguration; + + private Topology topology =3D new Topology(); + + private Pair[] topology= Array; + + private boolean receivedTopology; + + private boolean compressLargeMessage; + + private ExecutorService threadPool; + + private ScheduledExecutorService scheduledThreadPool; + + private ConnectionLoadBalancingPolicy loadBalancingPolicy; + + private boolean readOnly; + + // Settable attributes: + + private boolean cacheLargeMessagesClient; + + private long clientFailureCheckPeriod; + + private long connectionTTL; + + private long callTimeout; + + private int minLargeMessageSize; + + private int consumerWindowSize; + + private int consumerMaxRate; + + private int confirmationWindowSize; + + private int producerWindowSize; + + private int producerMaxRate; + + private boolean blockOnAcknowledge; + + private boolean blockOnDurableSend; + + private boolean blockOnNonDurableSend; + + private boolean autoGroup; + + private boolean preAcknowledge; + + private String connectionLoadBalancingPolicyClassName; + + private int ackBatchSize; + + private boolean useGlobalPools; + + private int scheduledThreadPoolMaxSize; + + private int threadPoolMaxSize; + + private long retryInterval; + + private double retryIntervalMultiplier; + + private long maxRetryInterval; + + private int reconnectAttempts; + + private int initialConnectAttempts; + + private boolean failoverOnInitialConnection; + + private int initialMessagePacketSize; + + private volatile boolean closed; + + private final List interceptors =3D new CopyOnWriteArrayLi= st(); + + private static ExecutorService globalThreadPool; + + private static ScheduledExecutorService globalScheduledThreadPool; + + private String groupID; + + private String nodeID; + + private TransportConfiguration clusterTransportConfiguration; + + private boolean backup; + + private final Exception e =3D new Exception(); + = + // To be called when there are ServerLocator being finalized. + // To be used on test assertions + public static Runnable finalizeCallback =3D null; + + private static synchronized ExecutorService getGlobalThreadPool() + { + if (globalThreadPool =3D=3D null) + { + ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-global-threads", true, getThisClassLoader()); + + globalThreadPool =3D Executors.newCachedThreadPool(factory); + } + + return globalThreadPool; + } + + public static synchronized ScheduledExecutorService getGlobalScheduledT= hreadPool() + { + if (globalScheduledThreadPool =3D=3D null) + { + ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-global-scheduled-threads", + true, + getThisClassLoad= er()); + + globalScheduledThreadPool =3D Executors.newScheduledThreadPool(Ho= rnetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, + + fact= ory); + } + + return globalScheduledThreadPool; + } + + protected void setThreadPools() + { + if (useGlobalPools) + { + threadPool =3D getGlobalThreadPool(); + + scheduledThreadPool =3D getGlobalScheduledThreadPool(); + } + else + { + ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-factory-threads-" + System.identityHashCode(this), + true, + getThisClassLoad= er()); + + if (threadPoolMaxSize =3D=3D -1) + { + threadPool =3D Executors.newCachedThreadPool(factory); + } + else + { + threadPool =3D Executors.newFixedThreadPool(threadPoolMaxSize,= factory); + } + + factory =3D new HornetQThreadFactory("HornetQ-client-factory-ping= er-threads-" + System.identityHashCode(this), + true, + getThisClassLoader()); + + scheduledThreadPool =3D Executors.newScheduledThreadPool(schedule= dThreadPoolMaxSize, factory); + } + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction() + { + public ClassLoader run() + { + return ClientSessionFactoryImpl.class.getClassLoader(); + } + }); + + } + + protected void instantiateLoadBalancingPolicy() + { + if (connectionLoadBalancingPolicyClassName =3D=3D null) + { + throw new IllegalStateException("Please specify a load balancing = policy class name on the session factory"); + } + + AccessController.doPrivileged(new PrivilegedAction() + { + public Object run() + { + ClassLoader loader =3D Thread.currentThread().getContextClassL= oader(); + try + { + Class clazz =3D loader.loadClass(connectionLoadBalancing= PolicyClassName); + loadBalancingPolicy =3D (ConnectionLoadBalancingPolicy)claz= z.newInstance(); + return null; + } + catch (Exception e) + { + throw new IllegalArgumentException("Unable to instantiate l= oad balancing policy \"" + connectionLoadBalancingPolicyClassName + + "\"", + e); + } + } + }); + } + + public AbstractServerLocator(final boolean useHA, + final DiscoveryGroupConfiguration discoveryGr= oupConfiguration) + { + e.fillInStackTrace(); + this.ha =3D useHA; + + this.discoveryGroupConfiguration =3D discoveryGroupConfiguration; + + this.nodeID =3D UUIDGenerator.getInstance().generateStringUUID(); + + clientFailureCheckPeriod =3D HornetQClient.DEFAULT_CLIENT_FAILURE_CH= ECK_PERIOD; + + connectionTTL =3D HornetQClient.DEFAULT_CONNECTION_TTL; + + callTimeout =3D HornetQClient.DEFAULT_CALL_TIMEOUT; + + minLargeMessageSize =3D HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + + consumerWindowSize =3D HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + + consumerMaxRate =3D HornetQClient.DEFAULT_CONSUMER_MAX_RATE; + + confirmationWindowSize =3D HornetQClient.DEFAULT_CONFIRMATION_WINDOW= _SIZE; + + producerWindowSize =3D HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE; + + producerMaxRate =3D HornetQClient.DEFAULT_PRODUCER_MAX_RATE; + + blockOnAcknowledge =3D HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; + + blockOnDurableSend =3D HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; + + blockOnNonDurableSend =3D HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE= _SEND; + + autoGroup =3D HornetQClient.DEFAULT_AUTO_GROUP; + + preAcknowledge =3D HornetQClient.DEFAULT_PRE_ACKNOWLEDGE; + + ackBatchSize =3D HornetQClient.DEFAULT_ACK_BATCH_SIZE; + + connectionLoadBalancingPolicyClassName =3D HornetQClient.DEFAULT_CON= NECTION_LOAD_BALANCING_POLICY_CLASS_NAME; + + useGlobalPools =3D HornetQClient.DEFAULT_USE_GLOBAL_POOLS; + + scheduledThreadPoolMaxSize =3D HornetQClient.DEFAULT_SCHEDULED_THREA= D_POOL_MAX_SIZE; + + threadPoolMaxSize =3D HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE; + + retryInterval =3D HornetQClient.DEFAULT_RETRY_INTERVAL; + + retryIntervalMultiplier =3D HornetQClient.DEFAULT_RETRY_INTERVAL_MUL= TIPLIER; + + maxRetryInterval =3D HornetQClient.DEFAULT_MAX_RETRY_INTERVAL; + + reconnectAttempts =3D HornetQClient.DEFAULT_RECONNECT_ATTEMPTS; + + initialConnectAttempts =3D HornetQClient.INITIAL_CONNECT_ATTEMPTS; + + failoverOnInitialConnection =3D HornetQClient.DEFAULT_FAILOVER_ON_IN= ITIAL_CONNECTION; + + cacheLargeMessagesClient =3D HornetQClient.DEFAULT_CACHE_LARGE_MESSA= GE_CLIENT; + + initialMessagePacketSize =3D HornetQClient.DEFAULT_INITIAL_MESSAGE_P= ACKET_SIZE; + + cacheLargeMessagesClient =3D HornetQClient.DEFAULT_CACHE_LARGE_MESSA= GE_CLIENT; + + compressLargeMessage =3D HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAG= ES; + + clusterConnection =3D false; + } + + protected TransportConfiguration selectConnector() + { + if (receivedTopology) + { + int pos =3D loadBalancingPolicy.select(topologyArray.length); + + Pair pair =3D top= ologyArray[pos]; + + return pair.a; + } + else + { + // Get from initialconnectors + + int pos =3D loadBalancingPolicy.select(initialConnectors.length); + + return initialConnectors[pos]; + } + } + + protected int getConnectorLength() + { + if(receivedTopology) + { + return topologyArray.length; + } + else if(initialConnectors =3D=3D null) + { + return -1; + } + else + { + return initialConnectors.length; + } + } + = + public void disableFinalizeCheck() + { + finalizeCheck =3D false; + } + + public synchronized boolean isHA() + { + return ha; + } + + public synchronized boolean isCacheLargeMessagesClient() + { + return cacheLargeMessagesClient; + } + + public synchronized void setCacheLargeMessagesClient(final boolean cach= ed) + { + cacheLargeMessagesClient =3D cached; + } + + public synchronized long getClientFailureCheckPeriod() + { + return clientFailureCheckPeriod; + } + + public synchronized void setClientFailureCheckPeriod(final long clientF= ailureCheckPeriod) + { + checkWrite(); + this.clientFailureCheckPeriod =3D clientFailureCheckPeriod; + } + + public synchronized long getConnectionTTL() + { + return connectionTTL; + } + + public synchronized void setConnectionTTL(final long connectionTTL) + { + checkWrite(); + this.connectionTTL =3D connectionTTL; + } + + public synchronized long getCallTimeout() + { + return callTimeout; + } + + public synchronized void setCallTimeout(final long callTimeout) + { + checkWrite(); + this.callTimeout =3D callTimeout; + } + + public synchronized int getMinLargeMessageSize() + { + return minLargeMessageSize; + } + + public synchronized void setMinLargeMessageSize(final int minLargeMessa= geSize) + { + checkWrite(); + this.minLargeMessageSize =3D minLargeMessageSize; + } + + public synchronized int getConsumerWindowSize() + { + return consumerWindowSize; + } + + public synchronized void setConsumerWindowSize(final int consumerWindow= Size) + { + checkWrite(); + this.consumerWindowSize =3D consumerWindowSize; + } + + public synchronized int getConsumerMaxRate() + { + return consumerMaxRate; + } + + public synchronized void setConsumerMaxRate(final int consumerMaxRate) + { + checkWrite(); + this.consumerMaxRate =3D consumerMaxRate; + } + + public synchronized int getConfirmationWindowSize() + { + return confirmationWindowSize; + } + + public synchronized void setConfirmationWindowSize(final int confirmati= onWindowSize) + { + checkWrite(); + this.confirmationWindowSize =3D confirmationWindowSize; + } + + public synchronized int getProducerWindowSize() + { + return producerWindowSize; + } + + public synchronized void setProducerWindowSize(final int producerWindow= Size) + { + checkWrite(); + this.producerWindowSize =3D producerWindowSize; + } + + public synchronized int getProducerMaxRate() + { + return producerMaxRate; + } + + public synchronized void setProducerMaxRate(final int producerMaxRate) + { + checkWrite(); + this.producerMaxRate =3D producerMaxRate; + } + + public synchronized boolean isBlockOnAcknowledge() + { + return blockOnAcknowledge; + } + + public synchronized void setBlockOnAcknowledge(final boolean blockOnAck= nowledge) + { + checkWrite(); + this.blockOnAcknowledge =3D blockOnAcknowledge; + } + + public synchronized boolean isBlockOnDurableSend() + { + return blockOnDurableSend; + } + + public synchronized void setBlockOnDurableSend(final boolean blockOnDur= ableSend) + { + checkWrite(); + this.blockOnDurableSend =3D blockOnDurableSend; + } + + public synchronized boolean isBlockOnNonDurableSend() + { + return blockOnNonDurableSend; + } + + public synchronized void setBlockOnNonDurableSend(final boolean blockOn= NonDurableSend) + { + checkWrite(); + this.blockOnNonDurableSend =3D blockOnNonDurableSend; + } + + public synchronized boolean isAutoGroup() + { + return autoGroup; + } + + public synchronized void setAutoGroup(final boolean autoGroup) + { + checkWrite(); + this.autoGroup =3D autoGroup; + } + + public synchronized boolean isPreAcknowledge() + { + return preAcknowledge; + } + + public synchronized void setPreAcknowledge(final boolean preAcknowledge) + { + checkWrite(); + this.preAcknowledge =3D preAcknowledge; + } + + public synchronized int getAckBatchSize() + { + return ackBatchSize; + } + + public synchronized void setAckBatchSize(final int ackBatchSize) + { + checkWrite(); + this.ackBatchSize =3D ackBatchSize; + } + + public synchronized boolean isUseGlobalPools() + { + return useGlobalPools; + } + + public synchronized void setUseGlobalPools(final boolean useGlobalPools) + { + checkWrite(); + this.useGlobalPools =3D useGlobalPools; + } + + public synchronized int getScheduledThreadPoolMaxSize() + { + return scheduledThreadPoolMaxSize; + } + + public synchronized void setScheduledThreadPoolMaxSize(final int schedu= ledThreadPoolMaxSize) + { + checkWrite(); + this.scheduledThreadPoolMaxSize =3D scheduledThreadPoolMaxSize; + } + + public synchronized int getThreadPoolMaxSize() + { + return threadPoolMaxSize; + } + + public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSi= ze) + { + checkWrite(); + this.threadPoolMaxSize =3D threadPoolMaxSize; + } + + public synchronized long getRetryInterval() + { + return retryInterval; + } + + public synchronized void setRetryInterval(final long retryInterval) + { + checkWrite(); + this.retryInterval =3D retryInterval; + } + + public synchronized long getMaxRetryInterval() + { + return maxRetryInterval; + } + + public synchronized void setMaxRetryInterval(final long retryInterval) + { + checkWrite(); + maxRetryInterval =3D retryInterval; + } + + public synchronized double getRetryIntervalMultiplier() + { + return retryIntervalMultiplier; + } + + public synchronized void setRetryIntervalMultiplier(final double retryI= ntervalMultiplier) + { + checkWrite(); + this.retryIntervalMultiplier =3D retryIntervalMultiplier; + } + + public synchronized int getReconnectAttempts() + { + return reconnectAttempts; + } + + public synchronized void setReconnectAttempts(final int reconnectAttemp= ts) + { + checkWrite(); + this.reconnectAttempts =3D reconnectAttempts; + } + + public void setInitialConnectAttempts(int initialConnectAttempts) + { + checkWrite(); + this.initialConnectAttempts =3D initialConnectAttempts; + } + + public int getInitialConnectAttempts() + { + return initialConnectAttempts; + } + + public synchronized boolean isFailoverOnInitialConnection() + { + return this.failoverOnInitialConnection; + } + + public synchronized void setFailoverOnInitialConnection(final boolean f= ailover) + { + checkWrite(); + this.failoverOnInitialConnection =3D failover; + } + + public synchronized String getConnectionLoadBalancingPolicyClassName() + { + return connectionLoadBalancingPolicyClassName; + } + + public synchronized void setConnectionLoadBalancingPolicyClassName(fina= l String loadBalancingPolicyClassName) + { + checkWrite(); + connectionLoadBalancingPolicyClassName =3D loadBalancingPolicyClassN= ame; + } + + public TransportConfiguration[] getStaticTransportConfigurations() + { + return this.initialConnectors; + } + + public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() + { + return discoveryGroupConfiguration; + } + + public void addInterceptor(final Interceptor interceptor) + { + interceptors.add(interceptor); + } + + public boolean removeInterceptor(final Interceptor interceptor) + { + return interceptors.remove(interceptor); + } + + public synchronized int getInitialMessagePacketSize() + { + return initialMessagePacketSize; + } + + public synchronized void setInitialMessagePacketSize(final int size) + { + checkWrite(); + initialMessagePacketSize =3D size; + } + + public void setGroupID(final String groupID) + { + checkWrite(); + this.groupID =3D groupID; + } + + public String getGroupID() + { + return groupID; + } + + public boolean isCompressLargeMessage() + { + return compressLargeMessage; + } + + public void setCompressLargeMessage(boolean compress) + { + this.compressLargeMessage =3D compress; + } + + private void checkWrite() + { + if (readOnly) + { + throw new IllegalStateException("Cannot set attribute on SessionF= actory after it has been used"); + } + } + + public void setNodeID(String nodeID) + { + this.nodeID =3D nodeID; + } + + public String getNodeID() + { + return nodeID; + } + + public void setClusterConnection(boolean clusterConnection) + { + this.clusterConnection =3D clusterConnection; + } + + public boolean isClusterConnection() + { + return clusterConnection; + } + + public TransportConfiguration getClusterTransportConfiguration() + { + return clusterTransportConfiguration; + } + + public void setClusterTransportConfiguration(TransportConfiguration tc) + { + this.clusterTransportConfiguration =3D tc; + } + + public boolean isBackup() + { + return backup; + } + + public void setBackup(boolean backup) + { + this.backup =3D backup; + } + + protected boolean isReceivedTopology() + { + return this.receivedTopology; + } + = + protected boolean doFinalizeCheck() + { + return this.finalizeCheck; + } + = + protected ConnectionLoadBalancingPolicy getLoadBalancingPolicy() + { + return this.loadBalancingPolicy; + } + = + protected ExecutorService getThreadPool() + { + return threadPool; + } + + protected ScheduledExecutorService getScheduledThreadPool() + { + return scheduledThreadPool; + } + + protected List getInterceptors() + { + return this.interceptors; + } + = + protected TransportConfiguration[] getInitialConnectors() + { + return this.initialConnectors; + } + = + protected void setInitialConnectors(TransportConfiguration[] initialCon= nectors) + { + this.initialConnectors =3D initialConnectors; + } + = + protected boolean isReadOnly() + { + return this.readOnly; + } + = + protected void setReadOnly(boolean readOnly) + { + this.readOnly =3D readOnly; + } + = + protected boolean isClosed() + { + return this.closed; + } + = + @Override + protected void finalize() throws Throwable + { + if (finalizeCheck) + { + close(); + } + + super.finalize(); + } + + public void close() + { + if (closed) + { + return; + } + + + for (ClientSessionFactory factory : factories) + { + factory.close(); + } + + factories.clear(); + + if (!useGlobalPools) + { + if (threadPool !=3D null) + { + threadPool.shutdown(); + + try + { + if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECON= DS)) + { + log.warn("Timed out waiting for pool to terminate"); + } + } + catch (InterruptedException ignore) + { + } + } + + if (scheduledThreadPool !=3D null) + { + scheduledThreadPool.shutdown(); + + try + { + if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.M= ILLISECONDS)) + { + log.warn("Timed out waiting for scheduled pool to termin= ate"); + } + } + catch (InterruptedException ignore) + { + } + } + } + readOnly =3D false; + + closed =3D true; + } + = + public synchronized void notifyNodeDown(final String nodeID) + { + boolean removed =3D false; + + if (!ha) + { + return; + } + + removed =3D topology.removeMember(nodeID); + + if (!topology.isEmpty()) + { + updateArraysAndPairs(); + + if (topology.nodes() =3D=3D 1 && topology.getMember(this.nodeID) = !=3D null) + { + receivedTopology =3D false; + } + } + else + { + topologyArray =3D null; + + receivedTopology =3D false; + } + + if (removed) + { + for (ClusterTopologyListener listener : topologyListeners) + { + listener.nodeDown(nodeID); + } + } + } + + public synchronized void notifyNodeUp(final String nodeID, + final Pair connectorPair, + final boolean last) + { + if (!ha) + { + return; + } + + topology.addMember(nodeID, new TopologyMember(connectorPair)); + + TopologyMember actMember =3D topology.getMember(nodeID); + + if (actMember.getConnector().a !=3D null && actMember.getConnector()= .b !=3D null) + { + for (ClientSessionFactory factory : factories) + { + ((ClientSessionFactoryInternal)factory).setBackupConnector(act= Member.getConnector().a, + act= Member.getConnector().b); + } + } + + if (connectorPair.a !=3D null) + { + updateArraysAndPairs(); + } + + if (last) + { + receivedTopology =3D true; + } + + for (ClusterTopologyListener listener : topologyListeners) + { + listener.nodeUP(nodeID, connectorPair, last); + } + + // Notify if waiting on getting topology + notify(); + } + + private void updateArraysAndPairs() + { + topologyArray =3D (Pair[])Array.newInstance(Pair.class, + = topology.members()); + + int count =3D 0; + for (TopologyMember pair : topology.getMembers()) + { + topologyArray[count++] =3D pair.getConnector(); + } + } + + public synchronized void factoryClosed(final ClientSessionFactory facto= ry) + { + factories.remove(factory); + + if (factories.isEmpty()) + { + // Go back to using the broadcast or static list + + receivedTopology =3D false; + + topology =3D null; + + } + } + + public Topology getTopology() + { + return topology; + } + + public void addClusterTopologyListener(final ClusterTopologyListener li= stener) + { + topologyListeners.add(listener); + if(topology.members() > 0) + { + System.out.println("ServerLocatorImpl.addClusterTopologyListener"= ); + } + } + + public void removeClusterTopologyListener(final ClusterTopologyListener= listener) + { + topologyListeners.remove(listener); + } + + public synchronized void addFactory(ClientSessionFactoryInternal factor= y) + { + if (factory !=3D null) + { + TransportConfiguration backup =3D topology.getBackupForConnector(= factory.getConnectorConfiguration()); + factory.setBackupConnector(factory.getConnectorConfiguration(), b= ackup); + factories.add(factory); + } + } + + public static void shutdown() + { + if (globalScheduledThreadPool !=3D null) + { + globalScheduledThreadPool.shutdown(); + globalScheduledThreadPool =3D null; + } + if (globalThreadPool !=3D null) + { + globalThreadPool.shutdown(); + globalThreadPool =3D null; + } + } + = + public boolean isStaticDirectConnection(TransportConfiguration conf) + { + return false; + } +} Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/Discovery= GroupConstants.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGro= upConstants.java (rev 0) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGro= upConstants.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -0,0 +1,34 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.client.impl; + +/** + * A DiscoveryGroupConstants + * + * @author "Tomohisa Igarashi" + * + */ +public class DiscoveryGroupConstants +{ + // for static discovery + public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = =3D "static-connector-ref-list"; + public static final String STATIC_CONNECTORS_LIST_NAME =3D "static-conn= ector-list"; + = + // for simple UDP discovery + public static final String LOCAL_BIND_ADDRESS_NAME =3D "local-bind-addr= ess"; + public static final String GROUP_ADDRESS_NAME =3D "group-address"; + public static final String GROUP_PORT_NAME =3D "group-port"; + public static final String INITIAL_WAIT_TIMEOUT_NAME =3D "initial-wait-= timeout"; + public static final String REFRESH_TIMEOUT_NAME =3D "refresh-timeout"; +} Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/= impl/DiscoveryGroupConstants.java ___________________________________________________________________ Name: svn:mime-type + text/plain Deleted: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerL= ocatorImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocato= rImpl.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocato= rImpl.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -1,1456 +0,0 @@ -/* - * Copyright 2010 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.client.impl; - -import java.io.Serializable; -import java.lang.reflect.Array; -import java.net.InetAddress; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.*; -import java.util.concurrent.*; - -import org.hornetq.api.core.DiscoveryGroupConfiguration; -import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.Interceptor; -import org.hornetq.api.core.Pair; -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.ClientSessionFactory; -import org.hornetq.api.core.client.ClusterTopologyListener; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPoli= cy; -import org.hornetq.core.cluster.DiscoveryEntry; -import org.hornetq.core.cluster.DiscoveryGroup; -import org.hornetq.core.cluster.DiscoveryListener; -import org.hornetq.core.cluster.impl.DiscoveryGroupImpl; -import org.hornetq.core.logging.Logger; -import org.hornetq.utils.HornetQThreadFactory; -import org.hornetq.utils.UUIDGenerator; - -/** - * A ServerLocatorImpl - * - * @author Tim Fox - */ -public class ServerLocatorImpl implements ServerLocatorInternal, Discovery= Listener, Serializable -{ - private static final long serialVersionUID =3D -1615857864410205260L; - - private static final Logger log =3D Logger.getLogger(ServerLocatorImpl.= class); - - private final boolean ha; - - private boolean finalizeCheck =3D true; - - private boolean clusterConnection; - - private final Set topologyListeners =3D new Ha= shSet(); - - private Set factories =3D new HashSet(); - - private TransportConfiguration[] initialConnectors; - - private DiscoveryGroupConfiguration discoveryGroupConfiguration; - - private StaticConnector staticConnector =3D new StaticConnector(); - - private Topology topology =3D new Topology(); - - private Pair[] topology= Array; - - private boolean receivedTopology; - - private boolean compressLargeMessage; - - private ExecutorService threadPool; - - private ScheduledExecutorService scheduledThreadPool; - - private DiscoveryGroup discoveryGroup; - - private ConnectionLoadBalancingPolicy loadBalancingPolicy; - - private boolean readOnly; - - // Settable attributes: - - private boolean cacheLargeMessagesClient; - - private long clientFailureCheckPeriod; - - private long connectionTTL; - - private long callTimeout; - - private int minLargeMessageSize; - - private int consumerWindowSize; - - private int consumerMaxRate; - - private int confirmationWindowSize; - - private int producerWindowSize; - - private int producerMaxRate; - - private boolean blockOnAcknowledge; - - private boolean blockOnDurableSend; - - private boolean blockOnNonDurableSend; - - private boolean autoGroup; - - private boolean preAcknowledge; - - private String connectionLoadBalancingPolicyClassName; - - private int ackBatchSize; - - private boolean useGlobalPools; - - private int scheduledThreadPoolMaxSize; - - private int threadPoolMaxSize; - - private long retryInterval; - - private double retryIntervalMultiplier; - - private long maxRetryInterval; - - private int reconnectAttempts; - - private int initialConnectAttempts; - - private boolean failoverOnInitialConnection; - - private int initialMessagePacketSize; - - private volatile boolean closed; - - private volatile boolean closing; - - private final List interceptors =3D new CopyOnWriteArrayLi= st(); - - private static ExecutorService globalThreadPool; - - private static ScheduledExecutorService globalScheduledThreadPool; - - private String groupID; - - private String nodeID; - - private TransportConfiguration clusterTransportConfiguration; - - private boolean backup; - - private final Exception e =3D new Exception(); - = - // To be called when there are ServerLocator being finalized. - // To be used on test assertions - public static Runnable finalizeCallback =3D null; - - private static synchronized ExecutorService getGlobalThreadPool() - { - if (globalThreadPool =3D=3D null) - { - ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-global-threads", true, getThisClassLoader()); - - globalThreadPool =3D Executors.newCachedThreadPool(factory); - } - - return globalThreadPool; - } - - public static synchronized ScheduledExecutorService getGlobalScheduledT= hreadPool() - { - if (globalScheduledThreadPool =3D=3D null) - { - ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-global-scheduled-threads", - true, - getThisClassLoad= er()); - - globalScheduledThreadPool =3D Executors.newScheduledThreadPool(Ho= rnetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, - - fact= ory); - } - - return globalScheduledThreadPool; - } - - private void setThreadPools() - { - if (useGlobalPools) - { - threadPool =3D getGlobalThreadPool(); - - scheduledThreadPool =3D getGlobalScheduledThreadPool(); - } - else - { - ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien= t-factory-threads-" + System.identityHashCode(this), - true, - getThisClassLoad= er()); - - if (threadPoolMaxSize =3D=3D -1) - { - threadPool =3D Executors.newCachedThreadPool(factory); - } - else - { - threadPool =3D Executors.newFixedThreadPool(threadPoolMaxSize,= factory); - } - - factory =3D new HornetQThreadFactory("HornetQ-client-factory-ping= er-threads-" + System.identityHashCode(this), - true, - getThisClassLoader()); - - scheduledThreadPool =3D Executors.newScheduledThreadPool(schedule= dThreadPoolMaxSize, factory); - } - } - - private static ClassLoader getThisClassLoader() - { - return AccessController.doPrivileged(new PrivilegedAction() - { - public ClassLoader run() - { - return ClientSessionFactoryImpl.class.getClassLoader(); - } - }); - - } - - private void instantiateLoadBalancingPolicy() - { - if (connectionLoadBalancingPolicyClassName =3D=3D null) - { - throw new IllegalStateException("Please specify a load balancing = policy class name on the session factory"); - } - - AccessController.doPrivileged(new PrivilegedAction() - { - public Object run() - { - ClassLoader loader =3D Thread.currentThread().getContextClassL= oader(); - try - { - Class clazz =3D loader.loadClass(connectionLoadBalancing= PolicyClassName); - loadBalancingPolicy =3D (ConnectionLoadBalancingPolicy)claz= z.newInstance(); - return null; - } - catch (Exception e) - { - throw new IllegalArgumentException("Unable to instantiate l= oad balancing policy \"" + connectionLoadBalancingPolicyClassName + - "\"", - e); - } - } - }); - } - - private synchronized void initialise() throws Exception - { - if (!readOnly) - { - setThreadPools(); - - instantiateLoadBalancingPolicy(); - - if (discoveryGroupConfiguration !=3D null) - { - InetAddress groupAddress =3D InetAddress.getByName(discoveryGr= oupConfiguration.getGroupAddress()); - - InetAddress lbAddress; - - if (discoveryGroupConfiguration.getLocalBindAddress() !=3D nul= l) - { - lbAddress =3D InetAddress.getByName(discoveryGroupConfigura= tion.getLocalBindAddress()); - } - else - { - lbAddress =3D null; - } - - discoveryGroup =3D new DiscoveryGroupImpl(nodeID, - discoveryGroupConfigur= ation.getName(), - lbAddress, - groupAddress, - discoveryGroupConfigur= ation.getGroupPort(), - discoveryGroupConfigur= ation.getRefreshTimeout()); - - discoveryGroup.registerListener(this); - - discoveryGroup.start(); - } - - readOnly =3D true; - } - } - - private ServerLocatorImpl(final boolean useHA, - final DiscoveryGroupConfiguration discoveryGr= oupConfiguration, - final TransportConfiguration[] transportConfi= gs) - { - e.fillInStackTrace(); - this.ha =3D useHA; - - this.discoveryGroupConfiguration =3D discoveryGroupConfiguration; - - this.initialConnectors =3D transportConfigs; - - this.nodeID =3D UUIDGenerator.getInstance().generateStringUUID(); - - clientFailureCheckPeriod =3D HornetQClient.DEFAULT_CLIENT_FAILURE_CH= ECK_PERIOD; - - connectionTTL =3D HornetQClient.DEFAULT_CONNECTION_TTL; - - callTimeout =3D HornetQClient.DEFAULT_CALL_TIMEOUT; - - minLargeMessageSize =3D HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; - - consumerWindowSize =3D HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE; - - consumerMaxRate =3D HornetQClient.DEFAULT_CONSUMER_MAX_RATE; - - confirmationWindowSize =3D HornetQClient.DEFAULT_CONFIRMATION_WINDOW= _SIZE; - - producerWindowSize =3D HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE; - - producerMaxRate =3D HornetQClient.DEFAULT_PRODUCER_MAX_RATE; - - blockOnAcknowledge =3D HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; - - blockOnDurableSend =3D HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; - - blockOnNonDurableSend =3D HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE= _SEND; - - autoGroup =3D HornetQClient.DEFAULT_AUTO_GROUP; - - preAcknowledge =3D HornetQClient.DEFAULT_PRE_ACKNOWLEDGE; - - ackBatchSize =3D HornetQClient.DEFAULT_ACK_BATCH_SIZE; - - connectionLoadBalancingPolicyClassName =3D HornetQClient.DEFAULT_CON= NECTION_LOAD_BALANCING_POLICY_CLASS_NAME; - - useGlobalPools =3D HornetQClient.DEFAULT_USE_GLOBAL_POOLS; - - scheduledThreadPoolMaxSize =3D HornetQClient.DEFAULT_SCHEDULED_THREA= D_POOL_MAX_SIZE; - - threadPoolMaxSize =3D HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE; - - retryInterval =3D HornetQClient.DEFAULT_RETRY_INTERVAL; - - retryIntervalMultiplier =3D HornetQClient.DEFAULT_RETRY_INTERVAL_MUL= TIPLIER; - - maxRetryInterval =3D HornetQClient.DEFAULT_MAX_RETRY_INTERVAL; - - reconnectAttempts =3D HornetQClient.DEFAULT_RECONNECT_ATTEMPTS; - - initialConnectAttempts =3D HornetQClient.INITIAL_CONNECT_ATTEMPTS; - - failoverOnInitialConnection =3D HornetQClient.DEFAULT_FAILOVER_ON_IN= ITIAL_CONNECTION; - - cacheLargeMessagesClient =3D HornetQClient.DEFAULT_CACHE_LARGE_MESSA= GE_CLIENT; - - initialMessagePacketSize =3D HornetQClient.DEFAULT_INITIAL_MESSAGE_P= ACKET_SIZE; - - cacheLargeMessagesClient =3D HornetQClient.DEFAULT_CACHE_LARGE_MESSA= GE_CLIENT; - - compressLargeMessage =3D HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAG= ES; - - clusterConnection =3D false; - } - - /** - * Create a ServerLocatorImpl using UDP discovery to lookup cluster - * - * @param discoveryAddress - * @param discoveryPort - */ - public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfi= guration groupConfiguration) - { - this(useHA, groupConfiguration, null); - } - - /** - * Create a ServerLocatorImpl using a static list of live servers - * - * @param transportConfigs - */ - public ServerLocatorImpl(final boolean useHA, final TransportConfigurat= ion... transportConfigs) - { - this(useHA, null, transportConfigs); - } - - private TransportConfiguration selectConnector() - { - if (receivedTopology) - { - int pos =3D loadBalancingPolicy.select(topologyArray.length); - - Pair pair =3D top= ologyArray[pos]; - - return pair.a; - } - else - { - // Get from initialconnectors - - int pos =3D loadBalancingPolicy.select(initialConnectors.length); - - return initialConnectors[pos]; - } - } - - public void start(Executor executor) throws Exception - { - initialise(); - - executor.execute(new Runnable() - { - public void run() - { - try - { - connect(); - } - catch (Exception e) - { - if (!closing) - { - log.warn("did not connect the cluster connection to othe= r nodes", e); - } - } - } - }); - } - - /* (non-Javadoc) - * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck() - */ - public void disableFinalizeCheck() - { - finalizeCheck =3D false; - } - - public ClientSessionFactory connect() throws Exception - { - ClientSessionFactoryInternal sf; - // static list of initial connectors - if (initialConnectors !=3D null && discoveryGroup =3D=3D null) - { - sf =3D (ClientSessionFactoryInternal)staticConnector.connect(); - } - // wait for discovery group to get the list of initial connectors - else - { - sf =3D (ClientSessionFactoryInternal)createSessionFactory(); - } - addFactory(sf); - return sf; - } - - public ClientSessionFactory createSessionFactory(final TransportConfigu= ration transportConfiguration) throws Exception - { - if (closed) - { - throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); - } - - try - { - initialise(); - } - catch (Exception e) - { - throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); - } - - ClientSessionFactoryInternal factory =3D new ClientSessionFactoryImp= l(this, - = transportConfiguration, - = callTimeout, - = clientFailureCheckPeriod, - = connectionTTL, - = retryInterval, - = retryIntervalMultiplier, - = maxRetryInterval, - = reconnectAttempts, - = threadPool, - = scheduledThreadPool, - = interceptors); - - factory.connect(reconnectAttempts, failoverOnInitialConnection); - - addFactory(factory); - - return factory; - } - - public ClientSessionFactory createSessionFactory() throws Exception - { - if (closed) - { - throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); - } - - try - { - initialise(); - } - catch (Exception e) - { - throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); - } - - if (initialConnectors =3D=3D null && discoveryGroup !=3D null) - { - // Wait for an initial broadcast to give us at least one node in = the cluster - long timeout =3D clusterConnection ? 0 : discoveryGroupConfigurat= ion.getDiscoveryInitialWaitTimeout(); - boolean ok =3D discoveryGroup.waitForBroadcast(timeout); - - if (!ok) - { - throw new HornetQException(HornetQException.CONNECTION_TIMEDOU= T, - "Timed out waiting to receive initi= al broadcast from cluster"); - } - } - - ClientSessionFactoryInternal factory =3D null; - - synchronized (this) - { - boolean retry; - int attempts =3D 0; - do - { - retry =3D false; - - TransportConfiguration tc =3D selectConnector(); - - // try each factory in the list until we find one which works - - try - { - factory =3D new ClientSessionFactoryImpl(this, - tc, - callTimeout, - clientFailureCheckPe= riod, - connectionTTL, - retryInterval, - retryIntervalMultipl= ier, - maxRetryInterval, - reconnectAttempts, - threadPool, - scheduledThreadPool, - interceptors); - factory.connect(initialConnectAttempts, failoverOnInitialCo= nnection); - } - catch (HornetQException e) - { - factory.close(); - factory =3D null; - if (e.getCode() =3D=3D HornetQException.NOT_CONNECTED) - { - attempts++; - - if (topologyArray !=3D null && attempts =3D=3D topologyA= rray.length) - { - throw new HornetQException(HornetQException.NOT_CONNE= CTED, - "Cannot connect to server(= s). Tried with all available servers."); - } - if (topologyArray =3D=3D null && initialConnectors !=3D = null && attempts =3D=3D initialConnectors.length) - { - throw new HornetQException(HornetQException.NOT_CONNE= CTED, - "Cannot connect to server(= s). Tried with all available servers."); - } - retry =3D true; - } - else - { - throw e; - } - } - } - while (retry); - - if (ha) - { - long toWait =3D 30000; - long start =3D System.currentTimeMillis(); - while (!receivedTopology && toWait > 0) - { - // Now wait for the topology - - try - { - wait(toWait); - } - catch (InterruptedException ignore) - { - } - - long now =3D System.currentTimeMillis(); - - toWait -=3D now - start; - - start =3D now; - } - - if (toWait <=3D 0) - { - throw new HornetQException(HornetQException.CONNECTION_TIME= DOUT, - "Timed out waiting to receive cl= uster topology"); - } - } - - addFactory(factory); - - return factory; - } - } - - public synchronized boolean isHA() - { - return ha; - } - - public synchronized boolean isCacheLargeMessagesClient() - { - return cacheLargeMessagesClient; - } - - public synchronized void setCacheLargeMessagesClient(final boolean cach= ed) - { - cacheLargeMessagesClient =3D cached; - } - - public synchronized long getClientFailureCheckPeriod() - { - return clientFailureCheckPeriod; - } - - public synchronized void setClientFailureCheckPeriod(final long clientF= ailureCheckPeriod) - { - checkWrite(); - this.clientFailureCheckPeriod =3D clientFailureCheckPeriod; - } - - public synchronized long getConnectionTTL() - { - return connectionTTL; - } - - public synchronized void setConnectionTTL(final long connectionTTL) - { - checkWrite(); - this.connectionTTL =3D connectionTTL; - } - - public synchronized long getCallTimeout() - { - return callTimeout; - } - - public synchronized void setCallTimeout(final long callTimeout) - { - checkWrite(); - this.callTimeout =3D callTimeout; - } - - public synchronized int getMinLargeMessageSize() - { - return minLargeMessageSize; - } - - public synchronized void setMinLargeMessageSize(final int minLargeMessa= geSize) - { - checkWrite(); - this.minLargeMessageSize =3D minLargeMessageSize; - } - - public synchronized int getConsumerWindowSize() - { - return consumerWindowSize; - } - - public synchronized void setConsumerWindowSize(final int consumerWindow= Size) - { - checkWrite(); - this.consumerWindowSize =3D consumerWindowSize; - } - - public synchronized int getConsumerMaxRate() - { - return consumerMaxRate; - } - - public synchronized void setConsumerMaxRate(final int consumerMaxRate) - { - checkWrite(); - this.consumerMaxRate =3D consumerMaxRate; - } - - public synchronized int getConfirmationWindowSize() - { - return confirmationWindowSize; - } - - public synchronized void setConfirmationWindowSize(final int confirmati= onWindowSize) - { - checkWrite(); - this.confirmationWindowSize =3D confirmationWindowSize; - } - - public synchronized int getProducerWindowSize() - { - return producerWindowSize; - } - - public synchronized void setProducerWindowSize(final int producerWindow= Size) - { - checkWrite(); - this.producerWindowSize =3D producerWindowSize; - } - - public synchronized int getProducerMaxRate() - { - return producerMaxRate; - } - - public synchronized void setProducerMaxRate(final int producerMaxRate) - { - checkWrite(); - this.producerMaxRate =3D producerMaxRate; - } - - public synchronized boolean isBlockOnAcknowledge() - { - return blockOnAcknowledge; - } - - public synchronized void setBlockOnAcknowledge(final boolean blockOnAck= nowledge) - { - checkWrite(); - this.blockOnAcknowledge =3D blockOnAcknowledge; - } - - public synchronized boolean isBlockOnDurableSend() - { - return blockOnDurableSend; - } - - public synchronized void setBlockOnDurableSend(final boolean blockOnDur= ableSend) - { - checkWrite(); - this.blockOnDurableSend =3D blockOnDurableSend; - } - - public synchronized boolean isBlockOnNonDurableSend() - { - return blockOnNonDurableSend; - } - - public synchronized void setBlockOnNonDurableSend(final boolean blockOn= NonDurableSend) - { - checkWrite(); - this.blockOnNonDurableSend =3D blockOnNonDurableSend; - } - - public synchronized boolean isAutoGroup() - { - return autoGroup; - } - - public synchronized void setAutoGroup(final boolean autoGroup) - { - checkWrite(); - this.autoGroup =3D autoGroup; - } - - public synchronized boolean isPreAcknowledge() - { - return preAcknowledge; - } - - public synchronized void setPreAcknowledge(final boolean preAcknowledge) - { - checkWrite(); - this.preAcknowledge =3D preAcknowledge; - } - - public synchronized int getAckBatchSize() - { - return ackBatchSize; - } - - public synchronized void setAckBatchSize(final int ackBatchSize) - { - checkWrite(); - this.ackBatchSize =3D ackBatchSize; - } - - public synchronized boolean isUseGlobalPools() - { - return useGlobalPools; - } - - public synchronized void setUseGlobalPools(final boolean useGlobalPools) - { - checkWrite(); - this.useGlobalPools =3D useGlobalPools; - } - - public synchronized int getScheduledThreadPoolMaxSize() - { - return scheduledThreadPoolMaxSize; - } - - public synchronized void setScheduledThreadPoolMaxSize(final int schedu= ledThreadPoolMaxSize) - { - checkWrite(); - this.scheduledThreadPoolMaxSize =3D scheduledThreadPoolMaxSize; - } - - public synchronized int getThreadPoolMaxSize() - { - return threadPoolMaxSize; - } - - public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSi= ze) - { - checkWrite(); - this.threadPoolMaxSize =3D threadPoolMaxSize; - } - - public synchronized long getRetryInterval() - { - return retryInterval; - } - - public synchronized void setRetryInterval(final long retryInterval) - { - checkWrite(); - this.retryInterval =3D retryInterval; - } - - public synchronized long getMaxRetryInterval() - { - return maxRetryInterval; - } - - public synchronized void setMaxRetryInterval(final long retryInterval) - { - checkWrite(); - maxRetryInterval =3D retryInterval; - } - - public synchronized double getRetryIntervalMultiplier() - { - return retryIntervalMultiplier; - } - - public synchronized void setRetryIntervalMultiplier(final double retryI= ntervalMultiplier) - { - checkWrite(); - this.retryIntervalMultiplier =3D retryIntervalMultiplier; - } - - public synchronized int getReconnectAttempts() - { - return reconnectAttempts; - } - - public synchronized void setReconnectAttempts(final int reconnectAttemp= ts) - { - checkWrite(); - this.reconnectAttempts =3D reconnectAttempts; - } - - public void setInitialConnectAttempts(int initialConnectAttempts) - { - checkWrite(); - this.initialConnectAttempts =3D initialConnectAttempts; - } - - public int getInitialConnectAttempts() - { - return initialConnectAttempts; - } - - public synchronized boolean isFailoverOnInitialConnection() - { - return this.failoverOnInitialConnection; - } - - public synchronized void setFailoverOnInitialConnection(final boolean f= ailover) - { - checkWrite(); - this.failoverOnInitialConnection =3D failover; - } - - public synchronized String getConnectionLoadBalancingPolicyClassName() - { - return connectionLoadBalancingPolicyClassName; - } - - public synchronized void setConnectionLoadBalancingPolicyClassName(fina= l String loadBalancingPolicyClassName) - { - checkWrite(); - connectionLoadBalancingPolicyClassName =3D loadBalancingPolicyClassN= ame; - } - - public TransportConfiguration[] getStaticTransportConfigurations() - { - return this.initialConnectors; - } - - public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() - { - return discoveryGroupConfiguration; - } - - public void addInterceptor(final Interceptor interceptor) - { - interceptors.add(interceptor); - } - - public boolean removeInterceptor(final Interceptor interceptor) - { - return interceptors.remove(interceptor); - } - - public synchronized int getInitialMessagePacketSize() - { - return initialMessagePacketSize; - } - - public synchronized void setInitialMessagePacketSize(final int size) - { - checkWrite(); - initialMessagePacketSize =3D size; - } - - public void setGroupID(final String groupID) - { - checkWrite(); - this.groupID =3D groupID; - } - - public String getGroupID() - { - return groupID; - } - - /* (non-Javadoc) - * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessag= e() - */ - public boolean isCompressLargeMessage() - { - return compressLargeMessage; - } - - /* (non-Javadoc) - * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessa= ge(boolean) - */ - public void setCompressLargeMessage(boolean compress) - { - this.compressLargeMessage =3D compress; - } - - private void checkWrite() - { - if (readOnly) - { - throw new IllegalStateException("Cannot set attribute on SessionF= actory after it has been used"); - } - } - - public void setNodeID(String nodeID) - { - this.nodeID =3D nodeID; - } - - public String getNodeID() - { - return nodeID; - } - - public void setClusterConnection(boolean clusterConnection) - { - this.clusterConnection =3D clusterConnection; - } - - public boolean isClusterConnection() - { - return clusterConnection; - } - - public TransportConfiguration getClusterTransportConfiguration() - { - return clusterTransportConfiguration; - } - - public void setClusterTransportConfiguration(TransportConfiguration tc) - { - this.clusterTransportConfiguration =3D tc; - } - - public boolean isBackup() - { - return backup; - } - - public void setBackup(boolean backup) - { - this.backup =3D backup; - } - - @Override - protected void finalize() throws Throwable - { - if (finalizeCheck) - { - close(); - } - - super.finalize(); - } - - public void close() - { - if (closed) - { - return; - } - - closing =3D true; - - if (discoveryGroup !=3D null) - { - try - { - discoveryGroup.stop(); - } - catch (Exception e) - { - log.error("Failed to stop discovery group", e); - } - } - else - { - staticConnector.disconnect(); - } - - for (ClientSessionFactory factory : factories) - { - factory.close(); - } - - factories.clear(); - - if (!useGlobalPools) - { - if (threadPool !=3D null) - { - threadPool.shutdown(); - - try - { - if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECON= DS)) - { - log.warn("Timed out waiting for pool to terminate"); - } - } - catch (InterruptedException ignore) - { - } - } - - if (scheduledThreadPool !=3D null) - { - scheduledThreadPool.shutdown(); - - try - { - if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.M= ILLISECONDS)) - { - log.warn("Timed out waiting for scheduled pool to termin= ate"); - } - } - catch (InterruptedException ignore) - { - } - } - } - readOnly =3D false; - - closed =3D true; - } - - public synchronized void notifyNodeDown(final String nodeID) - { - boolean removed =3D false; - - if (!ha) - { - return; - } - - removed =3D topology.removeMember(nodeID); - - if (!topology.isEmpty()) - { - updateArraysAndPairs(); - - if (topology.nodes() =3D=3D 1 && topology.getMember(this.nodeID) = !=3D null) - { - receivedTopology =3D false; - } - } - else - { - topologyArray =3D null; - - receivedTopology =3D false; - } - - if (removed) - { - for (ClusterTopologyListener listener : topologyListeners) - { - listener.nodeDown(nodeID); - } - } - } - - public synchronized void notifyNodeUp(final String nodeID, - final Pair connectorPair, - final boolean last) - { - if (!ha) - { - return; - } - - topology.addMember(nodeID, new TopologyMember(connectorPair)); - - TopologyMember actMember =3D topology.getMember(nodeID); - - if (actMember.getConnector().a !=3D null && actMember.getConnector()= .b !=3D null) - { - for (ClientSessionFactory factory : factories) - { - ((ClientSessionFactoryInternal)factory).setBackupConnector(act= Member.getConnector().a, - act= Member.getConnector().b); - } - } - - if (connectorPair.a !=3D null) - { - updateArraysAndPairs(); - } - - if (last) - { - receivedTopology =3D true; - } - - for (ClusterTopologyListener listener : topologyListeners) - { - listener.nodeUP(nodeID, connectorPair, last); - } - - // Notify if waiting on getting topology - notify(); - } - - private void updateArraysAndPairs() - { - topologyArray =3D (Pair[])Array.newInstance(Pair.class, - = topology.members()); - - int count =3D 0; - for (TopologyMember pair : topology.getMembers()) - { - topologyArray[count++] =3D pair.getConnector(); - } - } - - public synchronized void connectorsChanged() - { - List newConnectors =3D discoveryGroup.getDiscoveryEn= tries(); - - this.initialConnectors =3D (TransportConfiguration[])Array.newInstan= ce(TransportConfiguration.class, - = newConnectors.size()); - - int count =3D 0; - for (DiscoveryEntry entry : newConnectors) - { - this.initialConnectors[count++] =3D entry.getConnector(); - } - - if (ha && clusterConnection && !receivedTopology && initialConnector= s.length > 0) - { - // FIXME the node is alone in the cluster. We create a connection= to the new node - // to trigger the node notification to form the cluster. - try - { - connect(); - } - catch (Exception e) - { - e.printStackTrace(); // To change body of catch statement use = File | Settings | File Templates. - } - } - } - - public synchronized void factoryClosed(final ClientSessionFactory facto= ry) - { - factories.remove(factory); - - if (factories.isEmpty()) - { - // Go back to using the broadcast or static list - - receivedTopology =3D false; - - topology =3D null; - - } - } - - public Topology getTopology() - { - return topology; - } - - public void addClusterTopologyListener(final ClusterTopologyListener li= stener) - { - topologyListeners.add(listener); - if(topology.members() > 0) - { - System.out.println("ServerLocatorImpl.addClusterTopologyListener"= ); - } - } - - public void removeClusterTopologyListener(final ClusterTopologyListener= listener) - { - topologyListeners.remove(listener); - } - - public synchronized void addFactory(ClientSessionFactoryInternal factor= y) - { - if (factory !=3D null) - { - TransportConfiguration backup =3D topology.getBackupForConnector(= factory.getConnectorConfiguration()); - factory.setBackupConnector(factory.getConnectorConfiguration(), b= ackup); - factories.add(factory); - } - } - - public static void shutdown() - { - if (globalScheduledThreadPool !=3D null) - { - globalScheduledThreadPool.shutdown(); - globalScheduledThreadPool =3D null; - } - if (globalThreadPool !=3D null) - { - globalThreadPool.shutdown(); - globalThreadPool =3D null; - } - } - - class StaticConnector implements Serializable - { - private List connectors; - - public ClientSessionFactory connect() throws HornetQException - { - if (closed) - { - throw new IllegalStateException("Cannot create session factory= , server locator is closed (maybe it has been garbage collected)"); - } - - try - { - initialise(); - } - catch (Exception e) - { - throw new HornetQException(HornetQException.INTERNAL_ERROR, "F= ailed to initialise session factory", e); - } - - ClientSessionFactory csf =3D null; - - createConnectors(); - - try - { - List> futures =3D threadPool.invo= keAll(connectors); - for (int i =3D 0, futuresSize =3D futures.size(); i < futuresS= ize; i++) - { - Future future =3D futures.get(i); - try - { - csf =3D future.get(); - if (csf !=3D null) - break; - } - catch (Exception e) - { - log.debug("unable to connect with static connector " + c= onnectors.get(i).initialConnector); - } - } - if (csf =3D=3D null && !closed) - { - throw new HornetQException(HornetQException.NOT_CONNECTED, = "Failed to connect to any static connectors"); - } - } - catch (InterruptedException e) - { - throw new HornetQException(HornetQException.NOT_CONNECTED, "Fa= iled to connect to any static connectors", e); - } - - if (csf =3D=3D null && !closed) - { - throw new HornetQException(HornetQException.NOT_CONNECTED, "Fa= iled to connect to any static connectors"); - } - return csf; - } - - private synchronized void createConnectors() - { - connectors =3D new ArrayList(); - for (TransportConfiguration initialConnector : initialConnectors) - { - ClientSessionFactoryInternal factory =3D new ClientSessionFact= oryImpl(ServerLocatorImpl.this, - = initialConnector, - = callTimeout, - = clientFailureCheckPeriod, - = connectionTTL, - = retryInterval, - = retryIntervalMultiplier, - = maxRetryInterval, - = reconnectAttempts, - = threadPool, - = scheduledThreadPool, - = interceptors); - connectors.add(new Connector(initialConnector, factory)); - } - } - - public synchronized void disconnect() - { - if (connectors !=3D null) - { - for (Connector connector : connectors) - { - connector.disconnect(); - } - } - } - - public void finalize() throws Throwable - { - if (!closed && finalizeCheck) - { - log.warn("I'm closing a core ServerLocator you left open. Plea= se make sure you close all ServerLocators explicitly " + "before letting th= em go out of scope! " + - System.identityHashCode(this)); - - log.warn("The ServerLocator you didn't close was created here:= ", e); - = - if (ServerLocatorImpl.finalizeCallback !=3D null) - { - ServerLocatorImpl.finalizeCallback.run(); - } - - close(); - } - - super.finalize(); - } - - class Connector implements Callable - { - private TransportConfiguration initialConnector; - - private volatile ClientSessionFactoryInternal factory; - - private boolean isConnected =3D false; - - private boolean interrupted =3D false; - - private Exception e; - - public Connector(TransportConfiguration initialConnector, ClientS= essionFactoryInternal factory) - { - this.initialConnector =3D initialConnector; - this.factory =3D factory; - } - - public ClientSessionFactory call() throws HornetQException - { - try - { - factory.connect(reconnectAttempts, failoverOnInitialConnect= ion); - } - catch (HornetQException e) - { - if (!interrupted) - { - this.e =3D e; - throw e; - } - /*if(factory !=3D null) - { - factory.close(); - factory =3D null; - }*/ - return null; - } - isConnected =3D true; - for (Connector connector : connectors) - { - if (!connector.isConnected()) - { - connector.disconnect(); - } - } - return factory; - } - - public boolean isConnected() - { - return isConnected; - } - - public void disconnect() - { - interrupted =3D true; - - if (factory !=3D null) - { - factory.causeExit(); - factory.close(); - factory =3D null; - } - } - } - } -} Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/Server= LocatorInternal.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocato= rInternal.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocato= rInternal.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -57,4 +57,6 @@ void setBackup(boolean backup); = Topology getTopology(); + = + boolean isStaticDirectConnection(TransportConfiguration con); } Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDP= ServerLocatorImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPSer= verLocatorImpl.java (rev 0) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPSer= verLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -0,0 +1,352 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.client.impl; + +import java.lang.reflect.Array; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.*; + +import org.hornetq.api.core.DiscoveryGroupConfiguration; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.cluster.DiscoveryEntry; +import org.hornetq.core.cluster.DiscoveryGroup; +import org.hornetq.core.cluster.DiscoveryListener; +import org.hornetq.core.cluster.impl.DiscoveryGroupImpl; +import org.hornetq.core.config.impl.ConfigurationImpl; +import org.hornetq.core.logging.Logger; +import org.hornetq.utils.ConfigurationHelper; + +/** + * A SimpleUDPServerLocatorImpl + * = + * @author Tim Fox + * @author Tomohisa Igarashi + */ +public class SimpleUDPServerLocatorImpl extends AbstractServerLocator impl= ements DiscoveryListener +{ + private static final long serialVersionUID =3D -1615857864410205260L; + + private static final Logger log =3D Logger.getLogger(SimpleUDPServerLoc= atorImpl.class); + + private String discoveryGroupName; + = + private InetAddress localBindAddress; + = + private InetAddress groupAddress; + = + private int groupPort; + = + private long refreshTimeout; + = + private long initialWaitTimeout; + = + private DiscoveryGroup discoveryGroup; + + private volatile boolean closing; + = + // To be called when there are ServerLocator being finalized. + // To be used on test assertions + public static Runnable finalizeCallback =3D null; + + private synchronized void initialise() throws Exception + { + if (!isReadOnly()) + { + setThreadPools(); + + instantiateLoadBalancingPolicy(); + + this.discoveryGroupName =3D getDiscoveryGroupConfiguration().getN= ame(); + = + Map params =3D getDiscoveryGroupConfiguration().ge= tParams(); + + String lbStr =3D ConfigurationHelper.getStringProperty(DiscoveryG= roupConstants.LOCAL_BIND_ADDRESS_NAME, null, params); + + if (lbStr !=3D null) + { + this.localBindAddress =3D InetAddress.getByName(lbStr); + } + else + { + this.localBindAddress =3D null; + } + + this.groupAddress =3D InetAddress.getByName(ConfigurationHelper.g= etStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params)); + this.groupPort =3D ConfigurationHelper.getIntProperty(DiscoveryGr= oupConstants.GROUP_PORT_NAME, -1, params); + this.refreshTimeout =3D ConfigurationHelper.getLongProperty(Disco= veryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAS= T_REFRESH_TIMEOUT, params); + this.initialWaitTimeout =3D ConfigurationHelper.getLongProperty(D= iscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DIS= COVERY_INITIAL_WAIT_TIMEOUT, params); + = + discoveryGroup =3D new DiscoveryGroupImpl(getNodeID(), + this.discoveryGroupName, + this.localBindAddress, + this.groupAddress, + this.groupPort, + this.refreshTimeout); + + discoveryGroup.registerListener(this); + + discoveryGroup.start(); + + setReadOnly(true); + } + } + + public SimpleUDPServerLocatorImpl(final boolean useHA, + final DiscoveryGroupConfiguration discoveryGr= oupConfiguration) + { + super(useHA, discoveryGroupConfiguration); + } + + public void start(Executor executor) throws Exception + { + initialise(); + + executor.execute(new Runnable() + { + public void run() + { + try + { + connect(); + } + catch (Exception e) + { + if (!closing) + { + log.warn("did not connect the cluster connection to othe= r nodes", e); + } + } + } + }); + } + + public ClientSessionFactory connect() throws Exception + { + ClientSessionFactoryInternal sf; + + // wait for discovery group to get the list of initial connectors + sf =3D (ClientSessionFactoryInternal)createSessionFactory(); + + addFactory(sf); + return sf; + } + + public ClientSessionFactory createSessionFactory(final TransportConfigu= ration transportConfiguration) throws Exception + { + if (isClosed()) + { + throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); + } + + try + { + initialise(); + } + catch (Exception e) + { + throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); + } + + ClientSessionFactoryInternal factory =3D new ClientSessionFactoryImp= l(this, + = transportConfiguration, + = getCallTimeout(), + = getClientFailureCheckPeriod(), + = getConnectionTTL(), + = getRetryInterval(), + = getRetryIntervalMultiplier(), + = getMaxRetryInterval(), + = getReconnectAttempts(), + = getThreadPool(), + = getScheduledThreadPool(), + = getInterceptors()); + + factory.connect(getReconnectAttempts(), isFailoverOnInitialConnectio= n()); + + addFactory(factory); + + return factory; + } + + public ClientSessionFactory createSessionFactory() throws Exception + { + if (isClosed()) + { + throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); + } + + try + { + initialise(); + } + catch (Exception e) + { + throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); + } + + if (getInitialConnectors() =3D=3D null) + { + // Wait for an initial broadcast to give us at least one node in = the cluster + long timeout =3D isClusterConnection() ? 0 : this.initialWaitTime= out; + boolean ok =3D discoveryGroup.waitForBroadcast(timeout); + + if (!ok) + { + throw new HornetQException(HornetQException.CONNECTION_TIMEDOU= T, + "Timed out waiting to receive initi= al broadcast from cluster"); + } + } + + ClientSessionFactoryInternal factory =3D null; + + synchronized (this) + { + boolean retry; + int attempts =3D 0; + do + { + retry =3D false; + + TransportConfiguration tc =3D selectConnector(); + + // try each factory in the list until we find one which works + + try + { + factory =3D new ClientSessionFactoryImpl(this, + tc, + getCallTimeout(), + getClientFailureChec= kPeriod(), + getConnectionTTL(), + getRetryInterval(), + getRetryIntervalMult= iplier(), + getMaxRetryInterval(= ), + getReconnectAttempts= (), + getThreadPool(), + getScheduledThreadPo= ol(), + getInterceptors()); + factory.connect(getInitialConnectAttempts(), isFailoverOnIn= itialConnection()); + } + catch (HornetQException e) + { + factory.close(); + factory =3D null; + if (e.getCode() =3D=3D HornetQException.NOT_CONNECTED) + { + attempts++; + + if (attempts =3D=3D getConnectorLength()) + { + throw new HornetQException(HornetQException.NOT_CONNE= CTED, + "Cannot connect to server(= s). Tried with all available servers."); + } + retry =3D true; + } + else + { + throw e; + } + } + } + while (retry); + + if (isHA()) + { + long toWait =3D 30000; + long start =3D System.currentTimeMillis(); + while (!isReceivedTopology() && toWait > 0) + { + // Now wait for the topology + + try + { + wait(toWait); + } + catch (InterruptedException ignore) + { + } + + long now =3D System.currentTimeMillis(); + + toWait -=3D now - start; + + start =3D now; + } + + if (toWait <=3D 0) + { + throw new HornetQException(HornetQException.CONNECTION_TIME= DOUT, + "Timed out waiting to receive cl= uster topology"); + } + } + + addFactory(factory); + + return factory; + } + } + + public void close() + { + if (isClosed()) + { + return; + } + + closing =3D true; + + try + { + discoveryGroup.stop(); + } + catch (Exception e) + { + log.error("Failed to stop discovery group", e); + } + + super.close(); + } + + public synchronized void connectorsChanged() + { + List newConnectors =3D discoveryGroup.getDiscoveryEn= tries(); + + TransportConfiguration[] initialConnectors =3D (TransportConfigurati= on[])Array.newInstance(TransportConfiguration.class, + = newConnectors.size()); + int count =3D 0; + for (DiscoveryEntry entry : newConnectors) + { + initialConnectors[count++] =3D entry.getConnector(); + } + + if (isHA() && isClusterConnection() && !isReceivedTopology() && init= ialConnectors.length > 0) + { + // FIXME the node is alone in the cluster. We create a connection= to the new node + // to trigger the node notification to form the cluster. + try + { + connect(); + } + catch (Exception e) + { + e.printStackTrace(); // To change body of catch statement use = File | Settings | File Templates. + } + } + + setInitialConnectors(initialConnectors); + } +} Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/= impl/SimpleUDPServerLocatorImpl.java ___________________________________________________________________ Name: svn:mime-type + text/plain Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticSer= verLocatorImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServer= LocatorImpl.java (rev 0) +++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServer= LocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -0,0 +1,450 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.client.impl; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.*; + +import org.hornetq.api.core.DiscoveryGroupConfiguration; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.core.logging.Logger; +import org.hornetq.utils.ConfigurationHelper; + +/** + * A SimpleUDPServerLocatorImpl + * + * @author Tim Fox + * @author Tomohisa Igarashi + */ +public class StaticServerLocatorImpl extends AbstractServerLocator +{ + private static final long serialVersionUID =3D -1615857864410205260L; + + private static final Logger log =3D Logger.getLogger(StaticServerLocato= rImpl.class); + + private StaticConnector staticConnector =3D new StaticConnector(); + + private volatile boolean closing; + = + private final Exception e =3D new Exception(); + = + // To be called when there are ServerLocator being finalized. + // To be used on test assertions + public static Runnable finalizeCallback =3D null; + + private synchronized void initialise() throws Exception + { + if (!isReadOnly()) + { + setThreadPools(); + + instantiateLoadBalancingPolicy(); + + Map params =3D getDiscoveryGroupConfiguration().ge= tParams(); + TransportConfiguration[] initialConnectors =3D (TransportConfigur= ation[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME); + setInitialConnectors(initialConnectors); + = + setReadOnly(true); + } + } + + public StaticServerLocatorImpl(final boolean useHA, + final DiscoveryGroupConfiguration discoveryGr= oupConfiguration) + { + super(useHA, discoveryGroupConfiguration); + = + e.fillInStackTrace(); + } + + public void start(Executor executor) throws Exception + { + initialise(); + + executor.execute(new Runnable() + { + public void run() + { + try + { + connect(); + } + catch (Exception e) + { + if (!closing) + { + log.warn("did not connect the cluster connection to othe= r nodes", e); + } + } + } + }); + } + + public ClientSessionFactory connect() throws Exception + { + ClientSessionFactoryInternal sf; + + sf =3D (ClientSessionFactoryInternal)staticConnector.connect(); + + addFactory(sf); + return sf; + } + + public ClientSessionFactory createSessionFactory(final TransportConfigu= ration transportConfiguration) throws Exception + { + if (isClosed()) + { + throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); + } + + try + { + initialise(); + } + catch (Exception e) + { + throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); + } + + ClientSessionFactoryInternal factory =3D new ClientSessionFactoryImp= l(this, + = transportConfiguration, + = getCallTimeout(), + = getClientFailureCheckPeriod(), + = getConnectionTTL(), + = getRetryInterval(), + = getRetryIntervalMultiplier(), + = getMaxRetryInterval(), + = getReconnectAttempts(), + = getThreadPool(), + = getScheduledThreadPool(), + = getInterceptors()); + + factory.connect(getReconnectAttempts(), isFailoverOnInitialConnectio= n()); + + addFactory(factory); + + return factory; + } + + public ClientSessionFactory createSessionFactory() throws Exception + { + if (isClosed()) + { + throw new IllegalStateException("Cannot create session factory, s= erver locator is closed (maybe it has been garbage collected)"); + } + + try + { + initialise(); + } + catch (Exception e) + { + throw new HornetQException(HornetQException.INTERNAL_ERROR, "Fail= ed to initialise session factory", e); + } + + ClientSessionFactoryInternal factory =3D null; + + synchronized (this) + { + boolean retry; + int attempts =3D 0; + do + { + retry =3D false; + + TransportConfiguration tc =3D selectConnector(); + + // try each factory in the list until we find one which works + + try + { + factory =3D new ClientSessionFactoryImpl(this, + tc, + getCallTimeout(), + getClientFailureChec= kPeriod(), + getConnectionTTL(), + getRetryInterval(), + getRetryIntervalMult= iplier(), + getMaxRetryInterval(= ), + getReconnectAttempts= (), + getThreadPool(), + getScheduledThreadPo= ol(), + getInterceptors()); + factory.connect(getInitialConnectAttempts(), isFailoverOnIn= itialConnection()); + } + catch (HornetQException e) + { + factory.close(); + factory =3D null; + if (e.getCode() =3D=3D HornetQException.NOT_CONNECTED) + { + attempts++; + + if (attempts =3D=3D getConnectorLength()) + { + throw new HornetQException(HornetQException.NOT_CONNE= CTED, + "Cannot connect to server(= s). Tried with all available servers."); + } + retry =3D true; + } + else + { + throw e; + } + } + } + while (retry); + + if (isHA()) + { + long toWait =3D 30000; + long start =3D System.currentTimeMillis(); + while (!isReceivedTopology() && toWait > 0) + { + // Now wait for the topology + + try + { + wait(toWait); + } + catch (InterruptedException ignore) + { + } + + long now =3D System.currentTimeMillis(); + + toWait -=3D now - start; + + start =3D now; + } + + if (toWait <=3D 0) + { + throw new HornetQException(HornetQException.CONNECTION_TIME= DOUT, + "Timed out waiting to receive cl= uster topology"); + } + } + + addFactory(factory); + + return factory; + } + } + + public void close() + { + if (isClosed()) + { + return; + } + + closing =3D true; + + staticConnector.disconnect(); + + super.close(); + } + + public boolean isStaticDirectConnection(TransportConfiguration con) + { + for(TransportConfiguration connector : getInitialConnectors()) + { + if(connector.equals(con)) + { + return true; + } + } + return false; + } + = + class StaticConnector implements Serializable + { + private List connectors; + + public ClientSessionFactory connect() throws HornetQException + { + if (isClosed()) + { + throw new IllegalStateException("Cannot create session factory= , server locator is closed (maybe it has been garbage collected)"); + } + + try + { + initialise(); + } + catch (Exception e) + { + throw new HornetQException(HornetQException.INTERNAL_ERROR, "F= ailed to initialise session factory", e); + } + + ClientSessionFactory csf =3D null; + + createConnectors(); + + try + { + List> futures =3D getThreadPool()= .invokeAll(connectors); + for (int i =3D 0, futuresSize =3D futures.size(); i < futuresS= ize; i++) + { + Future future =3D futures.get(i); + try + { + csf =3D future.get(); + if (csf !=3D null) + break; + } + catch (Exception e) + { + log.debug("unable to connect with static connector " + c= onnectors.get(i).initialConnector); + } + } + if (csf =3D=3D null && !isClosed()) + { + throw new HornetQException(HornetQException.NOT_CONNECTED, = "Failed to connect to any static connectors"); + } + } + catch (InterruptedException e) + { + throw new HornetQException(HornetQException.NOT_CONNECTED, "Fa= iled to connect to any static connectors", e); + } + + if (csf =3D=3D null && !isClosed()) + { + throw new HornetQException(HornetQException.NOT_CONNECTED, "Fa= iled to connect to any static connectors"); + } + return csf; + } + + private synchronized void createConnectors() + { + connectors =3D new ArrayList(); + for (TransportConfiguration initialConnector : getInitialConnecto= rs()) + { + ClientSessionFactoryInternal factory =3D new ClientSessionFact= oryImpl(StaticServerLocatorImpl.this, + = initialConnector, + = getCallTimeout(), + = getClientFailureCheckPeriod(), + = getConnectionTTL(), + = getRetryInterval(), + = getRetryIntervalMultiplier(), + = getMaxRetryInterval(), + = getReconnectAttempts(), + = getThreadPool(), + = getScheduledThreadPool(), + = getInterceptors()); + connectors.add(new Connector(initialConnector, factory)); + } + } + + public synchronized void disconnect() + { + if (connectors !=3D null) + { + for (Connector connector : connectors) + { + connector.disconnect(); + } + } + } + + public void finalize() throws Throwable + { + if (!isClosed() && doFinalizeCheck()) + { + log.warn("I'm closing a core ServerLocator you left open. Plea= se make sure you close all ServerLocators explicitly " + "before letting th= em go out of scope! " + + System.identityHashCode(this)); + + log.warn("The ServerLocator you didn't close was created here:= ", e); + = + if (StaticServerLocatorImpl.finalizeCallback !=3D null) + { + StaticServerLocatorImpl.finalizeCallback.run(); + } + + close(); + } + + super.finalize(); + } + + class Connector implements Callable + { + private TransportConfiguration initialConnector; + + private volatile ClientSessionFactoryInternal factory; + + private boolean isConnected =3D false; + + private boolean interrupted =3D false; + + private Exception e; + + public Connector(TransportConfiguration initialConnector, ClientS= essionFactoryInternal factory) + { + this.initialConnector =3D initialConnector; + this.factory =3D factory; + } + + public ClientSessionFactory call() throws HornetQException + { + try + { + factory.connect(getReconnectAttempts(), isFailoverOnInitial= Connection()); + } + catch (HornetQException e) + { + if (!interrupted) + { + this.e =3D e; + throw e; + } + /*if(factory !=3D null) + { + factory.close(); + factory =3D null; + }*/ + return null; + } + isConnected =3D true; + for (Connector connector : connectors) + { + if (!connector.isConnected()) + { + connector.disconnect(); + } + } + return factory; + } + + public boolean isConnected() + { + return isConnected; + } + + public void disconnect() + { + interrupted =3D true; + + if (factory !=3D null) + { + factory.causeExit(); + factory.close(); + factory =3D null; + } + } + } + } +} Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/= impl/StaticServerLocatorImpl.java ___________________________________________________________________ Name: svn:mime-type + text/plain Modified: branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConn= ectionConfiguration.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnection= Configuration.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnection= Configuration.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -42,6 +42,8 @@ private final boolean forwardWhenNoConsumers; = private final List staticConnectors; + = + private final List allowableConnectors; = private final String discoveryGroupName; = @@ -49,7 +51,7 @@ = private final int confirmationWindowSize; = - private final boolean allowDirectConnectionsOnly; + private final boolean allowableConnectionsOnly; = public ClusterConnectionConfiguration(final String name, final String address, @@ -60,42 +62,22 @@ final int maxHops, final int confirmationWindowSize, final List staticConnecto= rs, - final boolean allowDirectConnecti= onsOnly) + final String discoveryGroupName, + final boolean allowableConnection= sOnly, + final List allowableConne= ctorNames) { this.name =3D name; this.address =3D address; this.connectorName =3D connectorName; this.retryInterval =3D retryInterval; - this.staticConnectors =3D staticConnectors; this.duplicateDetection =3D duplicateDetection; this.forwardWhenNoConsumers =3D forwardWhenNoConsumers; - discoveryGroupName =3D null; - this.maxHops =3D maxHops; - this.confirmationWindowSize =3D confirmationWindowSize; - this.allowDirectConnectionsOnly =3D allowDirectConnectionsOnly; - } - - public ClusterConnectionConfiguration(final String name, - final String address, - final String connectorName, - final long retryInterval, - final boolean duplicateDetection, - final boolean forwardWhenNoConsum= ers, - final int maxHops, - final int confirmationWindowSize, - final String discoveryGroupName) - { - this.name =3D name; - this.address =3D address; - this.connectorName =3D connectorName; - this.retryInterval =3D retryInterval; - this.duplicateDetection =3D duplicateDetection; - this.forwardWhenNoConsumers =3D forwardWhenNoConsumers; this.discoveryGroupName =3D discoveryGroupName; - this.staticConnectors =3D null; this.maxHops =3D maxHops; this.confirmationWindowSize =3D confirmationWindowSize; - allowDirectConnectionsOnly =3D false; + this.staticConnectors =3D staticConnectors; + this.allowableConnectors =3D allowableConnectorNames; + this.allowableConnectionsOnly =3D allowableConnectionsOnly; } = public String getName() @@ -137,6 +119,11 @@ { return staticConnectors; } + = + public List getAllowableConnectors() + { + return allowableConnectors; + } = public String getDiscoveryGroupName() { @@ -148,8 +135,8 @@ return retryInterval; } = - public boolean isAllowDirectConnectionsOnly() + public boolean isAllowableConnectionsOnly() { - return allowDirectConnectionsOnly; + return allowableConnectionsOnly; } } Modified: branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/Fil= eConfigurationParser.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfi= gurationParser.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfi= gurationParser.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -22,12 +22,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.StringTokenizer; = import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.client.impl.DiscoveryGroupConstants; import org.hornetq.core.config.*; import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.config.impl.FileConfiguration; @@ -939,29 +941,42 @@ { String name =3D e.getAttribute("name"); = - String localBindAddress =3D XMLConfigurationUtil.getString(e, "local= -bind-address", null, Validators.NO_CHECK); + String clazz =3D XMLConfigurationUtil.getString(e, "server-locator-c= lass", null, Validators.NOT_NULL_OR_EMPTY); = - String groupAddress =3D XMLConfigurationUtil.getString(e, "group-add= ress", null, Validators.NOT_NULL_OR_EMPTY); + Map params =3D new HashMap(); = - int groupPort =3D XMLConfigurationUtil.getInteger(e, "group-port", -= 1, Validators.MINUS_ONE_OR_GT_ZERO); + NodeList paramsNodes =3D e.getElementsByTagName("param"); = - long discoveryInitialWaitTimeout =3D XMLConfigurationUtil.getLong(e, - "ini= tial-wait-timeout", - Horn= etQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, - Vali= dators.GT_ZERO); + for (int i =3D 0; i < paramsNodes.getLength(); i++) + { + Node paramNode =3D paramsNodes.item(i); = - long refreshTimeout =3D XMLConfigurationUtil.getLong(e, - "refresh-timeout", - ConfigurationImpl= .DEFAULT_BROADCAST_REFRESH_TIMEOUT, - Validators.GT_ZER= O); + NamedNodeMap attributes =3D paramNode.getAttributes(); = - DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati= on(name, - = localBindAddress, - = groupAddress, - = groupPort, - = refreshTimeout, - = discoveryInitialWaitTimeout); + Node nkey =3D attributes.getNamedItem("key"); = + String key =3D nkey.getTextContent(); + + Node nValue =3D attributes.getNamedItem("value"); + + params.put(key, nValue.getTextContent()); + } + = + // discovery-group configuration contains static connector list + String connectorList =3D (String)params.get(DiscoveryGroupConstants.= STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME); + if(connectorList !=3D null) + { + List connectors =3D new ArrayList(); + StringTokenizer token =3D new StringTokenizer(connectorList, ",",= false); + while(token.hasMoreElements()) + { + connectors.add(mainConfig.getConnectorConfigurations().get(tok= en.nextElement())); + } + params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, c= onnectors.toArray(new TransportConfiguration[0])); + } + + DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati= on(clazz, params, name); + if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name)) { FileConfigurationParser.log.warn("There is already a discovery gr= oup with name " + name + @@ -1008,8 +1023,6 @@ = String discoveryGroupName =3D null; = - List staticConnectorNames =3D new ArrayList(); - boolean allowDirectConnectionsOnly =3D false; = NodeList children =3D e.getChildNodes(); @@ -1021,46 +1034,58 @@ if (child.getNodeName().equals("discovery-group-ref")) { discoveryGroupName =3D child.getAttributes().getNamedItem("dis= covery-group-name").getNodeValue(); - } - else if (child.getNodeName().equals("static-connectors")) - { + Node attr =3D child.getAttributes().getNamedItem("allow-direct= -connections-only"); if (attr !=3D null) { allowDirectConnectionsOnly =3D "true".equalsIgnoreCase(attr= .getNodeValue()) || allowDirectConnectionsOnly; } - getStaticConnectors(staticConnectorNames, child); } } = - ClusterConnectionConfiguration config; - - if (discoveryGroupName =3D=3D null) + List staticConnectors =3D new ArrayList(); + DiscoveryGroupConfiguration discovery =3D mainConfig.getDiscoveryGro= upConfigurations().get(discoveryGroupName); + Map params =3D discovery.getParams(); + String connectorList =3D (String)params.get(DiscoveryGroupConstants.= STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME); + if(connectorList !=3D null) { - config =3D new ClusterConnectionConfiguration(name, - address, - connectorName, - retryInterval, - duplicateDetection, - forwardWhenNoConsumer= s, - maxHops, - confirmationWindowSiz= e, - staticConnectorNames, - allowDirectConnection= sOnly); + StringTokenizer token =3D new StringTokenizer(connectorList, ",",= false); + while(token.hasMoreElements()) + { + staticConnectors.add(token.nextToken()); + } } - else + = + List allowableConnectionNames =3D null; + if(allowDirectConnectionsOnly) { - config =3D new ClusterConnectionConfiguration(name, - address, - connectorName, - retryInterval, - duplicateDetection, - forwardWhenNoConsumer= s, - maxHops, - confirmationWindowSiz= e, - discoveryGroupName); + if(connectorList =3D=3D null) + { + log.warn("allow-direct-connections-only was found, but " + + DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR= _REF_LIST_NAME + + " was not found in discovery-group. ignore."); + } + else + { + allowableConnectionNames =3D staticConnectors; + } } + = + ClusterConnectionConfiguration config; = + config =3D new ClusterConnectionConfiguration(name, + address, + connectorName, + retryInterval, + duplicateDetection, + forwardWhenNoConsumers, + maxHops, + confirmationWindowSize, + staticConnectors, + discoveryGroupName, + allowDirectConnectionsOn= ly, + allowableConnectionNames= ); + mainConfig.getClusterConfigurations().add(config); } = Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/imp= l/ClusterConnectionImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clus= terConnectionImpl.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clus= terConnectionImpl.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -100,90 +100,16 @@ = private final String clusterPassword; = - private final ClusterConnector clusterConnector; + private final DiscoveryGroupConfiguration discoveryGroupConfiguration; = private ServerLocatorInternal serverLocator; = private final TransportConfiguration connector; = - private final boolean allowDirectConnectionsOnly; + private final boolean allowableConnectionsOnly; = private final Set allowableConnections =3D new = HashSet(); = - public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs, - final TransportConfiguration connector, - final SimpleString name, - final SimpleString address, - final long retryInterval, - final boolean useDuplicateDetection, - final boolean routeWhenNoConsumers, - final int confirmationWindowSize, - final ExecutorFactory executorFactory, - final HornetQServer server, - final PostOffice postOffice, - final ManagementService managementService, - final ScheduledExecutorService scheduledEx= ecutor, - final int maxHops, - final UUID nodeUUID, - final boolean backup, - final String clusterUser, - final String clusterPassword, - final boolean allowDirectConnectionsOnly) = throws Exception - { - - if (nodeUUID =3D=3D null) - { - throw new IllegalArgumentException("node id is null"); - } - - this.nodeUUID =3D nodeUUID; - - this.connector =3D connector; - - this.name =3D name; - - this.address =3D address; - - this.retryInterval =3D retryInterval; - - this.useDuplicateDetection =3D useDuplicateDetection; - - this.routeWhenNoConsumers =3D routeWhenNoConsumers; - - this.executorFactory =3D executorFactory; - - this.server =3D server; - - this.postOffice =3D postOffice; - - this.managementService =3D managementService; - - this.scheduledExecutor =3D scheduledExecutor; - - this.maxHops =3D maxHops; - - this.backup =3D backup; - - this.clusterUser =3D clusterUser; - - this.clusterPassword =3D clusterPassword; - - this.allowDirectConnectionsOnly =3D allowDirectConnectionsOnly; - - clusterConnector =3D new StaticClusterConnector(tcConfigs); - - if (tcConfigs !=3D null && tcConfigs.length > 0) - { - // a cluster connection will connect to other nodes only if they = are directly connected - // through a static list of connectors or broadcasting using UDP. - if(allowDirectConnectionsOnly) - { - allowableConnections.addAll(Arrays.asList(tcConfigs)); - } - } - - } - public ClusterConnectionImpl(DiscoveryGroupConfiguration dg, final TransportConfiguration connector, final SimpleString name, @@ -202,7 +128,8 @@ final boolean backup, final String clusterUser, final String clusterPassword, - final boolean allowDirectConnectionsOnly) = throws Exception + final boolean allowableConnectionsOnly, + final TransportConfiguration[] allowableCo= nnections) throws Exception { = if (nodeUUID =3D=3D null) @@ -242,9 +169,9 @@ = this.clusterPassword =3D clusterPassword; = - this.allowDirectConnectionsOnly =3D allowDirectConnectionsOnly; + this.allowableConnectionsOnly =3D allowableConnectionsOnly; = - clusterConnector =3D new DiscoveryClusterConnector(dg); + this.discoveryGroupConfiguration =3D dg; } = public synchronized void start() throws Exception @@ -346,7 +273,7 @@ = backup =3D false; = - serverLocator =3D clusterConnector.createServerLocator(); + serverLocator =3D (ServerLocatorInternal)HornetQClient.createServerL= ocatorWithHA(this.discoveryGroupConfiguration); = = if (serverLocator !=3D null) @@ -433,7 +360,7 @@ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last); = // if the node is more than 1 hop away, we do not create a bridge fo= r direct cluster connection - if (allowDirectConnectionsOnly && !allowableConnections.contains(con= nectorPair.a)) + if (allowableConnectionsOnly && !allowableConnections.contains(conne= ctorPair.a)) { return; } @@ -992,46 +919,4 @@ = return out; } - - interface ClusterConnector - { - ServerLocatorInternal createServerLocator(); - } - - private class StaticClusterConnector implements ClusterConnector - { - private final TransportConfiguration[] tcConfigs; - - public StaticClusterConnector(TransportConfiguration[] tcConfigs) - { - this.tcConfigs =3D tcConfigs; - } - - public ServerLocatorInternal createServerLocator() - { - if(tcConfigs !=3D null && tcConfigs.length > 0) - { - return (ServerLocatorInternal) HornetQClient.createServerLocat= orWithHA(tcConfigs); - } - else - { - return null; - } - } - } - - private class DiscoveryClusterConnector implements ClusterConnector - { - private final DiscoveryGroupConfiguration dg; - - public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg) - { - this.dg =3D dg; - } - - public ServerLocatorInternal createServerLocator() - { - return (ServerLocatorInternal) HornetQClient.createServerLocatorW= ithHA(dg); - } - } } Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/imp= l/ClusterManagerImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clus= terManagerImpl.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clus= terManagerImpl.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -30,7 +30,7 @@ import org.hornetq.api.core.client.ClusterTopologyListener; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.core.client.impl.ServerLocatorImpl; +import org.hornetq.core.client.impl.AbstractServerLocator; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.client.impl.Topology; import org.hornetq.core.client.impl.TopologyMember; @@ -621,46 +621,23 @@ = ServerLocatorInternal serverLocator; = - if (config.getDiscoveryGroupName() !=3D null) - { - DiscoveryGroupConfiguration discoveryGroupConfiguration =3D confi= guration.getDiscoveryGroupConfigurations() + DiscoveryGroupConfiguration discoveryGroupConfiguration =3D configur= ation.getDiscoveryGroupConfigurations() = .get(config.getDiscoveryGroupName()); - if (discoveryGroupConfiguration =3D=3D null) - { - ClusterManagerImpl.log.warn("No discovery group configured wit= h name '" + config.getDiscoveryGroupName() + - "'. The bridge will not be deploye= d."); + if (discoveryGroupConfiguration =3D=3D null) + { + ClusterManagerImpl.log.warn("No discovery group configured with n= ame '" + config.getDiscoveryGroupName() + + "'. The bridge will not be deployed= ."); = - return; - } + return; + } = - if (config.isHA()) - { - serverLocator =3D (ServerLocatorInternal)HornetQClient.createS= erverLocatorWithHA(discoveryGroupConfiguration); - } - else - { - serverLocator =3D (ServerLocatorInternal)HornetQClient.createS= erverLocatorWithoutHA(discoveryGroupConfiguration); - } - + if (config.isHA()) + { + serverLocator =3D (ServerLocatorInternal)HornetQClient.createServ= erLocatorWithHA(discoveryGroupConfiguration); } else { - TransportConfiguration[] tcConfigs =3D connectorNameListToArray(c= onfig.getStaticConnectors()); - - if (tcConfigs =3D=3D null) - { - return; - } - - if (config.isHA()) - { - serverLocator =3D (ServerLocatorInternal)HornetQClient.createS= erverLocatorWithHA(tcConfigs); - } - else - { - serverLocator =3D (ServerLocatorInternal)HornetQClient.createS= erverLocatorWithoutHA(tcConfigs); - } - + serverLocator =3D (ServerLocatorInternal)HornetQClient.createServ= erLocatorWithoutHA(discoveryGroupConfiguration); } = serverLocator.setConfirmationWindowSize(config.getConfirmationWindow= Size()); @@ -739,61 +716,41 @@ = ClusterConnectionImpl clusterConnection; = - if (config.getDiscoveryGroupName() !=3D null) - { - DiscoveryGroupConfiguration dg =3D configuration.getDiscoveryGrou= pConfigurations() - .get(config.getDisc= overyGroupName()); + DiscoveryGroupConfiguration dg =3D configuration.getDiscoveryGroupCo= nfigurations() + .get(config.getDisco= veryGroupName()); = - if (dg =3D=3D null) - { - ClusterManagerImpl.log.warn("No discovery group with name '" += config.getDiscoveryGroupName() + + if (dg =3D=3D null) + { + ClusterManagerImpl.log.warn("No discovery group with name '" + co= nfig.getDiscoveryGroupName() + "'. The cluster connection will no= t be deployed."); - } - - clusterConnection =3D new ClusterConnectionImpl(dg, - connector, - new SimpleString(co= nfig.getName()), - new SimpleString(co= nfig.getAddress()), - config.getRetryInte= rval(), - config.isDuplicateD= etection(), - config.isForwardWhe= nNoConsumers(), - config.getConfirmat= ionWindowSize(), - executorFactory, - server, - postOffice, - managementService, - scheduledExecutor, - config.getMaxHops(), - nodeUUID, - backup, - server.getConfigura= tion().getClusterUser(), - server.getConfigura= tion().getClusterPassword(), - config.isAllowDirec= tConnectionsOnly()); } - else - { - TransportConfiguration[] tcConfigs =3D config.getStaticConnectors= () !=3D null? connectorNameListToArray(config.getStaticConnectors()):null; = - clusterConnection =3D new ClusterConnectionImpl(tcConfigs, - connector, - new SimpleString(co= nfig.getName()), - new SimpleString(co= nfig.getAddress()), - config.getRetryInte= rval(), - config.isDuplicateD= etection(), - config.isForwardWhe= nNoConsumers(), - config.getConfirmat= ionWindowSize(), - executorFactory, - server, - postOffice, - managementService, - scheduledExecutor, - config.getMaxHops(), - nodeUUID, - backup, - server.getConfigura= tion().getClusterUser(), - server.getConfigura= tion().getClusterPassword(), - config.isAllowDirec= tConnectionsOnly()); + List connectorNames =3D config.getAllowableConnectors(); + TransportConfiguration[] allowableConnections =3D null; + if(connectorNames !=3D null) + { + allowableConnections =3D connectorNameListToArray(connectorNames); } + clusterConnection =3D new ClusterConnectionImpl(dg, + connector, + new SimpleString(confi= g.getName()), + new SimpleString(confi= g.getAddress()), + config.getRetryInterva= l(), + config.isDuplicateDete= ction(), + config.isForwardWhenNo= Consumers(), + config.getConfirmation= WindowSize(), + executorFactory, + server, + postOffice, + managementService, + scheduledExecutor, + config.getMaxHops(), + nodeUUID, + backup, + server.getConfiguratio= n().getClusterUser(), + server.getConfiguratio= n().getClusterPassword(), + config.isAllowableConn= ectionsOnly(), + allowableConnections); = managementService.registerCluster(clusterConnection, config); = @@ -809,31 +766,18 @@ = private void announceBackup(final ClusterConnectionConfiguration config= , final TransportConfiguration connector) throws Exception { - if (config.getStaticConnectors() !=3D null) - { - TransportConfiguration[] tcConfigs =3D connectorNameListToArray(c= onfig.getStaticConnectors()); - - backupServerLocator =3D (ServerLocatorInternal)HornetQClient.crea= teServerLocatorWithoutHA(tcConfigs); - backupServerLocator.setReconnectAttempts(-1); - } - else if (config.getDiscoveryGroupName() !=3D null) - { - DiscoveryGroupConfiguration dg =3D configuration.getDiscoveryGrou= pConfigurations() + DiscoveryGroupConfiguration dg =3D configuration.getDiscoveryGroupCo= nfigurations() .get(config.getDisc= overyGroupName()); = - if (dg =3D=3D null) - { - ClusterManagerImpl.log.warn("No discovery group with name '" += config.getDiscoveryGroupName() + - "'. The cluster connection will no= t be deployed."); - } - - backupServerLocator =3D (ServerLocatorInternal)HornetQClient.crea= teServerLocatorWithoutHA(dg); - backupServerLocator.setReconnectAttempts(-1); - } - else + if (dg =3D=3D null) { - return; + ClusterManagerImpl.log.warn("No discovery group with name '" + co= nfig.getDiscoveryGroupName() + + "'. The cluster connection will not= be deployed."); } + + backupServerLocator =3D (ServerLocatorInternal)HornetQClient.createS= erverLocatorWithoutHA(dg); + backupServerLocator.setReconnectAttempts(-1); + log.info("announcing backup"); this.executorFactory.getExecutor().execute(new Runnable() { Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapt= er.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.jav= a 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.jav= a 2011-01-27 14:06:20 UTC (rev 10150) @@ -37,6 +37,8 @@ import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.jms.HornetQJMSClient; import org.hornetq.api.jms.JMSFactoryType; +import org.hornetq.core.client.impl.DiscoveryGroupConstants; +import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl; import org.hornetq.core.logging.Logger; import org.hornetq.jms.client.HornetQConnectionFactory; import org.hornetq.ra.inflow.HornetQActivation; @@ -1403,20 +1405,24 @@ } else if (discoveryAddress !=3D null) { + // FIXME make discovery stategy pluggable with configuration + Map params =3D new HashMap(); + = Integer discoveryPort =3D overrideProperties.getDiscoveryPort() != =3D null ? overrideProperties.getDiscoveryPort() = : getDiscoveryPort(); = - DiscoveryGroupConfiguration groupConfiguration =3D new DiscoveryG= roupConfiguration(discoveryAddress, discoveryPort); - long refreshTimeout =3D overrideProperties.getDiscoveryRefreshTim= eout() !=3D null ? overrideProperties.getDiscoveryRefreshTimeout() : raPr= operties.getDiscoveryRefreshTimeout(); = long initialTimeout =3D overrideProperties.getDiscoveryInitialWai= tTimeout() !=3D null ? overrideProperties.getDiscoveryInitialWaitTimeout() : = raProperties.getDiscoveryInitialWaitTimeout(); = - groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout); + params.put(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, discoveryA= ddress); + params.put(DiscoveryGroupConstants.GROUP_PORT_NAME, discoveryPort= ); + params.put(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, initialT= imeout); + params.put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, ini= tialTimeout); = - groupConfiguration.setRefreshTimeout(refreshTimeout); + DiscoveryGroupConfiguration groupConfiguration =3D new DiscoveryG= roupConfiguration(SimpleUDPServerLocatorImpl.class.getName(), params, null); = if (ha) { Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/clus= ter/failover/MultipleBackupsFailoverTestBase.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/fa= ilover/MultipleBackupsFailoverTestBase.java 2011-01-26 15:01:24 UTC (rev 10= 149) +++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/fa= ilover/MultipleBackupsFailoverTestBase.java 2011-01-27 14:06:20 UTC (rev 10= 150) @@ -32,7 +32,7 @@ import org.hornetq.api.core.client.ClusterTopologyListener; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; -import org.hornetq.core.client.impl.ServerLocatorImpl; +import org.hornetq.core.client.impl.AbstractServerLocator; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.jms.client.HornetQTextMessage; import org.hornetq.tests.integration.cluster.util.TestableServer; @@ -189,7 +189,7 @@ { configs[i] =3D createTransportConfiguration(isNetty(), false, gen= erateParams(nodes[i], isNetty())); } - return new ServerLocatorImpl(true, configs); + return new AbstractServerLocator(true, configs); } = // Private ------------------------------------------------------- Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/= connection/CloseConnectionFactoryOnGCest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connec= tion/CloseConnectionFactoryOnGCest.java 2011-01-26 15:01:24 UTC (rev 10149) +++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connec= tion/CloseConnectionFactoryOnGCest.java 2011-01-27 14:06:20 UTC (rev 10150) @@ -27,7 +27,7 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.jms.HornetQJMSClient; import org.hornetq.api.jms.JMSFactoryType; -import org.hornetq.core.client.impl.ServerLocatorImpl; +import org.hornetq.core.client.impl.AbstractServerLocator; import org.hornetq.core.logging.Logger; import org.hornetq.jms.client.HornetQConnectionFactory; import org.hornetq.tests.util.JMSTestBase; @@ -61,7 +61,7 @@ = final AtomicInteger valueGC =3D new AtomicInteger(0); = - ServerLocatorImpl.finalizeCallback =3D new Runnable() + AbstractServerLocator.finalizeCallback =3D new Runnable() { public void run() { @@ -85,7 +85,7 @@ } finally { - ServerLocatorImpl.finalizeCallback =3D null; + AbstractServerLocator.finalizeCallback =3D null; } = assertEquals("The code is throwing exceptions", 0, valueGC.get()); --===============6609616134239106515==--