From do-not-reply at jboss.org Thu Jan 27 09:06:22 2011
Content-Type: multipart/mixed; boundary="===============6609616134239106515=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r10150 - in
branches/HORNETQ-316: src/main/org/hornetq/api/core/client and 7 other
directories.
Date: Thu, 27 Jan 2011 09:06:21 -0500
Message-ID: <201101271406.p0RE6LfR005012@svn01.web.mwc.hst.phx2.redhat.com>
--===============6609616134239106515==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: igarashitm
Date: 2011-01-27 09:06:20 -0500 (Thu, 27 Jan 2011)
New Revision: 10150
Added:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServe=
rLocator.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGrou=
pConstants.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServ=
erLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerL=
ocatorImpl.java
Removed:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocator=
Impl.java
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfigu=
ration.java
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.=
java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocator=
Internal.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionC=
onfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfig=
urationParser.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clust=
erConnectionImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/Clust=
erManagerImpl.java
branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/fai=
lover/MultipleBackupsFailoverTestBase.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connect=
ion/CloseConnectionFactoryOnGCest.java
Log:
https://issues.jboss.org/browse/HORNETQ-316
First commit. Implemented a pluggable discovery strategy and removed the =
compile errors. Not yet debugged.
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroup=
Configuration.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfig=
uration.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfig=
uration.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+ * Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -14,169 +14,45 @@
package org.hornetq.api.core;
=
import java.io.Serializable;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.UUIDGenerator;
-
+import java.util.Map;
+import java.util.Properties;
/**
* A DiscoveryGroupConfiguration
*
- * @author Tim Fox
- * =
- * Created 18 Nov 2008 08:47:30
- *
- *
+ * @author Tim Fox
*/
public class DiscoveryGroupConfiguration implements Serializable
{
- private static final long serialVersionUID =3D 8657206421727863400L;
- =
- private static final Logger log =3D Logger.getLogger(DiscoveryGroupConf=
iguration.class);
+ private static final long serialVersionUID =3D 2877108926493109407L;
=
-
- private String name;
+ private final String serverLocatorClassName;
=
- private String localBindAddress;
-
- private String groupAddress;
-
- private int groupPort;
-
- private long refreshTimeout;
+ private final String name; =
=
- private long discoveryInitialWaitTimeout;
-
- public DiscoveryGroupConfiguration(final String name,
- final String localBindAddress,
- final String groupAddress,
- final int groupPort,
- final long refreshTimeout,
- final long discoveryInitialWaitTimeo=
ut)
- {
- this.name =3D name;
- this.groupAddress =3D groupAddress;
- this.localBindAddress =3D localBindAddress;
- this.groupPort =3D groupPort;
- this.refreshTimeout =3D refreshTimeout;
- this.discoveryInitialWaitTimeout =3D discoveryInitialWaitTimeout;
- }
-
- public DiscoveryGroupConfiguration(final String groupAddress,
- final int groupPort)
- {
- this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAd=
dress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, Hor=
netQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
- }
-
- public String getName()
- {
- return name;
- }
+ private final Map params;
=
- public String getLocalBindAddress()
+ public DiscoveryGroupConfiguration(final String clazz, final Map params, final String name) =
{
- return localBindAddress;
- }
-
- public String getGroupAddress()
- {
- return groupAddress;
- }
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getRefreshTimeout()
- {
- return refreshTimeout;
- }
-
- /**
- * @param name the name to set
- */
- public void setName(final String name)
- {
+ this.serverLocatorClassName =3D clazz;
+ =
+ this.params =3D params;
+ =
this.name =3D name;
}
=
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAdress(final String localBindAddress)
+ public String getServerLocatorClassName()
{
- this.localBindAddress =3D localBindAddress;
+ return this.serverLocatorClassName;
}
=
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
+ public Map getParams()
{
- this.groupAddress =3D groupAddress;
+ return this.params;
}
=
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
+ public String getName()
{
- this.groupPort =3D groupPort;
+ return this.name;
}
-
- /**
- * @param refreshTimeout the refreshTimeout to set
- */
- public void setRefreshTimeout(final long refreshTimeout)
- {
- this.refreshTimeout =3D refreshTimeout;
- }
-
- /**
- * @return the discoveryInitialWaitTimeout
- */
- public long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- /**
- * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout t=
o set
- */
- public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTim=
eout)
- {
- this.discoveryInitialWaitTimeout =3D discoveryInitialWaitTimeout;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this =3D=3D o) return true;
- if (o =3D=3D null || getClass() !=3D o.getClass()) return false;
-
- DiscoveryGroupConfiguration that =3D (DiscoveryGroupConfiguration) o;
-
- if (discoveryInitialWaitTimeout !=3D that.discoveryInitialWaitTimeou=
t) return false;
- if (groupPort !=3D that.groupPort) return false;
- if (refreshTimeout !=3D that.refreshTimeout) return false;
- if (groupAddress !=3D null ? !groupAddress.equals(that.groupAddress)=
: that.groupAddress !=3D null) return false;
- if (localBindAddress !=3D null ? !localBindAddress.equals(that.local=
BindAddress) : that.localBindAddress !=3D null)
- return false;
- if (name !=3D null ? !name.equals(that.name) : that.name !=3D null) =
return false;
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result =3D name !=3D null ? name.hashCode() : 0;
- result =3D 31 * result + (localBindAddress !=3D null ? localBindAddr=
ess.hashCode() : 0);
- result =3D 31 * result + (groupAddress !=3D null ? groupAddress.hash=
Code() : 0);
- result =3D 31 * result + groupPort;
- result =3D 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>>=
32));
- result =3D 31 * result + (int) (discoveryInitialWaitTimeout ^ (disco=
veryInitialWaitTimeout >>> 32));
- return result;
- }
+ =
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQ=
Client.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient=
.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient=
.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -13,13 +13,15 @@
package org.hornetq.api.core.client;
=
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBal=
ancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
=
-import java.util.List;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
=
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -107,21 +109,34 @@
*/
public static ServerLocator createServerLocatorWithoutHA(TransportConfi=
guration... transportConfigurations)
{
- return new ServerLocatorImpl(false, transportConfigurations);
+ Map params =3D new HashMap();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, tran=
sportConfigurations);
+ DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati=
on(StaticServerLocatorImpl.class.getName(), params, null);
+ return createServerLocatorWithoutHA(config);
}
=
/**
* Create a ServerLocator which creates session factories from a set of=
live servers, no HA backup information is propagated to the client
* =
- * The UDP address and port are used to listen for live servers in the =
cluster
- * =
- * @param discoveryAddress The UDP group address to listen for updates
- * @param discoveryPort the UDP port to listen for updates
+ * @param groupConfiguration The configuration for server discovery
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(final Discover=
yGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(false, groupConfiguration);
+ ServerLocator serverLocator =3D null;
+ String className =3D groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader =3D Thread.currentThread().getContextClassLoad=
er();
+ Class> clazz =3D loader.loadClass(className);
+ Constructor> constructor =3D clazz.getConstructor(Boolean.class=
, DiscoveryGroupConfiguration.class);
+ serverLocator =3D (ServerLocator)constructor.newInstance(Boolean.=
FALSE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ new HornetQException(HornetQException.INTERNAL_ERROR, "Could not=
instantiate ServerLocator implementation class: " + className, e);
+ }
+ return serverLocator;
}
=
/**
@@ -135,9 +150,12 @@
*/
public static ServerLocator createServerLocatorWithHA(TransportConfigur=
ation... initialServers)
{
- return new ServerLocatorImpl(true, initialServers);
+ Map params =3D new HashMap();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, init=
ialServers);
+ DiscoveryGroupConfiguration config =3D new DiscoveryGroupConfigurati=
on(StaticServerLocatorImpl.class.getName(), params, null);
+ return createServerLocatorWithHA(config);
}
- =
+
/**
* Create a ServerLocator which will receive cluster topology updates f=
rom the cluster as servers leave or join and new backups are appointed or r=
emoved.
* The discoveryAddress and discoveryPort parameters in this method are=
used to listen for UDP broadcasts which contain connection information for=
members of the cluster.
@@ -150,7 +168,20 @@
*/
public static ServerLocator createServerLocatorWithHA(final DiscoveryGr=
oupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(true, groupConfiguration);
+ ServerLocator serverLocator =3D null;
+ String className =3D groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader =3D Thread.currentThread().getContextClassLoad=
er();
+ Class> clazz =3D loader.loadClass(className);
+ Constructor> constructor =3D clazz.getConstructor(Boolean.class=
, DiscoveryGroupConfiguration.class);
+ serverLocator =3D (ServerLocator)constructor.newInstance(Boolean.=
TRUE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ new HornetQException(HornetQException.INTERNAL_ERROR, "Could not=
instantiate ServerLocator implementation class: " + className, e);
+ }
+ return serverLocator;
}
=
=
Copied: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/Abstract=
ServerLocator.java (from rev 10124, branches/HORNETQ-316/src/main/org/horne=
tq/core/client/impl/ServerLocatorImpl.java)
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServ=
erLocator.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServ=
erLocator.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,1026 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPoli=
cy;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A AbstractServerLocator
+ *
+ * @author Tim Fox
+ */
+public abstract class AbstractServerLocator implements ServerLocatorIntern=
al, Serializable
+{
+ private static final long serialVersionUID =3D -1615857864410205260L;
+
+ private static final Logger log =3D Logger.getLogger(AbstractServerLoca=
tor.class);
+
+ private final boolean ha;
+
+ private boolean finalizeCheck =3D true;
+
+ private boolean clusterConnection;
+
+ private final Set topologyListeners =3D new Ha=
shSet();
+
+ private Set factories =3D new HashSet();
+
+ private TransportConfiguration[] initialConnectors;
+
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+ private Topology topology =3D new Topology();
+
+ private Pair[] topology=
Array;
+
+ private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialConnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private final List interceptors =3D new CopyOnWriteArrayLi=
st();
+
+ private static ExecutorService globalThreadPool;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private String groupID;
+
+ private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
+
+ private final Exception e =3D new Exception();
+ =
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback =3D null;
+
+ private static synchronized ExecutorService getGlobalThreadPool()
+ {
+ if (globalThreadPool =3D=3D null)
+ {
+ ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien=
t-global-threads", true, getThisClassLoader());
+
+ globalThreadPool =3D Executors.newCachedThreadPool(factory);
+ }
+
+ return globalThreadPool;
+ }
+
+ public static synchronized ScheduledExecutorService getGlobalScheduledT=
hreadPool()
+ {
+ if (globalScheduledThreadPool =3D=3D null)
+ {
+ ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien=
t-global-scheduled-threads",
+ true,
+ getThisClassLoad=
er());
+
+ globalScheduledThreadPool =3D Executors.newScheduledThreadPool(Ho=
rnetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ fact=
ory);
+ }
+
+ return globalScheduledThreadPool;
+ }
+
+ protected void setThreadPools()
+ {
+ if (useGlobalPools)
+ {
+ threadPool =3D getGlobalThreadPool();
+
+ scheduledThreadPool =3D getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ ThreadFactory factory =3D new HornetQThreadFactory("HornetQ-clien=
t-factory-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoad=
er());
+
+ if (threadPoolMaxSize =3D=3D -1)
+ {
+ threadPool =3D Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool =3D Executors.newFixedThreadPool(threadPoolMaxSize,=
factory);
+ }
+
+ factory =3D new HornetQThreadFactory("HornetQ-client-factory-ping=
er-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ scheduledThreadPool =3D Executors.newScheduledThreadPool(schedule=
dThreadPoolMaxSize, factory);
+ }
+ }
+
+ private static ClassLoader getThisClassLoader()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction()
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
+ }
+
+ protected void instantiateLoadBalancingPolicy()
+ {
+ if (connectionLoadBalancingPolicyClassName =3D=3D null)
+ {
+ throw new IllegalStateException("Please specify a load balancing =
policy class name on the session factory");
+ }
+
+ AccessController.doPrivileged(new PrivilegedAction