[jboss-cvs] JBossAS SVN: r74015 - in trunk: cluster/src/main/org/jboss/ha/framework/server and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 3 16:15:02 EDT 2008
Author: bstansberry at jboss.com
Date: 2008-06-03 16:15:02 -0400 (Tue, 03 Jun 2008)
New Revision: 74015
Modified:
trunk/cluster/.classpath
trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
trunk/component-matrix/pom.xml
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
Log:
[JBAS-5265] Upgrade to JGroups 2.6.3.CR1
Modified: trunk/cluster/.classpath
===================================================================
--- trunk/cluster/.classpath 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/.classpath 2008-06-03 20:15:02 UTC (rev 74015)
@@ -4,11 +4,11 @@
<classpathentry kind="src" path="src/examples"/>
<classpathentry exported="true" kind="src" path="/server"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
- <classpathentry exported="true" kind="lib" path="/thirdparty/jgroups/lib/jgroups.jar" sourcepath="/thirdparty/jgroups/lib/jgroups-sources.jar"/>
+ <classpathentry exported="true" kind="lib" path="/thirdparty/jgroups/lib/jgroups.jar"/>
<classpathentry exported="true" kind="lib" path="/thirdparty/apache-logging/lib/commons-logging.jar"/>
<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/jboss-ha-client/lib/jboss-ha-client.jar" sourcepath="/thirdparty/jboss/jboss-ha-client/lib/jboss-ha-client-sources.jar"/>
<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/jboss-ha-server-api/lib/jboss-ha-server-api.jar" sourcepath="/thirdparty/jboss/jboss-ha-server-api/lib/jboss-ha-server-api-sources.jar"/>
<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/cache/jbosscache-core/lib/jbosscache-core.jar" sourcepath="/thirdparty/jboss/cache/jbosscache-core/lib/jbosscache-core-sources.jar"/>
<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/cache/jbosscache-pojo/lib/jbosscache-pojo.jar" sourcepath="/thirdparty/jboss/cache/jbosscache-pojo/lib/jbosscache-pojo-sources.jar"/>
- <classpathentry kind="output" path="output/eclipse-classes"/>
+ <classpathentry kind="output" path="output/eclipse-classes"/>
</classpath>
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-06-03 20:15:02 UTC (rev 74015)
@@ -22,7 +22,6 @@
package org.jboss.ha.framework.server;
-import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.rmi.dgc.VMID;
import java.rmi.server.UID;
@@ -32,12 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
@@ -46,7 +40,6 @@
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBean;
import org.jboss.system.server.ServerConfigUtil;
-import org.jboss.util.StringPropertyReplacer;
import org.jboss.util.loading.ContextClassLoaderSwitcher;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
@@ -62,8 +55,11 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.LazyThreadFactory;
+import org.jgroups.util.ThreadDecorator;
import org.jgroups.util.ThreadFactory;
-import org.jgroups.util.ThreadNamingPattern;
+import org.jgroups.util.ThreadManager;
import org.jgroups.util.Util;
/**
@@ -108,17 +104,18 @@
private int namingServicePort = -1;
private int state = ServiceMBean.UNREGISTERED;
private boolean assignLogicalAddresses = true;
- private boolean manageChannelThreadPools = true;
+ private boolean manageNewThreadClassLoader = true;
+ private boolean manageReleasedThreadClassLoader = false;
private boolean addMissingSingletonName = true;
private boolean domainSet;
private final ContextClassLoaderSwitcher classLoaderSwitcher;
private final Set<String> registeredChannels = new HashSet<String>();
+ @SuppressWarnings("unchecked")
public JChannelFactory()
{
this.classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
- }
-
+ }
/**
* Always throws <code>ChannelException</code>; this method is not supported.
@@ -136,9 +133,9 @@
Channel channel = super.createChannel(properties);
- if (manageChannelThreadPools)
+ if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
{
- fixChannelThreadPools(channel);
+ fixChannelThreadManagement(channel);
}
if (assignLogicalAddresses)
@@ -176,9 +173,9 @@
JChannel channel = new JChannel(props);
- if (manageChannelThreadPools)
+ if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
{
- fixChannelThreadPools(channel);
+ fixChannelThreadManagement(channel);
}
if (assignLogicalAddresses)
@@ -242,7 +239,7 @@
throw new IllegalStateException("Unknown stack_name " + stack_name);
ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
- Map tpProps = getTransportProperties(config);
+ Map<String, String> tpProps = getTransportProperties(config);
if (!tpProps.containsKey(Global.SINGLETON_NAME))
{
@@ -267,9 +264,9 @@
JChannel channel = new JChannel(config);
- if (manageChannelThreadPools)
+ if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
{
- fixChannelThreadPools(channel, tpProps);
+ fixChannelThreadManagement(channel);
}
if (assignLogicalAddresses)
@@ -434,33 +431,63 @@
}
/**
- * Gets whether this factory should override the standard JGroups
- * thread pools and thread factories to ensure application classloaders do
- * not leak to channel threads.
+ * Gets whether this factory should update the standard JGroups
+ * thread factories to ensure application classloaders do not leak to
+ * newly created channel threads.
*
- * @return <code>true</code> if the pools and factories should be overridden.
+ * @return <code>true</code> if the factories should be updated.
* Default is <code>true</code>.
*/
- public boolean getManageChannelThreadPools()
+ public boolean getManageNewThreadClassLoader()
{
- return manageChannelThreadPools;
+ return manageNewThreadClassLoader;
}
/**
- * Sets whether this factory should override the standard JGroups
- * thread pools and thread factories to ensure application classloaders do
- * not leak to channel threads. This should only be set to <code>false</code>
+ * Sets whether this factory should update the standard JGroups
+ * thread factories to ensure application classloaders do not leak to
+ * newly created channel threads. This should only be set to <code>false</code>
* if a JGroups release is used that itself prevents such classloader leaks.
*
- * @param manageChannelThreadPools <code>true</code> if the pools and
- * factories should be overridden.
+ * @param manage <code>true</code> if the factories should be updated.
*/
- public void setManageChannelThreadPools(boolean manageChannelThreadPools)
+ public void setManageNewThreadClassLoader(boolean manage)
{
- this.manageChannelThreadPools = manageChannelThreadPools;
+ this.manageNewThreadClassLoader = manage;
}
/**
+ * Gets whether this factory should update the standard JGroups
+ * thread pools to ensure application classloaders have not leaked to
+ * threads returned to the pool.
+ *
+ * @return <code>true</code> if the pools should be updated.
+ * Default is <code>false</code>.
+ */
+ public boolean getManageReleasedThreadClassLoader()
+ {
+ return manageReleasedThreadClassLoader;
+ }
+
+ /**
+ * Sets whether this factory should update the standard JGroups
+ * thread pools to ensure application classloaders have not leaked to
+ * threads returned to the pool.
+ * <p>
+ * There is a small performance cost to enabling this, and applications
+ * can prevent any need to enable it by properly restoring the thread
+ * context classloader if they change it. Therefore, by default this
+ * is set to <code>false</code>.
+ * </p>
+ *
+ * @param manage <code>true</code> if the pools should be updated.
+ */
+ public void setManageReleasedThreadClassLoader(boolean manage)
+ {
+ this.manageReleasedThreadClassLoader = manage;
+ }
+
+ /**
* Gets whether {@link #createMultiplexerChannel(String, String)} should
* create a synthetic singleton name attribute for a channel's transport
* protocol if one isn't configured. If this is <code>false</code> and
@@ -651,6 +678,20 @@
if (state == ServiceMBean.DESTROYED)
state = ServiceMBean.UNREGISTERED;
}
+
+ /**
+ * Gets the classloader that channel threads should be set to if
+ * {@link #getManageNewThreadClassLoader()} or {@link #getManageReleasedThreadClassLoader()}
+ * are <code>true</code>.
+ * <p>
+ * This implementation returns this class' classloader.
+ *
+ * @return the classloader.
+ */
+ protected ClassLoader getDefaultChannelThreadContextClassLoader()
+ {
+ return getClass().getClassLoader();
+ }
// ----------------------------------------------------------------- Private
@@ -673,7 +714,7 @@
log.debug("Passing unique node id " + nodeName + " to the channel as additional data");
- java.util.HashMap staticNodeName = new java.util.HashMap();
+ HashMap<String, byte[]> staticNodeName = new HashMap<String, byte[]>();
staticNodeName.put("additional_data", this.nodeName.getBytes());
channel.down(new Event(Event.CONFIG, staticNodeName));
@@ -745,25 +786,10 @@
return hostIP + ":" + uid;
}
- private void fixChannelThreadPools(Channel channel) throws ChannelException
+ @SuppressWarnings("unchecked")
+ private Map<String, String> getTransportProperties(ProtocolStackConfigurator config)
{
- if (channel instanceof JChannel)
- {
- JChannel jchannel = (JChannel) channel;
- String configStr = jchannel.getProperties();
- ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
- Map tpProps = getTransportProperties(config);
- fixChannelThreadPools(jchannel, tpProps);
- }
- else
- {
- log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
- }
- }
-
- private Map getTransportProperties(ProtocolStackConfigurator config)
- {
- Map tpProps = null;
+ Map<String, String> tpProps = null;
try
{
ProtocolData[] protocols=config.getProtocolStack();
@@ -777,7 +803,7 @@
String configStr = config.getProtocolStackString();
String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
String[] params = tpConfigStr.split(";");
- tpProps = new HashMap();
+ tpProps = new HashMap<String, String>();
for (String param : params)
{
String[] keyVal = param.split("=");
@@ -790,6 +816,7 @@
return tpProps;
}
+ @SuppressWarnings("unchecked")
private ProtocolStackConfigurator addSingletonName(ProtocolStackConfigurator orig, String singletonName)
throws ChannelException
{
@@ -798,7 +825,7 @@
{
ProtocolData[] protocols=orig.getProtocolStack();
ProtocolData transport=protocols[0];
- Map tpProps = transport.getParameters();
+ Map<String, String> tpProps = transport.getParameters();
tpProps.put(Global.SINGLETON_NAME, singletonName);
// we've now updated the state of orig; just return it
result = orig;
@@ -822,139 +849,165 @@
return result;
}
- private void fixChannelThreadPools(JChannel channel, Map props)
+ private void fixChannelThreadManagement(Channel channel) throws ChannelException
{
- if (!channel.isConnected())
+ if (!(channel instanceof JChannel))
{
- TransportThreadPoolProperties tpProps = new TransportThreadPoolProperties(props);
-
- ProtocolStack stack = channel.getProtocolStack();
- List<Protocol> protocols = stack.getProtocols();
- for (int i = protocols.size() - 1; i >= 0; i--)
+ log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
+ return;
+ }
+
+ JChannel jchannel = (JChannel) channel;
+
+ ProtocolStack stack = jchannel.getProtocolStack();
+ List<Protocol> protocols = stack.getProtocols();
+ TP tp = null;
+ for (int i = protocols.size() - 1; i >= 0; i--)
+ {
+ if (protocols.get(i) instanceof TP)
{
- if (protocols.get(i) instanceof TP)
- {
- TP tp = (TP) protocols.get(i);
- fixChannelThreadPools(tp, stack, tpProps);
- break;
- }
+ tp = (TP) protocols.get(i);
+ break;
}
}
+
+ ClassLoader defaultTCCL = getDefaultChannelThreadContextClassLoader();
+ ThreadDecoratorImpl threadDecorator = new ThreadDecoratorImpl(defaultTCCL);
+ if (manageNewThreadClassLoader)
+ {
+ fixProtocolThreadFactories(tp, threadDecorator);
+ }
+
+ if (manageReleasedThreadClassLoader)
+ {
+ fixTransportThreadPools(tp, threadDecorator);
+ }
}
-
- private void fixChannelThreadPools(TP tp, ProtocolStack stack, TransportThreadPoolProperties tpProps)
- {
- ClassLoader tccl = classLoaderSwitcher.getContextClassLoader();
- if (tccl == null)
+
+ private void fixProtocolThreadFactories(TP tp, ThreadDecoratorImpl threadDecorator)
+ {
+ ThreadFactory stackFactory = tp.getThreadFactory();
+ if (stackFactory == null)
{
- tccl = getClass().getClassLoader();
+ stackFactory = new DefaultThreadFactory(Util.getGlobalThreadGroup(), "", false);
+ tp.setThreadFactory(stackFactory);
}
- WeakReference<ClassLoader> tcclRef = new WeakReference(tccl);
+ fixThreadManager(stackFactory, threadDecorator, "TP.getThreadFactory()");
- if (tpProps.thread_pool_enabled && tp.getDefaultThreadPool() == null)
+ log.debug("Fixed thread factory for " + tp);
+
+ ThreadFactory timerFactory = tp.getTimerThreadFactory();
+ if (timerFactory == null)
{
- BlockingQueue<Runnable> thread_pool_queue = null;
- if (tpProps.thread_pool_queue_enabled)
+ timerFactory = new LazyThreadFactory(Util.getGlobalThreadGroup(), "Timer", true, true);
+ tp.setTimerThreadFactory(timerFactory);
+ }
+ fixThreadManager(timerFactory, threadDecorator, "TP.getTimerThreadFactory()");
+
+ log.debug("Fixed timer thread factory for " + tp);
+
+ ThreadGroup pool_thread_group = null;
+ if (tp.isDefaulThreadPoolEnabled())
+ {
+ ThreadFactory defaultPoolFactory = tp.getDefaultThreadPoolThreadFactory();
+ if (defaultPoolFactory == null)
{
- thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.thread_pool_queue_max_size);
+ pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+ defaultPoolFactory = new DefaultThreadFactory(pool_thread_group, "Incoming", false, true);
+ tp.setThreadFactory(defaultPoolFactory);
}
- else
- {
- thread_pool_queue=new SynchronousQueue<Runnable>();
- }
- ExecutorService thread_pool = createThreadPool(tpProps.thread_pool_min_threads,
- tpProps.thread_pool_max_threads,
- tpProps.thread_pool_keep_alive_time,
- tpProps.thread_pool_rejection_policy,
- thread_pool_queue,
- "Incoming",
- tpProps.thread_naming_pattern,
- tcclRef);
- tp.setDefaultThreadPool(thread_pool);
+ fixThreadManager(defaultPoolFactory, threadDecorator, "TP.getDefaultThreadPoolThreadFactory()");
- log.debug("Fixed default thread pool for " + tp);
+ log.debug("Fixed default pool thread factory for " + tp);
}
- if (tpProps.oob_thread_pool_enabled && tp.getOOBThreadPool() == null)
+ if (tp.isOOBThreadPoolEnabled())
{
- BlockingQueue<Runnable> oob_thread_pool_queue = null;
- if(tpProps.oob_thread_pool_queue_enabled)
+ ThreadFactory oobPoolFactory = tp.getOOBThreadPoolThreadFactory();
+ if (oobPoolFactory == null)
{
- oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.oob_thread_pool_queue_max_size);
+ if (pool_thread_group == null)
+ pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+ oobPoolFactory = new DefaultThreadFactory(pool_thread_group, "Incoming", false, true);
+ tp.setThreadFactory(oobPoolFactory);
}
- else
+ fixThreadManager(oobPoolFactory, threadDecorator, "TP.getOOBThreadPoolThreadFactory()");
+
+ log.debug("Fixed oob pool thread factory for " + tp);
+ }
+
+ Map<ThreadFactory, Protocol> factories= new HashMap<ThreadFactory, Protocol>();
+ Protocol tmp=tp.getUpProtocol();
+ while(tmp != null) {
+ ThreadFactory f=tmp.getThreadFactory();
+ if(f != null && !factories.containsKey(f))
{
- oob_thread_pool_queue=new SynchronousQueue<Runnable>();
+ factories.put(f, tmp);
}
- ExecutorService oob_thread_pool = createThreadPool(tpProps.oob_thread_pool_min_threads,
- tpProps.oob_thread_pool_max_threads,
- tpProps.oob_thread_pool_keep_alive_time,
- tpProps.oob_thread_pool_rejection_policy,
- oob_thread_pool_queue,
- "OOB",
- tpProps.thread_naming_pattern,
- tcclRef);
-
- tp.setOOBThreadPool(oob_thread_pool);
-
- log.debug("Fixed OOB thread pool for " + tp);
+ tmp=tmp.getUpProtocol();
}
- ThreadFactory stackFactoryBase = ProtocolStack.newThreadFactory(Util.getGlobalThreadGroup(),"",false);
- ThreadFactory stackFactory = new ClassLoaderControlThreadFactory(stackFactoryBase, tcclRef);
- stack.setThreadFactory(stackFactory);
+ for (Map.Entry<ThreadFactory, Protocol> entry : factories.entrySet())
+ {
+ fixThreadManager(entry.getKey(), threadDecorator, entry.getValue().getClass().getSimpleName() + ".getThreadFactory()");
+ }
- log.debug("Fixed thread factory for " + stack);
-
- ThreadFactory timerFactoryBase = ProtocolStack.newThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(), "Timers"), "Timer", true);
- ThreadFactory timerFactory = new ClassLoaderControlThreadFactory(timerFactoryBase, tcclRef);
- stack.setTimerThreadFactory(timerFactory);
-
- log.debug("Fixed timer thread factory for " + stack);
+ log.debug("Fixed Protocol thread factories");
}
- private ExecutorService createThreadPool(int min_threads, int max_threads,
- long keep_alive_time, String rejection_policy,
- BlockingQueue<Runnable> queue, final String thread_name,
- ThreadNamingPattern thread_naming_pattern,
- WeakReference<ClassLoader> tcclRef)
+ private void fixTransportThreadPools(TP tp, ThreadDecoratorImpl threadDecorator)
{
- ThreadPoolExecutor pool=new RestoreTCCLThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue, tcclRef);
-
- ThreadGroup pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
- // FIXME: replace with call to ProtocolStack.newIDThreadFactory(...)
- // when we integrate JGroups 2.6.3 or later
- ThreadFactory base = new IdThreadFactory(new DefaultThreadFactory(pool_thread_group, thread_name, false),
- thread_naming_pattern);
- ThreadFactory threadFactory = new ClassLoaderControlThreadFactory(base, tcclRef);
- pool.setThreadFactory(threadFactory);
-
- if(rejection_policy != null) {
- if(rejection_policy.equals("abort"))
- pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- else if(rejection_policy.equals("discard"))
- pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
- else if(rejection_policy.equals("discardoldest"))
- pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
- else
- pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- return pool;
- }
+ Executor threadPool = tp.getDefaultThreadPool();
+ if (tp.isDefaulThreadPoolEnabled())
+ {
+ fixThreadManager(threadPool, threadDecorator, "TP.getDefaultThreadPool()");
+
+ log.debug("Fixed default thread pool for " + tp);
+ }
+
+ threadPool = tp.getOOBThreadPool();
+ if (tp.isOOBThreadPoolEnabled())
+ {
+ fixThreadManager(threadPool, threadDecorator, "TP.getOOBThreadPool()");
+
+ log.debug("Fixed OOB thread pool for " + tp);
+ }
+ }
+ private void fixThreadManager(Object manager, ThreadDecoratorImpl decorator, String managerSource)
+ {
+ if (manager instanceof ThreadManager)
+ {
+ ThreadManager threadManager = (ThreadManager) manager;
+
+ ThreadDecorator existing = threadManager.getThreadDecorator();
+ if (existing instanceof ThreadDecoratorImpl)
+ {
+ // already been handled
+ return;
+ }
+ else if (existing != null)
+ {
+ // someone else has added one; integrate with it
+ decorator.setParent(existing);
+ }
+ threadManager.setThreadDecorator(decorator);
+ }
+ else
+ {
+ log.warn(managerSource + " is not a ThreadManager");
+ }
+ }
+
/**
* Sets the context class loader on <code>thread</code> to the classloader
* in effect when this factory was instantiated.
*
* @param thread the thread to set
*/
- private void setDefaultThreadContextClassLoader(Thread thread, WeakReference<ClassLoader> classLoaderRef)
+ private void setDefaultThreadContextClassLoader(Thread thread, ClassLoader classLoader)
{
- ClassLoader cl = classLoaderRef.get();
- if (cl == null)
- cl = getClass().getClassLoader();
- classLoaderSwitcher.setContextClassLoader(thread, cl);
+ classLoaderSwitcher.setContextClassLoader(thread, classLoader);
}
private void registerChannel(JChannel ch, String channelId) throws Exception
@@ -993,308 +1046,43 @@
}
}
}
-
- /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
- static class DefaultThreadFactory implements ThreadFactory
- {
- private final ThreadGroup group;
- private final String baseName;
- private final boolean createDaemons;
-
- public DefaultThreadFactory(ThreadGroup group,
- String baseName,
- boolean createDaemons)
- {
- this.group = group;
- this.baseName = baseName;
- this.createDaemons = createDaemons;
- }
-
- public Thread newThread(Runnable r, String name)
- {
- return newThread(group, r, name);
- }
-
- public Thread newThread(Runnable r)
- {
- return newThread(group, r, baseName);
- }
-
- public Thread newThread(ThreadGroup group, Runnable r, String name)
- {
- Thread thread = new Thread(group, r, name);
- thread.setDaemon(createDaemons);
- return thread;
- }
- }
- /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
- protected static class PatternedThreadFactory implements ThreadFactory
+ private class ThreadDecoratorImpl implements ThreadDecorator
{
- protected final ThreadFactory f;
- protected ThreadNamingPattern pattern;
-
- public PatternedThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
- {
- f = factory;
- this.pattern = pattern;
- }
-
- public void setThreadNamingPattern(ThreadNamingPattern pattern)
- {
- this.pattern = pattern;
- }
-
- public Thread newThread(Runnable r, String name)
- {
- Thread newThread = f.newThread(r, name);
- renameThread(newThread);
- return newThread;
- }
-
- public Thread newThread(Runnable r)
- {
- Thread newThread = f.newThread(r);
- renameThread(newThread);
- return newThread;
- }
-
- public Thread newThread(ThreadGroup group, Runnable r, String name)
- {
- Thread newThread = f.newThread(group, r, name);
- renameThread(newThread);
- return newThread;
- }
-
- protected void renameThread(Thread new_thread)
- {
- if(pattern!=null)
- pattern.renameThread(new_thread);
- }
- }
-
- /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
- private static class IdThreadFactory extends PatternedThreadFactory
- {
- short current_id=0;
-
- public IdThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
- {
- super(factory, pattern);
- }
-
- protected void renameThread(Thread new_thread)
- {
- if(pattern != null)
- {
- short id;
- synchronized(this)
- {
- id=++current_id;
- }
- pattern.renameThread(new_thread.getName() + "-" + id, new_thread);
- }
- }
- }
-
- private class ClassLoaderControlThreadFactory implements ThreadFactory
- {
- private final ThreadFactory f;
- private final WeakReference<ClassLoader> classLoaderRef;
-
- public ClassLoaderControlThreadFactory(ThreadFactory factory, WeakReference<ClassLoader> classLoaderRef)
+ private final ClassLoader classloader;
+ private ThreadDecorator parent;
+
+ private ThreadDecoratorImpl(ClassLoader classloader)
{
- this.f = factory;
- this.classLoaderRef = classLoaderRef;
+ this.classloader = classloader;
}
-
- public Thread newThread(Runnable r, String name)
+ public void threadCreated(Thread thread)
{
- Thread newThread = f.newThread(r, name);
- setDefaultThreadContextClassLoader(newThread, classLoaderRef);
- return newThread;
+ if (parent != null)
+ parent.threadCreated(thread);
+ setDefaultThreadContextClassLoader(thread, classloader);
}
- public Thread newThread(Runnable r)
+ public void threadReleased(Thread thread)
{
- Thread newThread = f.newThread(r);
- setDefaultThreadContextClassLoader(newThread, classLoaderRef);
- return newThread;
- }
-
- public Thread newThread(ThreadGroup group, Runnable r, String name)
- {
- Thread newThread = f.newThread(group, r, name);
- setDefaultThreadContextClassLoader(newThread, classLoaderRef);
- return newThread;
+ if (parent != null)
+ parent.threadCreated(thread);
+ setDefaultThreadContextClassLoader(thread, classloader);
}
- }
-
- private class RestoreTCCLThreadPoolExecutor extends ThreadPoolExecutor
- {
- private final WeakReference<ClassLoader> classLoaderRef;
-
- public RestoreTCCLThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- WeakReference<ClassLoader> classLoaderRef)
+
+ public ThreadDecorator getParent()
{
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- this.classLoaderRef = classLoaderRef;
+ return parent;
}
- @Override
- protected void afterExecute(Runnable r, Throwable t)
+ public void setParent(ThreadDecorator parent)
{
- try
- {
- super.afterExecute(r, t);
- }
- finally
- {
- setDefaultThreadContextClassLoader(Thread.currentThread(), classLoaderRef);
- }
- }
- }
-
- // Yikes! We recreate all the TP.setProperties stuff here. :(
- private class TransportThreadPoolProperties
- {
- private ThreadNamingPattern thread_naming_pattern = new ThreadNamingPattern("cl");
- private boolean oob_thread_pool_enabled=true;
- private int oob_thread_pool_min_threads=2;
- private int oob_thread_pool_max_threads=10;
- private long oob_thread_pool_keep_alive_time=30000;
- private boolean oob_thread_pool_queue_enabled=true;
- private int oob_thread_pool_queue_max_size=500;
- private String oob_thread_pool_rejection_policy="Run";
-
- private boolean thread_pool_enabled=true;
- private int thread_pool_min_threads=2;
- private int thread_pool_max_threads=10;
- private long thread_pool_keep_alive_time=30000;
- private boolean thread_pool_queue_enabled=true;
- private int thread_pool_queue_max_size=500;
- private String thread_pool_rejection_policy="Run";
-
- TransportThreadPoolProperties(Map props)
- {
- String str = (String) props.get("thread_naming_pattern");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_naming_pattern=new ThreadNamingPattern(str);
- }
-
- // ================= OOB thread pool =============================
-
- str = (String) props.get("oob_thread_pool.enabled");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_enabled=Boolean.valueOf(str).booleanValue();
- }
-
- str = (String) props.get("oob_thread_pool.min_threads");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_min_threads=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("oob_thread_pool.max_threads");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_max_threads=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("oob_thread_pool.keep_alive_time");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_keep_alive_time=Long.valueOf(str).longValue();
- }
-
- str = (String) props.get("oob_thread_pool.queue_enabled");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
- }
-
- str = (String) props.get("oob_thread_pool.queue_max_size");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_queue_max_size=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("oob_thread_pool.rejection_policy");
- if(str != null)
- {
- str = str.toLowerCase().trim();
- str = StringPropertyReplacer.replaceProperties(str);
- oob_thread_pool_rejection_policy = str;
- }
-
- // ================= Regular thread pool =============================
-
- str = (String) props.get("thread_pool.enabled");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_enabled=Boolean.valueOf(str).booleanValue();
- }
-
- str = (String) props.get("thread_pool.min_threads");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_min_threads=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("thread_pool.max_threads");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_max_threads=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("thread_pool.keep_alive_time");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_keep_alive_time=Long.valueOf(str).longValue();
- }
-
- str = (String) props.get("thread_pool.queue_enabled");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
- }
-
- str = (String) props.get("thread_pool.queue_max_size");
- if(str != null)
- {
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_queue_max_size=Integer.valueOf(str).intValue();
- }
-
- str = (String) props.get("thread_pool.rejection_policy");
- if(str != null)
- {
- str=str.toLowerCase().trim();
- str = StringPropertyReplacer.replaceProperties(str);
- thread_pool_rejection_policy=str;
- }
+ this.parent = parent;
}
+
}
-
-
private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
{
private final String channelId;
Modified: trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
===================================================================
--- trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml 2008-06-03 20:15:02 UTC (rev 74015)
@@ -17,7 +17,8 @@
<property name="exposeChannels">true</property>
<property name="exposeProtocols">true</property>
- <property name="manageChannelThreadPools">true</property>
+ <property name="manageNewThreadClassLoader">true</property>
+ <property name="manageReleasedThreadClassLoader">false</property>
<!-- Whether or not to assign a unique name to nodes -->
<property name="assignLogicalAddresses">true</property>
Modified: trunk/component-matrix/pom.xml
===================================================================
--- trunk/component-matrix/pom.xml 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/component-matrix/pom.xml 2008-06-03 20:15:02 UTC (rev 74015)
@@ -825,7 +825,7 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.6.2</version>
+ <version>2.6.3.CR1</version>
</dependency>
<dependency>
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java 2008-06-03 20:15:02 UTC (rev 74015)
@@ -75,6 +75,7 @@
private Channel channel1;
private Channel channel2;
private String jgroups_bind_addr;
+ private ClassLoader testLoader;
/**
* Create a new JChannelFactoryUnitTestCase.
@@ -93,22 +94,26 @@
super.setUp();
+ testLoader = new ClassLoader(Thread.currentThread().getContextClassLoader()){};
+
String jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
if (jgroups_bind_addr == null)
{
System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
}
- factory1 = new JChannelFactory();
+ factory1 = new TestClassLoaderJChannelFactory();
factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
factory1.setAssignLogicalAddresses(false);
factory1.setExposeChannels(false);
+ factory1.setManageReleasedThreadClassLoader(true);
factory1.create();
factory1.start();
- factory2 = new JChannelFactory();
+ factory2 = new TestClassLoaderJChannelFactory();
factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
factory2.setAssignLogicalAddresses(false);
factory2.setExposeChannels(false);
+ factory2.setManageReleasedThreadClassLoader(true);
factory2.create();
factory2.start();
}
@@ -117,6 +122,8 @@
{
super.tearDown();
+ testLoader = null;
+
if (jgroups_bind_addr == null)
System.clearProperty("jgroups.bind_addr");
@@ -215,14 +222,13 @@
ClassLoader ours = Thread.currentThread().getContextClassLoader();
// The classloader we want channel threads to use
- ClassLoader cl = new ClassLoader(ours){};
- ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(cl, semaphore, runnerGroup);
+ ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(testLoader, semaphore, runnerGroup);
MessageDispatcher[] dispatchers = new MessageDispatcher[2];
- Thread.currentThread().setContextClassLoader(cl);
- try
- {
+// Thread.currentThread().setContextClassLoader(testLoader);
+// try
+// {
channel1 = (Channel) factoryMeth.invoke(factory1, factory1Args);
dispatchers[0] = new MessageDispatcher(channel1, handler, handler, handler);
@@ -234,11 +240,11 @@
channel2.connect("leaktest");
assertEquals("No classloader leak on channel2 connect", null, handler.getLeakedClassLoader());
- }
- finally
- {
- Thread.currentThread().setContextClassLoader(ours);
- }
+// }
+// finally
+// {
+// Thread.currentThread().setContextClassLoader(ours);
+// }
log.info("Channels connected");
@@ -369,6 +375,7 @@
private final Semaphore semaphore;
private final ClassLoader expected;
private final ThreadGroup runnerGroup;
+ private final Thread main;
private ClassLoader leakedClassLoader;
private final int numPermits;
@@ -378,6 +385,7 @@
this.semaphore = semaphore;
this.runnerGroup = runnerGroup;
this.numPermits = this.semaphore.availablePermits();
+ this.main = Thread.currentThread();
}
public Object handle(Message msg)
@@ -429,7 +437,8 @@
if (leakedClassLoader == null) // ignore msgs once we found a leak
{
// ignore runner threads that loop all the way back up
- if (Thread.currentThread().getThreadGroup().equals(runnerGroup))
+ Thread current = Thread.currentThread();
+ if (current == main || current.getThreadGroup().equals(runnerGroup))
{
return;
}
@@ -445,4 +454,16 @@
}
}
+
+ private class TestClassLoaderJChannelFactory
+ extends JChannelFactory
+ {
+
+ @Override
+ protected ClassLoader getDefaultChannelThreadContextClassLoader()
+ {
+ return testLoader;
+ }
+
+ }
}
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java 2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java 2008-06-03 20:15:02 UTC (rev 74015)
@@ -85,6 +85,7 @@
factory1.setNodeAddress(InetAddress.getByName("localhost"));
factory1.setNamingServicePort(123);
factory1.setExposeChannels(false);
+ factory1.setManageReleasedThreadClassLoader(true);
factory1.create();
factory1.start();
factory2 = new JChannelFactory();
@@ -94,6 +95,7 @@
factory2.setNamingServicePort(456);
factory2.setNodeName("node1");
factory2.setExposeChannels(false);
+ factory2.setManageReleasedThreadClassLoader(true);
factory2.create();
factory2.start();
}
@@ -193,17 +195,9 @@
Executor exec = tp1.getDefaultThreadPool();
assertNotNull(exec);
- if (exec instanceof ThreadPoolExecutor)
- {
- assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.DiscardPolicy);
- }
exec = tp1.getOOBThreadPool();
assertNotNull(exec);
- if (exec instanceof ThreadPoolExecutor)
- {
- assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy);
- }
// Confirm that the no-pool config doesn't create a pool
channel2 = factory1.createChannel("nonconcurrent1");
More information about the jboss-cvs-commits
mailing list