[jboss-cvs] JBossAS SVN: r74015 - in trunk: cluster/src/main/org/jboss/ha/framework/server and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 3 16:15:02 EDT 2008


Author: bstansberry at jboss.com
Date: 2008-06-03 16:15:02 -0400 (Tue, 03 Jun 2008)
New Revision: 74015

Modified:
   trunk/cluster/.classpath
   trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
   trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
   trunk/component-matrix/pom.xml
   trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
   trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
Log:
[JBAS-5265] Upgrade to JGroups 2.6.3.CR1

Modified: trunk/cluster/.classpath
===================================================================
--- trunk/cluster/.classpath	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/.classpath	2008-06-03 20:15:02 UTC (rev 74015)
@@ -4,11 +4,11 @@
 	<classpathentry kind="src" path="src/examples"/>
 	<classpathentry exported="true" kind="src" path="/server"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
-	<classpathentry exported="true" kind="lib" path="/thirdparty/jgroups/lib/jgroups.jar" sourcepath="/thirdparty/jgroups/lib/jgroups-sources.jar"/>
+	<classpathentry exported="true" kind="lib" path="/thirdparty/jgroups/lib/jgroups.jar"/>
 	<classpathentry exported="true" kind="lib" path="/thirdparty/apache-logging/lib/commons-logging.jar"/>
 	<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/jboss-ha-client/lib/jboss-ha-client.jar" sourcepath="/thirdparty/jboss/jboss-ha-client/lib/jboss-ha-client-sources.jar"/>
 	<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/jboss-ha-server-api/lib/jboss-ha-server-api.jar" sourcepath="/thirdparty/jboss/jboss-ha-server-api/lib/jboss-ha-server-api-sources.jar"/>
 	<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/cache/jbosscache-core/lib/jbosscache-core.jar" sourcepath="/thirdparty/jboss/cache/jbosscache-core/lib/jbosscache-core-sources.jar"/>
 	<classpathentry exported="true" kind="lib" path="/thirdparty/jboss/cache/jbosscache-pojo/lib/jbosscache-pojo.jar" sourcepath="/thirdparty/jboss/cache/jbosscache-pojo/lib/jbosscache-pojo-sources.jar"/>
-   <classpathentry kind="output" path="output/eclipse-classes"/>
+	<classpathentry kind="output" path="output/eclipse-classes"/>
 </classpath>

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-06-03 20:15:02 UTC (rev 74015)
@@ -22,7 +22,6 @@
 
 package org.jboss.ha.framework.server;
 
-import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.rmi.dgc.VMID;
 import java.rmi.server.UID;
@@ -32,12 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import javax.management.MBeanRegistration;
 import javax.management.MBeanServer;
@@ -46,7 +40,6 @@
 import org.jboss.logging.Logger;
 import org.jboss.system.ServiceMBean;
 import org.jboss.system.server.ServerConfigUtil;
-import org.jboss.util.StringPropertyReplacer;
 import org.jboss.util.loading.ContextClassLoaderSwitcher;
 import org.jgroups.Channel;
 import org.jgroups.ChannelException;
@@ -62,8 +55,11 @@
 import org.jgroups.stack.IpAddress;
 import org.jgroups.stack.Protocol;
 import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.LazyThreadFactory;
+import org.jgroups.util.ThreadDecorator;
 import org.jgroups.util.ThreadFactory;
-import org.jgroups.util.ThreadNamingPattern;
+import org.jgroups.util.ThreadManager;
 import org.jgroups.util.Util;
 
 /**
@@ -108,17 +104,18 @@
    private int namingServicePort = -1;
    private int state = ServiceMBean.UNREGISTERED;
    private boolean assignLogicalAddresses = true;
-   private boolean manageChannelThreadPools = true;
+   private boolean manageNewThreadClassLoader = true;
+   private boolean manageReleasedThreadClassLoader = false;
    private boolean addMissingSingletonName = true;
    private boolean domainSet;
    private final ContextClassLoaderSwitcher classLoaderSwitcher;
    private final Set<String> registeredChannels = new HashSet<String>();
 
+   @SuppressWarnings("unchecked")
    public JChannelFactory()
    {
       this.classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
-   }
-   
+   }   
 
    /**
     * Always throws <code>ChannelException</code>; this method is not supported.
@@ -136,9 +133,9 @@
       
       Channel channel = super.createChannel(properties);
       
-      if (manageChannelThreadPools)
+      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
       {
-         fixChannelThreadPools(channel);
+         fixChannelThreadManagement(channel);
       }
       
       if (assignLogicalAddresses)
@@ -176,9 +173,9 @@
       
       JChannel channel = new JChannel(props);
       
-      if (manageChannelThreadPools)
+      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
       {
-         fixChannelThreadPools(channel);
+         fixChannelThreadManagement(channel);
       }
       
       if (assignLogicalAddresses)
@@ -242,7 +239,7 @@
          throw new IllegalStateException("Unknown stack_name " + stack_name);
       
       ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
-      Map tpProps = getTransportProperties(config);
+      Map<String, String> tpProps = getTransportProperties(config);
       
       if (!tpProps.containsKey(Global.SINGLETON_NAME))
       {
@@ -267,9 +264,9 @@
       
       JChannel channel = new JChannel(config);
       
-      if (manageChannelThreadPools)
+      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
       {
-         fixChannelThreadPools(channel, tpProps);
+         fixChannelThreadManagement(channel);
       }
       
       if (assignLogicalAddresses)
@@ -434,33 +431,63 @@
    }
 
    /**
-    * Gets whether this factory should override the standard JGroups
-    * thread pools and thread factories to ensure application classloaders do
-    * not leak to channel threads.
+    * Gets whether this factory should update the standard JGroups
+    * thread factories to ensure application classloaders do not leak to 
+    * newly created channel threads.
     * 
-    * @return <code>true</code> if the pools and factories should be overridden.
+    * @return <code>true</code> if the factories should be updated.
     *         Default is <code>true</code>.
     */
-   public boolean getManageChannelThreadPools()
+   public boolean getManageNewThreadClassLoader()
    {
-      return manageChannelThreadPools;
+      return manageNewThreadClassLoader;
    }
 
    /**
-    * Sets whether this factory should override the standard JGroups
-    * thread pools and thread factories to ensure application classloaders do
-    * not leak to channel threads. This should only be set to <code>false</code>
+    * Sets whether this factory should update the standard JGroups
+    * thread factories to ensure application classloaders do not leak to 
+    * newly created channel threads. This should only be set to <code>false</code>
     * if a JGroups release is used that itself prevents such classloader leaks.
     * 
-    * @param manageChannelThreadPools <code>true</code> if the pools and 
-    *                                 factories should be overridden.
+    * @param manage <code>true</code> if the factories should be updated.
     */
-   public void setManageChannelThreadPools(boolean manageChannelThreadPools)
+   public void setManageNewThreadClassLoader(boolean manage)
    {
-      this.manageChannelThreadPools = manageChannelThreadPools;
+      this.manageNewThreadClassLoader = manage;
    }
 
    /**
+    * Gets whether this factory should update the standard JGroups
+    * thread pools to ensure application classloaders have not leaked to 
+    * threads returned to the pool.
+    * 
+    * @return <code>true</code> if the pools should be updated.
+    *         Default is <code>false</code>.
+    */
+   public boolean getManageReleasedThreadClassLoader()
+   {
+      return manageReleasedThreadClassLoader;
+   }
+
+   /**
+    * Sets whether this factory should update the standard JGroups
+    * thread pools to ensure application classloaders have not leaked to 
+    * threads returned to the pool.
+    * <p>
+    * There is a small performance cost to enabling this, and applications
+    * can prevent any need to enable it by properly restoring the thread
+    * context classloader if they change it.  Therefore, by default this
+    * is set to <code>false</code>.
+    * </p>
+    * 
+    * @param manage <code>true</code> if the pools should be updated.
+    */
+   public void setManageReleasedThreadClassLoader(boolean manage)
+   {
+      this.manageReleasedThreadClassLoader = manage;
+   }
+
+   /**
     * Gets whether {@link #createMultiplexerChannel(String, String)} should 
     * create a synthetic singleton name attribute for a channel's transport
     * protocol if one isn't configured.  If this is <code>false</code> and
@@ -651,6 +678,20 @@
       if (state == ServiceMBean.DESTROYED)
          state = ServiceMBean.UNREGISTERED;
    }
+   
+   /**
+    * Gets the classloader that channel threads should be set to if
+    * {@link #getManageNewThreadClassLoader()} or {@link #getManageReleasedThreadClassLoader()}
+    * are <code>true</code>.
+    * <p>
+    * This implementation returns this class' classloader.
+    * 
+    * @return the classloader.
+    */
+   protected ClassLoader getDefaultChannelThreadContextClassLoader()
+   {
+      return getClass().getClassLoader();
+   }
 
    // ----------------------------------------------------------------- Private
 
@@ -673,7 +714,7 @@
          
          log.debug("Passing unique node id " + nodeName + " to the channel as additional data");
          
-         java.util.HashMap staticNodeName = new java.util.HashMap();
+         HashMap<String, byte[]> staticNodeName = new HashMap<String, byte[]>();
          staticNodeName.put("additional_data", this.nodeName.getBytes());
          channel.down(new Event(Event.CONFIG, staticNodeName));
          
@@ -745,25 +786,10 @@
       return hostIP + ":" + uid;
    }
    
-   private void fixChannelThreadPools(Channel channel) throws ChannelException
+   @SuppressWarnings("unchecked")
+   private Map<String, String> getTransportProperties(ProtocolStackConfigurator config)
    {
-      if (channel instanceof JChannel)
-      {
-         JChannel jchannel = (JChannel) channel;
-         String configStr = jchannel.getProperties();
-         ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
-         Map tpProps = getTransportProperties(config);
-         fixChannelThreadPools(jchannel, tpProps);
-      }
-      else
-      {
-         log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
-      }
-   }
-   
-   private Map getTransportProperties(ProtocolStackConfigurator config)
-   {
-      Map tpProps = null;
+      Map<String, String> tpProps = null;
       try
       {
          ProtocolData[] protocols=config.getProtocolStack();
@@ -777,7 +803,7 @@
          String configStr = config.getProtocolStackString();
          String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
          String[] params = tpConfigStr.split(";");
-         tpProps = new HashMap();
+         tpProps = new HashMap<String, String>();
          for (String param : params)
          {
             String[] keyVal = param.split("=");
@@ -790,6 +816,7 @@
       return tpProps;
    }
   
+   @SuppressWarnings("unchecked")
    private ProtocolStackConfigurator addSingletonName(ProtocolStackConfigurator orig, String singletonName)
       throws ChannelException
    {
@@ -798,7 +825,7 @@
       {
          ProtocolData[] protocols=orig.getProtocolStack();
          ProtocolData transport=protocols[0];
-         Map tpProps = transport.getParameters();
+         Map<String, String> tpProps = transport.getParameters();
          tpProps.put(Global.SINGLETON_NAME, singletonName);
          // we've now updated the state of orig; just return it
          result = orig;
@@ -822,139 +849,165 @@
       return result;
    }
    
-   private void fixChannelThreadPools(JChannel channel, Map props)
+   private void fixChannelThreadManagement(Channel channel) throws ChannelException
    {
-      if (!channel.isConnected())
+      if (!(channel instanceof JChannel))
       {
-         TransportThreadPoolProperties tpProps = new TransportThreadPoolProperties(props);
-         
-         ProtocolStack stack = channel.getProtocolStack();
-         List<Protocol> protocols = stack.getProtocols();
-         for (int i = protocols.size() - 1; i >= 0; i--)
+         log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
+         return;
+      }
+      
+      JChannel jchannel = (JChannel) channel;
+      
+      ProtocolStack stack = jchannel.getProtocolStack();
+      List<Protocol> protocols = stack.getProtocols();
+      TP tp = null;
+      for (int i = protocols.size() - 1; i >= 0; i--)
+      {
+         if (protocols.get(i) instanceof TP)
          {
-            if (protocols.get(i) instanceof TP)
-            {
-               TP tp = (TP) protocols.get(i);
-               fixChannelThreadPools(tp, stack, tpProps);
-               break;
-            }
+            tp = (TP) protocols.get(i);
+            break;
          }
       }
+      
+      ClassLoader defaultTCCL = getDefaultChannelThreadContextClassLoader();
+      ThreadDecoratorImpl threadDecorator = new ThreadDecoratorImpl(defaultTCCL);
+      if (manageNewThreadClassLoader)
+      {
+         fixProtocolThreadFactories(tp, threadDecorator);
+      }
+      
+      if (manageReleasedThreadClassLoader)
+      {
+         fixTransportThreadPools(tp, threadDecorator);
+      }
    }
-   
-   private void fixChannelThreadPools(TP tp, ProtocolStack stack, TransportThreadPoolProperties tpProps)
-   {      
-      ClassLoader tccl = classLoaderSwitcher.getContextClassLoader();
-      if (tccl == null)
+
+   private void fixProtocolThreadFactories(TP tp, ThreadDecoratorImpl threadDecorator)
+   {
+      ThreadFactory stackFactory = tp.getThreadFactory();
+      if (stackFactory == null)
       {
-         tccl = getClass().getClassLoader();
+         stackFactory = new DefaultThreadFactory(Util.getGlobalThreadGroup(), "", false);
+         tp.setThreadFactory(stackFactory);
       }
-      WeakReference<ClassLoader> tcclRef = new WeakReference(tccl);
+      fixThreadManager(stackFactory, threadDecorator, "TP.getThreadFactory()");
       
-      if (tpProps.thread_pool_enabled && tp.getDefaultThreadPool() == null) 
+      log.debug("Fixed thread factory for " + tp);
+      
+      ThreadFactory timerFactory = tp.getTimerThreadFactory();
+      if (timerFactory == null)
       {
-         BlockingQueue<Runnable> thread_pool_queue = null;
-         if (tpProps.thread_pool_queue_enabled)
+         timerFactory = new LazyThreadFactory(Util.getGlobalThreadGroup(), "Timer", true, true);
+         tp.setTimerThreadFactory(timerFactory);            
+      }
+      fixThreadManager(timerFactory, threadDecorator, "TP.getTimerThreadFactory()");
+      
+      log.debug("Fixed timer thread factory for " + tp);
+      
+      ThreadGroup pool_thread_group = null;
+      if (tp.isDefaulThreadPoolEnabled())
+      {
+         ThreadFactory defaultPoolFactory = tp.getDefaultThreadPoolThreadFactory();
+         if (defaultPoolFactory == null)
          {
-             thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.thread_pool_queue_max_size);
+            pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+            defaultPoolFactory = new DefaultThreadFactory(pool_thread_group, "Incoming", false, true);
+            tp.setThreadFactory(defaultPoolFactory);
          }
-         else
-         {
-             thread_pool_queue=new SynchronousQueue<Runnable>();
-         }
-         ExecutorService thread_pool = createThreadPool(tpProps.thread_pool_min_threads, 
-                                                        tpProps.thread_pool_max_threads, 
-                                                        tpProps.thread_pool_keep_alive_time,
-                                                        tpProps.thread_pool_rejection_policy, 
-                                                        thread_pool_queue, 
-                                                        "Incoming",
-                                                        tpProps.thread_naming_pattern,
-                                                        tcclRef);
-         tp.setDefaultThreadPool(thread_pool);
+         fixThreadManager(defaultPoolFactory, threadDecorator, "TP.getDefaultThreadPoolThreadFactory()");
          
-         log.debug("Fixed default thread pool for " + tp);
+         log.debug("Fixed default pool thread factory for " + tp);
       }
       
-      if (tpProps.oob_thread_pool_enabled && tp.getOOBThreadPool() == null) 
+      if (tp.isOOBThreadPoolEnabled())
       {
-         BlockingQueue<Runnable> oob_thread_pool_queue = null;
-         if(tpProps.oob_thread_pool_queue_enabled)
+         ThreadFactory oobPoolFactory = tp.getOOBThreadPoolThreadFactory();
+         if (oobPoolFactory == null)
          {
-             oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.oob_thread_pool_queue_max_size);
+            if (pool_thread_group == null)
+               pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+            oobPoolFactory = new DefaultThreadFactory(pool_thread_group, "Incoming", false, true);
+            tp.setThreadFactory(oobPoolFactory);
          }
-         else
+         fixThreadManager(oobPoolFactory, threadDecorator, "TP.getOOBThreadPoolThreadFactory()");
+         
+         log.debug("Fixed oob pool thread factory for " + tp);
+      }
+      
+      Map<ThreadFactory, Protocol> factories= new HashMap<ThreadFactory, Protocol>();
+      Protocol tmp=tp.getUpProtocol();
+      while(tmp != null) {
+        ThreadFactory f=tmp.getThreadFactory();
+         if(f != null && !factories.containsKey(f))
          {
-             oob_thread_pool_queue=new SynchronousQueue<Runnable>();
+            factories.put(f, tmp);
          }
-         ExecutorService oob_thread_pool = createThreadPool(tpProps.oob_thread_pool_min_threads, 
-                                                            tpProps.oob_thread_pool_max_threads, 
-                                                            tpProps.oob_thread_pool_keep_alive_time,
-                                                            tpProps.oob_thread_pool_rejection_policy, 
-                                                            oob_thread_pool_queue, 
-                                                            "OOB",
-                                                            tpProps.thread_naming_pattern,
-                                                            tcclRef);
-         
-         tp.setOOBThreadPool(oob_thread_pool);
-         
-         log.debug("Fixed OOB thread pool for " + tp);
+         tmp=tmp.getUpProtocol();
       }
       
-      ThreadFactory stackFactoryBase = ProtocolStack.newThreadFactory(Util.getGlobalThreadGroup(),"",false);
-      ThreadFactory stackFactory = new ClassLoaderControlThreadFactory(stackFactoryBase, tcclRef);
-      stack.setThreadFactory(stackFactory);
+      for (Map.Entry<ThreadFactory, Protocol> entry : factories.entrySet())
+      {
+         fixThreadManager(entry.getKey(), threadDecorator, entry.getValue().getClass().getSimpleName() + ".getThreadFactory()");
+      }
       
-      log.debug("Fixed thread factory for " + stack);
-      
-      ThreadFactory timerFactoryBase = ProtocolStack.newThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(), "Timers"), "Timer", true);
-      ThreadFactory timerFactory = new ClassLoaderControlThreadFactory(timerFactoryBase, tcclRef);
-      stack.setTimerThreadFactory(timerFactory);
-      
-      log.debug("Fixed timer thread factory for " + stack);
+      log.debug("Fixed Protocol thread factories");
    }
 
-   private ExecutorService createThreadPool(int min_threads, int max_threads, 
-                                            long keep_alive_time, String rejection_policy,
-                                            BlockingQueue<Runnable> queue, final String thread_name,
-                                            ThreadNamingPattern thread_naming_pattern,
-                                            WeakReference<ClassLoader> tcclRef) 
+   private void fixTransportThreadPools(TP tp, ThreadDecoratorImpl threadDecorator)
    {
-       ThreadPoolExecutor pool=new RestoreTCCLThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue, tcclRef);
-       
-       ThreadGroup pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
-       // FIXME: replace with call to ProtocolStack.newIDThreadFactory(...) 
-       // when we integrate JGroups 2.6.3 or later       
-       ThreadFactory base = new IdThreadFactory(new DefaultThreadFactory(pool_thread_group, thread_name, false), 
-                                                thread_naming_pattern);
-       ThreadFactory threadFactory = new ClassLoaderControlThreadFactory(base, tcclRef);
-       pool.setThreadFactory(threadFactory);
-
-       if(rejection_policy != null) {
-           if(rejection_policy.equals("abort"))
-               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
-           else if(rejection_policy.equals("discard"))
-               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
-           else if(rejection_policy.equals("discardoldest"))
-               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
-           else
-               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-       }
-
-       return pool;
-   }  
+      Executor threadPool = tp.getDefaultThreadPool();
+      if (tp.isDefaulThreadPoolEnabled())
+      {
+         fixThreadManager(threadPool, threadDecorator, "TP.getDefaultThreadPool()");
+         
+         log.debug("Fixed default thread pool for " + tp);
+      }
+      
+      threadPool = tp.getOOBThreadPool();
+      if (tp.isOOBThreadPoolEnabled())
+      {
+         fixThreadManager(threadPool, threadDecorator, "TP.getOOBThreadPool()"); 
+         
+         log.debug("Fixed OOB thread pool for " + tp);
+      }
+   }
    
+   private void fixThreadManager(Object manager, ThreadDecoratorImpl decorator, String managerSource)
+   {
+      if (manager instanceof ThreadManager)
+      {
+         ThreadManager threadManager = (ThreadManager) manager;
+         
+         ThreadDecorator existing = threadManager.getThreadDecorator();
+         if (existing instanceof ThreadDecoratorImpl)
+         {
+            // already been handled
+            return;
+         }
+         else if (existing != null)
+         {
+            // someone else has added one; integrate with it
+            decorator.setParent(existing);
+         }
+         threadManager.setThreadDecorator(decorator);
+      }
+      else
+      {
+         log.warn(managerSource + " is not a ThreadManager");
+      }
+   }
+   
    /** 
     * Sets the context class loader on <code>thread</code> to the classloader
     * in effect when this factory was instantiated.
     * 
     * @param thread the thread to set
     */
-   private void setDefaultThreadContextClassLoader(Thread thread, WeakReference<ClassLoader> classLoaderRef)
+   private void setDefaultThreadContextClassLoader(Thread thread, ClassLoader classLoader)
    {
-      ClassLoader cl = classLoaderRef.get();
-      if (cl == null)
-         cl = getClass().getClassLoader();
-      classLoaderSwitcher.setContextClassLoader(thread, cl);
+      classLoaderSwitcher.setContextClassLoader(thread, classLoader);
    }
    
    private void registerChannel(JChannel ch, String channelId) throws Exception 
@@ -993,308 +1046,43 @@
          }
       }
    }
-
-   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
-   static class DefaultThreadFactory implements ThreadFactory
-   {       
-       private final ThreadGroup group;
-       private final String baseName;
-       private final boolean createDaemons;   
-       
-       public DefaultThreadFactory(ThreadGroup group,
-                                   String baseName, 
-                                   boolean createDaemons)
-       {
-           this.group = group;
-           this.baseName = baseName;
-           this.createDaemons = createDaemons;               
-       }
-             
-       public Thread newThread(Runnable r, String name) 
-       {
-           return newThread(group, r, name);
-       }      
-
-       public Thread newThread(Runnable r) 
-       {
-           return newThread(group, r, baseName);                      
-       }
-       
-       public Thread newThread(ThreadGroup group, Runnable r, String name) 
-       {
-           Thread thread = new Thread(group, r, name);
-           thread.setDaemon(createDaemons);
-           return thread;
-       }
-   } 
    
-   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
-   protected static class PatternedThreadFactory implements ThreadFactory 
+   private class ThreadDecoratorImpl implements ThreadDecorator
    {
-       protected final ThreadFactory f;
-       protected ThreadNamingPattern pattern;
-
-       public PatternedThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
-       {
-           f = factory;
-           this.pattern = pattern;           
-       }
-       
-       public void setThreadNamingPattern(ThreadNamingPattern pattern) 
-       {
-           this.pattern = pattern;           
-       }
-
-       public Thread newThread(Runnable r, String name) 
-       {
-           Thread newThread = f.newThread(r, name);
-           renameThread(newThread);
-           return newThread;
-       }
-
-       public Thread newThread(Runnable r) 
-       {
-           Thread newThread = f.newThread(r);
-           renameThread(newThread);
-           return newThread;
-       }        
-       
-       public Thread newThread(ThreadGroup group, Runnable r, String name) 
-       {
-           Thread newThread = f.newThread(group, r, name);
-           renameThread(newThread);
-           return newThread;
-       }
-
-       protected void renameThread(Thread new_thread) 
-       {
-           if(pattern!=null)
-               pattern.renameThread(new_thread);
-       }
-   }
-               
-   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
-   private static class IdThreadFactory extends PatternedThreadFactory 
-   {
-       short current_id=0;
-
-       public IdThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern) 
-       {
-           super(factory, pattern);
-       }              
-
-       protected void renameThread(Thread new_thread) 
-       {
-           if(pattern != null) 
-           {
-               short id;
-               synchronized(this) 
-               {
-                   id=++current_id;
-               }
-               pattern.renameThread(new_thread.getName() + "-" + id, new_thread);
-           }
-       }
-   }
-   
-   private class ClassLoaderControlThreadFactory implements ThreadFactory
-   {
-      private final ThreadFactory f;
-      private final WeakReference<ClassLoader> classLoaderRef;
-
-      public ClassLoaderControlThreadFactory(ThreadFactory factory, WeakReference<ClassLoader> classLoaderRef)
+      private final ClassLoader classloader;
+      private ThreadDecorator parent;
+      
+      private ThreadDecoratorImpl(ClassLoader classloader)
       {
-          this.f = factory;
-          this.classLoaderRef = classLoaderRef;
+         this.classloader = classloader;
       }
 
-
-      public Thread newThread(Runnable r, String name) 
+      public void threadCreated(Thread thread)
       {
-          Thread newThread = f.newThread(r, name);
-          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
-          return newThread;
+         if (parent != null)
+            parent.threadCreated(thread);
+         setDefaultThreadContextClassLoader(thread, classloader);
       }
 
-      public Thread newThread(Runnable r) 
+      public void threadReleased(Thread thread)
       {
-          Thread newThread = f.newThread(r);
-          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
-          return newThread;
-      }        
-      
-      public Thread newThread(ThreadGroup group, Runnable r, String name) 
-      {
-          Thread newThread = f.newThread(group, r, name);
-          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
-          return newThread;
+         if (parent != null)
+            parent.threadCreated(thread);
+         setDefaultThreadContextClassLoader(thread, classloader);
       }
-   }
-   
-   private class RestoreTCCLThreadPoolExecutor extends ThreadPoolExecutor
-   {      
-      private final WeakReference<ClassLoader> classLoaderRef;
-      
-      public RestoreTCCLThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
-                                           long keepAliveTime, TimeUnit unit,
-                                           BlockingQueue<Runnable> workQueue,
-                                           WeakReference<ClassLoader> classLoaderRef)
+
+      public ThreadDecorator getParent()
       {
-         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
-         this.classLoaderRef = classLoaderRef;
+         return parent;
       }
 
-      @Override
-      protected void afterExecute(Runnable r, Throwable t)
+      public void setParent(ThreadDecorator parent)
       {
-         try
-         {
-            super.afterExecute(r, t);
-         }
-         finally
-         {
-            setDefaultThreadContextClassLoader(Thread.currentThread(), classLoaderRef);
-         }
-      }      
-   }
-   
-   // Yikes! We recreate all the TP.setProperties stuff here. :(
-   private class TransportThreadPoolProperties
-   {
-      private ThreadNamingPattern thread_naming_pattern = new ThreadNamingPattern("cl");
-      private boolean oob_thread_pool_enabled=true;
-      private int oob_thread_pool_min_threads=2;
-      private int oob_thread_pool_max_threads=10;
-      private long oob_thread_pool_keep_alive_time=30000;
-      private boolean oob_thread_pool_queue_enabled=true;
-      private int oob_thread_pool_queue_max_size=500;
-      private String oob_thread_pool_rejection_policy="Run";
-      
-      private boolean thread_pool_enabled=true;
-      private int thread_pool_min_threads=2;
-      private int thread_pool_max_threads=10;
-      private long thread_pool_keep_alive_time=30000;
-      private boolean thread_pool_queue_enabled=true;
-      private int thread_pool_queue_max_size=500;
-      private String thread_pool_rejection_policy="Run";
-      
-      TransportThreadPoolProperties(Map props)
-      {
-         String str = (String) props.get("thread_naming_pattern");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_naming_pattern=new ThreadNamingPattern(str);
-         }
-         
-         // ================= OOB thread pool =============================
-         
-         str = (String) props.get("oob_thread_pool.enabled");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_enabled=Boolean.valueOf(str).booleanValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.min_threads");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_min_threads=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.max_threads");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_max_threads=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.keep_alive_time");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_keep_alive_time=Long.valueOf(str).longValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.queue_enabled");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.queue_max_size");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_queue_max_size=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("oob_thread_pool.rejection_policy");
-         if(str != null) 
-         {
-            str = str.toLowerCase().trim();
-            str = StringPropertyReplacer.replaceProperties(str);
-            oob_thread_pool_rejection_policy = str;
-         }
-         
-         // ================= Regular thread pool =============================
-         
-         str = (String) props.get("thread_pool.enabled");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_enabled=Boolean.valueOf(str).booleanValue();
-         }
-
-         str = (String) props.get("thread_pool.min_threads");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_min_threads=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("thread_pool.max_threads");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_max_threads=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("thread_pool.keep_alive_time");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_keep_alive_time=Long.valueOf(str).longValue();
-         }
-
-         str = (String) props.get("thread_pool.queue_enabled");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
-         }
-
-         str = (String) props.get("thread_pool.queue_max_size");
-         if(str != null) 
-         {
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_queue_max_size=Integer.valueOf(str).intValue();
-         }
-
-         str = (String) props.get("thread_pool.rejection_policy");
-         if(str != null) 
-         {
-            str=str.toLowerCase().trim();
-            str = StringPropertyReplacer.replaceProperties(str);
-            thread_pool_rejection_policy=str;
-         }
+         this.parent = parent;
       }
+      
    }
 
-
-
    private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
    {
       private final String channelId;

Modified: trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
===================================================================
--- trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml	2008-06-03 20:15:02 UTC (rev 74015)
@@ -17,7 +17,8 @@
         <property name="exposeChannels">true</property>
         <property name="exposeProtocols">true</property>
         
-        <property name="manageChannelThreadPools">true</property>
+        <property name="manageNewThreadClassLoader">true</property>        
+        <property name="manageReleasedThreadClassLoader">false</property>
 
         <!-- Whether or not to assign a unique name to nodes -->        
         <property name="assignLogicalAddresses">true</property> 

Modified: trunk/component-matrix/pom.xml
===================================================================
--- trunk/component-matrix/pom.xml	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/component-matrix/pom.xml	2008-06-03 20:15:02 UTC (rev 74015)
@@ -825,7 +825,7 @@
       <dependency>
         <groupId>jgroups</groupId>
         <artifactId>jgroups</artifactId>
-        <version>2.6.2</version>
+        <version>2.6.3.CR1</version>
       </dependency>
       
       <dependency>

Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java	2008-06-03 20:15:02 UTC (rev 74015)
@@ -75,6 +75,7 @@
    private Channel channel1;
    private Channel channel2;
    private String jgroups_bind_addr;
+   private ClassLoader testLoader;
    
    /**
     * Create a new JChannelFactoryUnitTestCase.
@@ -93,22 +94,26 @@
       
       super.setUp();
       
+      testLoader = new ClassLoader(Thread.currentThread().getContextClassLoader()){};
+      
       String jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
       if (jgroups_bind_addr == null)
       {
          System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
       }
       
-      factory1 = new JChannelFactory();
+      factory1 = new TestClassLoaderJChannelFactory();
       factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
       factory1.setAssignLogicalAddresses(false);
       factory1.setExposeChannels(false);
+      factory1.setManageReleasedThreadClassLoader(true);
       factory1.create();
       factory1.start();
-      factory2 = new JChannelFactory();
+      factory2 = new TestClassLoaderJChannelFactory();
       factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
       factory2.setAssignLogicalAddresses(false);
       factory2.setExposeChannels(false);
+      factory2.setManageReleasedThreadClassLoader(true);
       factory2.create();
       factory2.start();
    }
@@ -117,6 +122,8 @@
    {
       super.tearDown();
       
+      testLoader = null;
+      
       if (jgroups_bind_addr == null)
          System.clearProperty("jgroups.bind_addr");
       
@@ -215,14 +222,13 @@
       ClassLoader ours = Thread.currentThread().getContextClassLoader();
       
       // The classloader we want channel threads to use
-      ClassLoader cl = new ClassLoader(ours){};
-      ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(cl, semaphore, runnerGroup);
+      ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(testLoader, semaphore, runnerGroup);
       
       MessageDispatcher[] dispatchers = new MessageDispatcher[2];
       
-      Thread.currentThread().setContextClassLoader(cl);    
-      try
-      {
+//      Thread.currentThread().setContextClassLoader(testLoader);    
+//      try
+//      {
          channel1 = (Channel) factoryMeth.invoke(factory1, factory1Args);
          dispatchers[0] = new MessageDispatcher(channel1, handler, handler, handler);
          
@@ -234,11 +240,11 @@
          
          channel2.connect("leaktest");
          assertEquals("No classloader leak on channel2 connect", null, handler.getLeakedClassLoader());
-      }
-      finally
-      {
-         Thread.currentThread().setContextClassLoader(ours);
-      }
+//      }
+//      finally
+//      {
+//         Thread.currentThread().setContextClassLoader(ours);
+//      }
       
       log.info("Channels connected");
       
@@ -369,6 +375,7 @@
       private final Semaphore semaphore;
       private final ClassLoader expected;
       private final ThreadGroup runnerGroup;
+      private final Thread main;
       private ClassLoader leakedClassLoader;
       private final int numPermits;
       
@@ -378,6 +385,7 @@
          this.semaphore = semaphore;
          this.runnerGroup = runnerGroup;
          this.numPermits = this.semaphore.availablePermits();
+         this.main = Thread.currentThread();
       }
 
       public Object handle(Message msg)
@@ -429,7 +437,8 @@
          if (leakedClassLoader == null) // ignore msgs once we found a leak
          {
             // ignore runner threads that loop all the way back up 
-            if (Thread.currentThread().getThreadGroup().equals(runnerGroup))
+            Thread current = Thread.currentThread();
+            if (current == main || current.getThreadGroup().equals(runnerGroup))
             {               
                return; 
             }
@@ -445,4 +454,16 @@
       }
       
    }
+   
+   private class TestClassLoaderJChannelFactory
+   extends JChannelFactory
+   {
+
+      @Override
+      protected ClassLoader getDefaultChannelThreadContextClassLoader()
+      {
+         return testLoader;
+      }
+      
+   }
 }

Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java	2008-06-03 19:40:25 UTC (rev 74014)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java	2008-06-03 20:15:02 UTC (rev 74015)
@@ -85,6 +85,7 @@
       factory1.setNodeAddress(InetAddress.getByName("localhost"));
       factory1.setNamingServicePort(123);
       factory1.setExposeChannels(false);
+      factory1.setManageReleasedThreadClassLoader(true);
       factory1.create();
       factory1.start();
       factory2 = new JChannelFactory();
@@ -94,6 +95,7 @@
       factory2.setNamingServicePort(456);
       factory2.setNodeName("node1");
       factory2.setExposeChannels(false);
+      factory2.setManageReleasedThreadClassLoader(true);
       factory2.create();
       factory2.start();
    }
@@ -193,17 +195,9 @@
       
       Executor exec = tp1.getDefaultThreadPool();
       assertNotNull(exec);
-      if (exec instanceof ThreadPoolExecutor)
-      {
-         assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.DiscardPolicy);
-      }
       
       exec = tp1.getOOBThreadPool();
       assertNotNull(exec);
-      if (exec instanceof ThreadPoolExecutor)
-      {
-         assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy);
-      }
       
       // Confirm that the no-pool config doesn't create a pool
       channel2 = factory1.createChannel("nonconcurrent1");




More information about the jboss-cvs-commits mailing list