[jboss-cvs] JBossAS SVN: r72317 - in trunk: cluster/src/resources/jgroups and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 16 16:48:54 EDT 2008


Author: bstansberry at jboss.com
Date: 2008-04-16 16:48:54 -0400 (Wed, 16 Apr 2008)
New Revision: 72317

Added:
   trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
   trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
   trunk/testsuite/src/resources/cluster/channelfactory/
   trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml
Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
   trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
   trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
   trunk/testsuite/local.properties
Log:
[JBAS-5414] Ensure app classloaders don't leak to JGroups threads
[JBAS-5442] AS ChannelFactory never returns MuxChannel
[JBAS-5451] Fix JMX registration handling by ChannelFactory


Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server
___________________________________________________________________
Name: svn:ignore
   + DelegatingChannel.java


Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-04-16 20:48:54 UTC (rev 72317)
@@ -267,20 +267,23 @@
    {
       logHistory ("Starting partition");
       
-      log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
-            " using stack " + getMultiplexerStack());
-
-      channel = createMuxChannel(config);
+      if (channel == null || !channel.isOpen())
+      {
+         log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
+               " using stack " + getMultiplexerStack());
+   
+         channel = createMuxChannel(config);
+         
+         channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+         channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+      }
       
-      channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
-      channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
-      
       log.info("Initializing partition " + getPartitionName());
       logHistory ("Initializing partition " + getPartitionName());
       
       dispatcher = new RpcHandler(channel, null, null, new Object(), config.getDeadlockDetection());
       
-      // Subscribe to events generated by the org.jgroups. protocol stack
+      // Subscribe to events generated by the channel
       log.debug("setMembershipListener");
       dispatcher.setMembershipListener(this);
       log.debug("setMessageListener");

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-04-16 20:48:54 UTC (rev 72317)
@@ -22,18 +22,49 @@
 
 package org.jboss.ha.framework.server;
 
+import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.rmi.dgc.VMID;
 import java.rmi.server.UID;
+import java.security.AccessController;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanRegistration;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.jboss.logging.Logger;
 import org.jboss.system.ServiceMBean;
 import org.jboss.system.server.ServerConfigUtil;
+import org.jboss.util.StringPropertyReplacer;
+import org.jboss.util.loading.ContextClassLoaderSwitcher;
 import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListenerAdapter;
 import org.jgroups.Event;
 import org.jgroups.Global;
-import org.jgroups.mux.MuxChannel;
+import org.jgroups.JChannel;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolData;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.jmx.JmxConfigurator;
+import org.jgroups.protocols.TP;
 import org.jgroups.stack.IpAddress;
+import org.jgroups.stack.Protocol;
+import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.ThreadNamingPattern;
+import org.jgroups.util.Util;
 
 /**
  * Extension to the JGroups JChannelFactory that supports the addition
@@ -44,10 +75,12 @@
  * @version $Revision: 69425 $
  */
 public class JChannelFactory extends org.jgroups.JChannelFactory
-      implements JChannelFactoryMBean
+      implements JChannelFactoryMBean, MBeanRegistration
 {
    protected static Logger log = Logger.getLogger(JChannelFactory.class);
    
+   private static final String UNSHARED_TRANSPORT_NAME_BASE = "unnamed_";
+   
    private static final int CREATED = ServiceMBean.CREATED;
    private static final int STARTING = ServiceMBean.STARTING;
    private static final int STARTED = ServiceMBean.STARTED;
@@ -59,9 +92,49 @@
    private InetAddress nodeAddress;
    private String nodeName;
    private int namingServicePort = -1;
-   private int state;
+   private int state = ServiceMBean.UNREGISTERED;
    private boolean assignLogicalAddresses = true;
+   private boolean manageChannelThreadPools = true;
+   private boolean addMissingSingletonName = true;
+   private boolean domainSet;
+   private final ContextClassLoaderSwitcher classLoaderSwitcher;
+   private final Set<String> registeredChannels = new HashSet<String>();
 
+   public JChannelFactory()
+   {
+      this.classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
+   }
+   
+
+   @Override
+   public Channel createChannel() throws ChannelException
+   {
+      // Disabled alternative: use type 'Object' so we call createChannel(Object) not createChannel(String)
+//      Object stack = JChannel.DEFAULT_PROTOCOL_STACK;
+//      return createChannel(stack);
+      throw new UnsupportedOperationException("No-arg createChannel() is not supported");
+   }
+
+   @Override
+   public Channel createChannel(Object properties) throws ChannelException
+   {
+      Channel channel = super.createChannel(properties);
+      
+      if (manageChannelThreadPools)
+      {
+         fixChannelThreadPools(channel);
+      }
+      
+      if (assignLogicalAddresses)
+      {
+         setChannelUniqueId(channel);
+      }
+      
+      // can't register in JMX as we don't have a channel name
+      
+      return channel;
+   }
+
    /**
     * {@inheritDoc}
     * 
@@ -74,22 +147,30 @@
     * the first time a channel is created.
     */
    @Override
-   public Channel createMultiplexerChannel(String stack_name, String id, boolean register_for_state_transfer, String substate_id) throws Exception
+   public Channel createChannel(String stack_name) throws Exception
    {
-      // Due to problems with coordinated state transfer, we suppress register_for_state_transfer usage.
-      // TODO revert to normal if coordinated state transfer is fixed
-//      Channel channel = super.createMultiplexerChannel(stack_name, id, register_for_state_transfer, substate_id);
-      Channel channel = createSharedTransportChannel(stack_name);
+      String props=stack_name != null? getConfig(stack_name) : null;
+      if (props == null)
+      {
+         log.warn("No protocol stack found with name " + stack_name +
+                  "; creating default channel");
+         return createChannel();
+      }
       
-      if (channel == null)      
+      JChannel channel = new JChannel(props);
+      
+      if (manageChannelThreadPools)
       {
-         log.debug("Config for " + stack_name + " does not include singleton_name; creating MuxChannel. Config is " + getConfig(stack_name));
-         channel = super.createMultiplexerChannel(stack_name, id, false, null);
+         fixChannelThreadPools(channel);
       }
       
       if (assignLogicalAddresses)
+      {
          setChannelUniqueId(channel);
+      }
       
+      // can't register in JMX as we don't have a channel name
+      
       return channel;
    }
    
@@ -107,9 +188,80 @@
    @Override
    public Channel createMultiplexerChannel(String stack_name, String id) throws Exception
    {
-      return createMultiplexerChannel(stack_name, id, false, null);
+      String configStr = getConfig(stack_name);
+      
+      if (configStr == null)
+         throw new IllegalStateException("Unknown stack_name " + stack_name);
+      
+      ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
+      Map tpProps = getTransportProperties(config);
+      
+      if (!tpProps.containsKey(Global.SINGLETON_NAME))
+      {
+         if (addMissingSingletonName)
+         {
+            String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
+            
+            log.warn("Config for " + stack_name + " does not include " +
+                      "singleton_name; adding a name of " + singletonName +
+                      ". You should configure a singleton_name for this stack.");
+            
+            config = addSingletonName(config, singletonName);
+            log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
+            tpProps = getTransportProperties(config);                       
+         }
+         else
+         {
+            throw new IllegalStateException("Config for " + stack_name + " does not include " +
+                      "singleton_name and MuxChannels are not supported.");
+         }
+      }
+      
+      JChannel channel = new JChannel(config);
+      
+      if (manageChannelThreadPools)
+      {
+         fixChannelThreadPools(channel, tpProps);
+      }
+      
+      if (assignLogicalAddresses)
+      {
+         setChannelUniqueId(channel);
+      }
+      
+      if (isExposeChannels() && id != null && id.length() > 0)
+      {
+         registerChannel(channel, id);
+      }
+      
+      return channel;
+   }  
+
+   
+   /**
+    * {@inheritDoc}
+    * 
+    * Overrides the superclass version by generating a unique node id
+    * and passing it down the Channel as additional_data and by suppressing the 
+    * <code>register_for_state_transfer</code> and <code>substate_id</code> params.
+    * <p/>
+    * The additional data is the value of {@link #getNodeName()} which either
+    * can be injected as part of configuration or will be calculated 
+    * the first time a channel is created.
+    */
+   @Override
+   public Channel createMultiplexerChannel(String stack_name, String id, boolean register_for_state_transfer, String substate_id) throws Exception
+   {
+      return createMultiplexerChannel(stack_name, id);
    }
-
+   
+   @Override
+   public void setDomain(String domain)
+   {
+      super.setDomain(domain);
+      domainSet = true;
+   }
+   
    public InetAddress getNodeAddress()
    {
       return nodeAddress;
@@ -175,6 +327,26 @@
       this.assignLogicalAddresses = logicalAddresses;
    }
 
+   public boolean getManageChannelThreadPools()
+   {
+      return manageChannelThreadPools;
+   }
+
+   public void setManageChannelThreadPools(boolean manageChannelThreadPools)
+   {
+      this.manageChannelThreadPools = manageChannelThreadPools;
+   }
+
+   public boolean getAddMissingSingletonName()
+   {
+      return addMissingSingletonName;
+   }
+
+   public void setAddMissingSingletonName(boolean addMissingSingletonName)
+   {
+      this.addMissingSingletonName = addMissingSingletonName;
+   }
+
    @Override
    public void create() throws Exception
    {
@@ -200,7 +372,6 @@
       }
       
       log.debug("Created JChannelFactory");
-      super.create();
    }
 
    @Override
@@ -283,7 +454,13 @@
       
       try
       {
-         super.destroy();
+         // DON'T call super.destroy() as that may deregister the JMX proxy
+         // to this pojo service, leading to ugliness when the proxy is destroyed
+         
+         for (String channelId : registeredChannels)
+         {
+            unregister(channelId);
+         }
       }
       catch (Throwable t)
       {
@@ -292,46 +469,46 @@
       state = DESTROYED;
       log.debug("Destroyed JChannelFactory");
    }
-   
-   /**
-    * Checks whether the transport protocol configuration for <code>stack_name</code>
-    * supports the shared use of the protocol between channels, creating a
-    * returning a channel if it does.
-    * <p>
-    * Determines whether a shared transport is possible by looking for the
-    * {@link Global#SINGLETON_NAME} parameter in the configuration of the
-    * first protocol in the string returned by {@link #getConfig(String)}.
-    * </p> 
-    * 
-    * @param stack_name the name of the protocol stack
-    * @return a channel configured per <code>stack_name</code>, or <code>null</code>
-    *         if <code>stack_name</code> is not a register protocol stack or
-    *         its transport protocol configuration does not support sharing.
-    * 
-    * @throws Exception
-    */
-   public Channel createSharedTransportChannel(String stack_name) throws Exception
+
+   // ------------------------------------------------------- MBeanRegistration
+
+   public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception
    {
-      Channel ch = null;
-      String config = getConfig(stack_name);
-      if (config != null)
+      setServer(server);
+      if (!domainSet || getDomain() == null)
       {
-         // Format of string is 
-         // PROTOCOL1(param1=value1;param2=value2):PROTOCOL2(param1=value1;param2=value2):
-         // PROTOCOL1 is always the transport protocol.
-         // We look for the singleton_name param in the transport protocol
-         int tp_idx = config.indexOf("):");
-         int singleton_idx = config.indexOf(Global.SINGLETON_NAME);
-         if (singleton_idx > -1 && singleton_idx < tp_idx)
-         {
-            ch = createChannel(stack_name);
-         }
-      }      
-      return ch;
+         setDomain(name.getDomain());
+      }
+      return name;
    }
 
-   private void setChannelUniqueId(Channel channel) throws Exception
+
+   public void postRegister(Boolean registrationDone)
    {
+      if (registrationDone != null && registrationDone.booleanValue()
+            && state == ServiceMBean.UNREGISTERED)
+      {
+         state = ServiceMBean.REGISTERED;
+      }
+   }
+
+
+   public void preDeregister() throws Exception
+   { 
+   }
+
+   public void postDeregister()
+   { 
+      setServer(null);
+      if (state == ServiceMBean.DESTROYED)
+         state = ServiceMBean.UNREGISTERED;
+   }
+
+   // ----------------------------------------------------------------- Private
+
+
+   private void setChannelUniqueId(Channel channel)
+   {
       IpAddress address = (IpAddress) channel.getLocalAddress();
       if (address == null)
       {
@@ -349,14 +526,8 @@
       }
       else if (address.getAdditionalData() == null)
       {
-         Channel testee = channel;
-         if (channel instanceof MuxChannel)
+         if (channel.isConnected())
          {
-            testee = ((MuxChannel) channel).getChannel();
-         }
-         
-         if (testee.isConnected())
-         {
             throw new IllegalStateException("Underlying JChannel was " +
                     "connected before additional_data was set");
          }
@@ -374,7 +545,7 @@
       return ServiceMBean.states[state];
    }
 
-   private String generateUniqueNodeName () throws Exception
+   private String generateUniqueNodeName ()
    {
       // we first try to find a simple meaningful name:
       // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
@@ -420,4 +591,571 @@
       return hostIP + ":" + uid;
    }
    
+   private void fixChannelThreadPools(Channel channel) throws ChannelException
+   {
+      if (channel instanceof JChannel)
+      {
+         JChannel jchannel = (JChannel) channel;
+         String configStr = jchannel.getProperties();
+         ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
+         Map tpProps = getTransportProperties(config);
+         fixChannelThreadPools(jchannel, tpProps);
+      }
+      else
+      {
+         log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
+      }
+   }
+   
+   private Map getTransportProperties(ProtocolStackConfigurator config)
+   {
+      Map tpProps = null;
+      try
+      {
+         ProtocolData[] protocols=config.getProtocolStack();
+         ProtocolData transport=protocols[0];
+         tpProps = transport.getParameters();
+      }
+      catch (UnsupportedOperationException uoe)
+      {
+         // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
+         // So we do it ourselves
+         String configStr = config.getProtocolStackString();
+         String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
+         String[] params = tpConfigStr.split(";");
+         tpProps = new HashMap();
+         for (String param : params)
+         {
+            String[] keyVal = param.split("=");
+            if (keyVal.length != 2)
+               throw new IllegalStateException("Invalid parameter " + param + " in stack " + configStr);
+            tpProps.put(keyVal[0], keyVal[1]);
+         }
+      }
+      
+      return tpProps;
+   }
+  
+   private ProtocolStackConfigurator addSingletonName(ProtocolStackConfigurator orig, String singletonName)
+      throws ChannelException
+   {
+      ProtocolStackConfigurator result = null;
+      try
+      {
+         ProtocolData[] protocols=orig.getProtocolStack();
+         ProtocolData transport=protocols[0];
+         Map tpProps = transport.getParameters();
+         tpProps.put(Global.SINGLETON_NAME, singletonName);
+         // we've now updated the state of orig; just return it
+         result = orig;
+      }
+      catch (UnsupportedOperationException uoe)
+      {
+         // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
+         // So we do things manually via string manipulation         
+         String config = orig.getProtocolStackString();
+         int idx = config.indexOf('(') + 1;
+         StringBuilder builder = new StringBuilder(config.substring(0, idx));
+         builder.append(Global.SINGLETON_NAME);
+         builder.append('=');
+         builder.append(singletonName);
+         builder.append(';');
+         builder.append(config.substring(idx));
+         
+         result = ConfiguratorFactory.getStackConfigurator(builder.toString());
+      }
+      
+      return result;
+   }
+   
+   private void fixChannelThreadPools(JChannel channel, Map props)
+   {
+      if (!channel.isConnected())
+      {
+         TransportThreadPoolProperties tpProps = new TransportThreadPoolProperties(props);
+         
+         ProtocolStack stack = channel.getProtocolStack();
+         List<Protocol> protocols = stack.getProtocols();
+         for (int i = protocols.size() - 1; i >= 0; i--)
+         {
+            if (protocols.get(i) instanceof TP)
+            {
+               TP tp = (TP) protocols.get(i);
+               fixChannelThreadPools(tp, stack, tpProps);
+               break;
+            }
+         }
+      }
+   }
+   
+   private void fixChannelThreadPools(TP tp, ProtocolStack stack, TransportThreadPoolProperties tpProps)
+   {      
+      ClassLoader tccl = classLoaderSwitcher.getContextClassLoader();
+      if (tccl == null)
+      {
+         tccl = getClass().getClassLoader();
+      }
+      WeakReference<ClassLoader> tcclRef = new WeakReference(tccl);
+      
+      if (tpProps.thread_pool_enabled && tp.getDefaultThreadPool() == null) 
+      {
+         BlockingQueue<Runnable> thread_pool_queue = null;
+         if (tpProps.thread_pool_queue_enabled)
+         {
+             thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.thread_pool_queue_max_size);
+         }
+         else
+         {
+             thread_pool_queue=new SynchronousQueue<Runnable>();
+         }
+         ExecutorService thread_pool = createThreadPool(tpProps.thread_pool_min_threads, 
+                                                        tpProps.thread_pool_max_threads, 
+                                                        tpProps.thread_pool_keep_alive_time,
+                                                        tpProps.thread_pool_rejection_policy, 
+                                                        thread_pool_queue, 
+                                                        "Incoming",
+                                                        tpProps.thread_naming_pattern,
+                                                        tcclRef);
+         tp.setDefaultThreadPool(thread_pool);
+         
+         log.debug("Fixed default thread pool for " + tp);
+      }
+      
+      if (tpProps.oob_thread_pool_enabled && tp.getOOBThreadPool() == null) 
+      {
+         BlockingQueue<Runnable> oob_thread_pool_queue = null;
+         if(tpProps.oob_thread_pool_queue_enabled)
+         {
+             oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.oob_thread_pool_queue_max_size);
+         }
+         else
+         {
+             oob_thread_pool_queue=new SynchronousQueue<Runnable>();
+         }
+         ExecutorService oob_thread_pool = createThreadPool(tpProps.oob_thread_pool_min_threads, 
+                                                            tpProps.oob_thread_pool_max_threads, 
+                                                            tpProps.oob_thread_pool_keep_alive_time,
+                                                            tpProps.oob_thread_pool_rejection_policy, 
+                                                            oob_thread_pool_queue, 
+                                                            "OOB",
+                                                            tpProps.thread_naming_pattern,
+                                                            tcclRef);
+         
+         tp.setOOBThreadPool(oob_thread_pool);
+         
+         log.debug("Fixed OOB thread pool for " + tp);
+      }
+      
+      ThreadFactory stackFactoryBase = ProtocolStack.newThreadFactory(Util.getGlobalThreadGroup(),"",false);
+      ThreadFactory stackFactory = new ClassLoaderControlThreadFactory(stackFactoryBase, tcclRef);
+      stack.setThreadFactory(stackFactory);
+      
+      log.debug("Fixed thread factory for " + stack);
+      
+      ThreadFactory timerFactoryBase = ProtocolStack.newThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(), "Timers"), "Timer", true);
+      ThreadFactory timerFactory = new ClassLoaderControlThreadFactory(timerFactoryBase, tcclRef);
+      stack.setTimerThreadFactory(timerFactory);
+      
+      log.debug("Fixed timer thread factory for " + stack);
+   }
+
+   private ExecutorService createThreadPool(int min_threads, int max_threads, 
+                                            long keep_alive_time, String rejection_policy,
+                                            BlockingQueue<Runnable> queue, final String thread_name,
+                                            ThreadNamingPattern thread_naming_pattern,
+                                            WeakReference<ClassLoader> tcclRef) 
+   {
+       ThreadPoolExecutor pool=new RestoreTCCLThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue, tcclRef);
+       
+       ThreadGroup pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+       // FIXME: replace with call to ProtocolStack.newIDThreadFactory(...) 
+       // when we integrate JGroups 2.6.3 or later       
+       ThreadFactory base = new IdThreadFactory(new DefaultThreadFactory(pool_thread_group, thread_name, false), 
+                                                thread_naming_pattern);
+       ThreadFactory threadFactory = new ClassLoaderControlThreadFactory(base, tcclRef);
+       pool.setThreadFactory(threadFactory);
+
+       if(rejection_policy != null) {
+           if(rejection_policy.equals("abort"))
+               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+           else if(rejection_policy.equals("discard"))
+               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+           else if(rejection_policy.equals("discardoldest"))
+               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
+           else
+               pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+       }
+
+       return pool;
+   }  
+   
+   /** 
+    * Sets the context class loader on <code>thread</code> to the classloader
+    * held by <code>classLoaderRef</code>, or to this class's own loader if that reference yields <code>null</code>.
+    * 
+    * @param thread the thread to set
+    */
+   private void setDefaultThreadContextClassLoader(Thread thread, WeakReference<ClassLoader> classLoaderRef)
+   {
+      ClassLoader cl = classLoaderRef.get();
+      if (cl == null)
+         cl = getClass().getClassLoader();
+      classLoaderSwitcher.setContextClassLoader(thread, cl);
+   }
+   
+   private void registerChannel(JChannel ch, String channelId) throws Exception 
+   {
+      if(getServer() != null)
+      {
+         // Register for channel closed notification so we can unregister
+         JmxDeregistrationChannelListener listener = new JmxDeregistrationChannelListener(channelId);
+         ch.addChannelListener(listener);
+         JmxConfigurator.registerChannel(ch, getServer(), getDomain(), channelId, isExposeProtocols());
+         synchronized (registeredChannels)
+         {
+            registeredChannels.add(channelId);
+         }
+      }
+   }
+   
+   private void unregister(String channelId) 
+   {
+      if(getServer() != null)
+      {
+         String oname = getDomain() + ":type=channel,cluster=" + channelId;
+         try
+         {
+            getServer().unregisterMBean(new ObjectName(oname));
+            oname = getDomain() + ":type=protocol,cluster=" + channelId + ",*";
+            JmxConfigurator.unregister(getServer(), oname);
+            synchronized (registeredChannels)
+            {
+               registeredChannels.remove(channelId);
+            }
+         }
+         catch(Exception e)
+         {
+            log.error("failed unregistering " + oname, e);
+         }
+         
+         
+      }
+   }
+
+   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+   static class DefaultThreadFactory implements ThreadFactory
+   {       
+       private final ThreadGroup group;
+       private final String baseName;
+       private final boolean createDaemons;   
+       
+       public DefaultThreadFactory(ThreadGroup group,
+                                   String baseName, 
+                                   boolean createDaemons)
+       {
+           this.group = group;
+           this.baseName = baseName;
+           this.createDaemons = createDaemons;               
+       }
+             
+       public Thread newThread(Runnable r, String name) 
+       {
+           return newThread(group, r, name);
+       }      
+
+       public Thread newThread(Runnable r) 
+       {
+           return newThread(group, r, baseName);                      
+       }
+       
+       public Thread newThread(ThreadGroup group, Runnable r, String name) 
+       {
+           Thread thread = new Thread(group, r, name);
+           thread.setDaemon(createDaemons);
+           return thread;
+       }
+   } 
+   
+   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+   protected static class PatternedThreadFactory implements ThreadFactory 
+   {
+       protected final ThreadFactory f;
+       protected ThreadNamingPattern pattern;
+
+       public PatternedThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
+       {
+           f = factory;
+           this.pattern = pattern;           
+       }
+       
+       public void setThreadNamingPattern(ThreadNamingPattern pattern) 
+       {
+           this.pattern = pattern;           
+       }
+
+       public Thread newThread(Runnable r, String name) 
+       {
+           Thread newThread = f.newThread(r, name);
+           renameThread(newThread);
+           return newThread;
+       }
+
+       public Thread newThread(Runnable r) 
+       {
+           Thread newThread = f.newThread(r);
+           renameThread(newThread);
+           return newThread;
+       }        
+       
+       public Thread newThread(ThreadGroup group, Runnable r, String name) 
+       {
+           Thread newThread = f.newThread(group, r, name);
+           renameThread(newThread);
+           return newThread;
+       }
+
+       protected void renameThread(Thread new_thread) 
+       {
+           if(pattern!=null)
+               pattern.renameThread(new_thread);
+       }
+   }
+               
+   /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+   private static class IdThreadFactory extends PatternedThreadFactory 
+   {
+       short current_id=0;
+
+       public IdThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern) 
+       {
+           super(factory, pattern);
+       }              
+
+       protected void renameThread(Thread new_thread) 
+       {
+           if(pattern != null) 
+           {
+               short id;
+               synchronized(this) 
+               {
+                   id=++current_id;
+               }
+               pattern.renameThread(new_thread.getName() + "-" + id, new_thread);
+           }
+       }
+   }
+   
+   private class ClassLoaderControlThreadFactory implements ThreadFactory
+   {
+      private final ThreadFactory f;
+      private final WeakReference<ClassLoader> classLoaderRef;
+
+      public ClassLoaderControlThreadFactory(ThreadFactory factory, WeakReference<ClassLoader> classLoaderRef)
+      {
+          this.f = factory;
+          this.classLoaderRef = classLoaderRef;
+      }
+
+
+      public Thread newThread(Runnable r, String name) 
+      {
+          Thread newThread = f.newThread(r, name);
+          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+          return newThread;
+      }
+
+      public Thread newThread(Runnable r) 
+      {
+          Thread newThread = f.newThread(r);
+          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+          return newThread;
+      }        
+      
+      public Thread newThread(ThreadGroup group, Runnable r, String name) 
+      {
+          Thread newThread = f.newThread(group, r, name);
+          setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+          return newThread;
+      }
+   }
+   
+   /**
+    * ThreadPoolExecutor that, after every task, passes the worker thread and
+    * the stored classloader reference to setDefaultThreadContextClassLoader,
+    * whether or not the superclass hook completes normally.
+    */
+   private class RestoreTCCLThreadPoolExecutor extends ThreadPoolExecutor
+   {
+      private final WeakReference<ClassLoader> classLoaderRef;
+
+      /**
+       * Takes the same arguments as the five-argument ThreadPoolExecutor
+       * constructor, plus the classloader reference used after each task.
+       */
+      public RestoreTCCLThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+                                           long keepAliveTime, TimeUnit unit,
+                                           BlockingQueue<Runnable> workQueue,
+                                           WeakReference<ClassLoader> classLoaderRef)
+      {
+         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+         this.classLoaderRef = classLoaderRef;
+      }
+
+      @Override
+      protected void afterExecute(Runnable r, Throwable t)
+      {
+         // afterExecute runs on the thread that executed the task
+         final Thread worker = Thread.currentThread();
+         try
+         {
+            super.afterExecute(r, t);
+         }
+         finally
+         {
+            setDefaultThreadContextClassLoader(worker, classLoaderRef);
+         }
+      }
+   }
+   
+   /**
+    * Holder for the thread pool related settings of a transport protocol,
+    * parsed from the protocol's property map.
+    * <p>
+    * Yikes! We recreate all the TP.setProperties stuff here. :(
+    * <p>
+    * Every field starts at the default assigned in its declaration and is
+    * only overridden when the matching key is present in the map. Each value
+    * found is passed through StringPropertyReplacer.replaceProperties before
+    * it is parsed. Rejection policies are additionally trimmed and
+    * lower-cased <em>after</em> the replacement, so the normalisation applies
+    * to the resolved value rather than to the property expression.
+    */
+   private class TransportThreadPoolProperties
+   {
+      private ThreadNamingPattern thread_naming_pattern = new ThreadNamingPattern("cl");
+      private boolean oob_thread_pool_enabled=true;
+      private int oob_thread_pool_min_threads=2;
+      private int oob_thread_pool_max_threads=10;
+      private long oob_thread_pool_keep_alive_time=30000;
+      private boolean oob_thread_pool_queue_enabled=true;
+      private int oob_thread_pool_queue_max_size=500;
+      private String oob_thread_pool_rejection_policy="Run";
+
+      private boolean thread_pool_enabled=true;
+      private int thread_pool_min_threads=2;
+      private int thread_pool_max_threads=10;
+      private long thread_pool_keep_alive_time=30000;
+      private boolean thread_pool_queue_enabled=true;
+      private int thread_pool_queue_max_size=500;
+      private String thread_pool_rejection_policy="Run";
+
+      /**
+       * @param props protocol properties; values are expected to be Strings
+       *              (a non-String value results in a ClassCastException)
+       * @throws NumberFormatException if a numeric property cannot be parsed
+       */
+      TransportThreadPoolProperties(Map props)
+      {
+         String pattern = resolve(props, "thread_naming_pattern");
+         if (pattern != null)
+         {
+            thread_naming_pattern = new ThreadNamingPattern(pattern);
+         }
+
+         // ================= OOB thread pool =============================
+
+         oob_thread_pool_enabled = booleanProperty(props, "oob_thread_pool.enabled", oob_thread_pool_enabled);
+         oob_thread_pool_min_threads = intProperty(props, "oob_thread_pool.min_threads", oob_thread_pool_min_threads);
+         oob_thread_pool_max_threads = intProperty(props, "oob_thread_pool.max_threads", oob_thread_pool_max_threads);
+         oob_thread_pool_keep_alive_time = longProperty(props, "oob_thread_pool.keep_alive_time", oob_thread_pool_keep_alive_time);
+         oob_thread_pool_queue_enabled = booleanProperty(props, "oob_thread_pool.queue_enabled", oob_thread_pool_queue_enabled);
+         oob_thread_pool_queue_max_size = intProperty(props, "oob_thread_pool.queue_max_size", oob_thread_pool_queue_max_size);
+         oob_thread_pool_rejection_policy = policyProperty(props, "oob_thread_pool.rejection_policy", oob_thread_pool_rejection_policy);
+
+         // ================= Regular thread pool =============================
+
+         thread_pool_enabled = booleanProperty(props, "thread_pool.enabled", thread_pool_enabled);
+         thread_pool_min_threads = intProperty(props, "thread_pool.min_threads", thread_pool_min_threads);
+         thread_pool_max_threads = intProperty(props, "thread_pool.max_threads", thread_pool_max_threads);
+         thread_pool_keep_alive_time = longProperty(props, "thread_pool.keep_alive_time", thread_pool_keep_alive_time);
+         thread_pool_queue_enabled = booleanProperty(props, "thread_pool.queue_enabled", thread_pool_queue_enabled);
+         thread_pool_queue_max_size = intProperty(props, "thread_pool.queue_max_size", thread_pool_queue_max_size);
+         thread_pool_rejection_policy = policyProperty(props, "thread_pool.rejection_policy", thread_pool_rejection_policy);
+      }
+
+      /** Returns the value for key with property references replaced, or null if the key is absent. */
+      private String resolve(Map props, String key)
+      {
+         String raw = (String) props.get(key);
+         return raw == null ? null : StringPropertyReplacer.replaceProperties(raw);
+      }
+
+      /** Returns the boolean value configured for key, or dflt if the key is absent. */
+      private boolean booleanProperty(Map props, String key, boolean dflt)
+      {
+         String value = resolve(props, key);
+         return value == null ? dflt : Boolean.parseBoolean(value);
+      }
+
+      /** Returns the int value configured for key, or dflt if the key is absent. */
+      private int intProperty(Map props, String key, int dflt)
+      {
+         String value = resolve(props, key);
+         return value == null ? dflt : Integer.parseInt(value);
+      }
+
+      /** Returns the long value configured for key, or dflt if the key is absent. */
+      private long longProperty(Map props, String key, long dflt)
+      {
+         String value = resolve(props, key);
+         return value == null ? dflt : Long.parseLong(value);
+      }
+
+      /**
+       * Returns the rejection policy configured for key, trimmed and
+       * lower-cased, or dflt (unchanged) if the key is absent.
+       */
+      private String policyProperty(Map props, String key, String dflt)
+      {
+         String value = resolve(props, key);
+         if (value == null)
+         {
+            return dflt;
+         }
+         // Normalise only after substitution: lower-casing first would also
+         // lower-case the property name inside ${...} and leave the resolved
+         // value untouched. A fixed locale keeps the result independent of
+         // the JVM default locale.
+         return value.trim().toLowerCase(java.util.Locale.ENGLISH);
+      }
+   }
+
+
+
+   /**
+    * Channel listener that reacts to a channel being closed by calling
+    * unregister with the channel id it was created with.
+    */
+   private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
+   {
+      /** Id handed to unregister when the close notification arrives. */
+      private final String channelId;
+      
+      /**
+       * @param channelId the id to pass to unregister on channel close
+       */
+      JmxDeregistrationChannelListener(String channelId)
+      {
+         this.channelId = channelId;
+      }
+      
+      /**
+       * Calls unregister with the stored id; the channel argument itself
+       * is not used.
+       */
+      public void channelClosed(Channel channel) 
+      {
+         unregister(channelId);
+      }            
+   }
+   
 }

Modified: trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
===================================================================
--- trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml	2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml	2008-04-16 20:48:54 UTC (rev 72317)
@@ -15,7 +15,12 @@
         <property name="server"><inject bean="JMXKernel" property="mbeanServer"/></property>
         <property name="domain">jboss.jgroups</property>
         <property name="exposeChannels">true</property>
-        <property name="exposeProtocols">true</property>
+        <property name="exposeProtocols">true</property>
+        
+        <property name="manageChannelThreadPools">true</property>
+
+        <!-- Whether or not to assign a unique name to nodes -->        
+        <property name="assignLogicalAddresses">true</property> 
         
         <!-- The address used to determine the node name  -->
         <property name="nodeAddress">${jboss.bind.address}</property>

Modified: trunk/testsuite/local.properties
===================================================================
--- trunk/testsuite/local.properties	2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/testsuite/local.properties	2008-04-16 20:48:54 UTC (rev 72317)
@@ -49,12 +49,12 @@
 # (even at the same machine). Actually what we needed are just node0 and node1
 # ips and the rest are optional. 
 #
-#node0=${env.MYTESTIP_1}
+node0=${env.MYTESTIP_1}
 #node0.http.url=http://192.168.1.103:8080
 #node0.jndiurl=jnp://192.168.1.103:1099
 #node0.hajndi.url=jnp://192.168.1.103:1100
 
-#node1=${env.MYTESTIP_2}
+node1=${env.MYTESTIP_2}
 #node1.http.url=http://192.168.1.113:8080
 #node1.jndiurl=jnp://192.168.1.113:1099
 #node1.hajndi.url=jnp://192.168.1.113:1100

Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java	                        (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java	2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,448 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.test;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.ha.framework.server.JChannelFactory;
+import org.jboss.logging.Logger;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+
+/**
+ * Tests classloader leak handling of {@link JChannelFactory}.
+ * 
+ * @author Brian Stansberry
+ */
+public class JChannelFactoryClassLoaderLeakTestCase extends TestCase
+{
+   private static final Logger log = Logger.getLogger(JChannelFactoryClassLoaderLeakTestCase.class);
+   
+   private static Method OBJECT_ARG = null;
+   private static Method STRING_ARG = null;
+   private static Method SIMPLE_MUX = null;
+   private static Method COMPLEX_MUX = null;
+   
+   static
+   {
+      Class clazz = JChannelFactory.class;
+      try
+      {
+         OBJECT_ARG = clazz.getDeclaredMethod("createChannel", new Class[] { Object.class });
+         STRING_ARG = clazz.getDeclaredMethod("createChannel", new Class[] { String.class });
+         SIMPLE_MUX = clazz.getDeclaredMethod("createMultiplexerChannel", new Class[] { String.class, String.class });
+         COMPLEX_MUX = clazz.getDeclaredMethod("createMultiplexerChannel", new Class[] { String.class, String.class, boolean.class, String.class });
+      }
+      catch (NoSuchMethodException nsme)
+      {
+         log.error("Reflection failure", nsme);
+      }
+   }
+   
+   private JChannelFactory factory1;
+   private JChannelFactory factory2;
+   private Channel channel1;
+   private Channel channel2;
+   private String jgroups_bind_addr;
+   
+   /**
+    * Create a new JChannelFactoryClassLoaderLeakTestCase.
+    * 
+    * @param name the test name, passed on to the TestCase constructor
+    */
+   public JChannelFactoryClassLoaderLeakTestCase(String name)
+   {
+      super(name);
+   }
+
+   /**
+    * Fails fast if the reflective method lookup in the static initializer
+    * did not complete, records the initial value of the jgroups.bind_addr
+    * system property, and creates and starts the two channel factories
+    * from cluster/channelfactory/stacks.xml.
+    */
+   protected void setUp() throws Exception
+   {
+      if (COMPLEX_MUX == null)
+         throw new IllegalStateException("Reflection failed in class init; see logs");
+      
+      super.setUp();
+      
+      // Store the value in the field rather than in a shadowing local:
+      // tearDown() consults the field to decide whether it may clear
+      // jgroups.bind_addr, and with a local the field was always null, so
+      // a bind address supplied by the user was wiped after the first test.
+      jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
+      if (jgroups_bind_addr == null)
+      {
+         // NOTE(review): tearDown() clears "jgroups.bind_addr", which suggests
+         // that property was meant to be set here -- confirm before changing.
+         System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
+      }
+      
+      factory1 = new JChannelFactory();
+      factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+      factory1.setAssignLogicalAddresses(false);
+      factory1.setExposeChannels(false);
+      factory1.create();
+      factory1.start();
+      factory2 = new JChannelFactory();
+      factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+      factory2.setAssignLogicalAddresses(false);
+      factory2.setExposeChannels(false);
+      factory2.create();
+      factory2.start();
+   }
+
+   /**
+    * Clears jgroups.bind_addr if the field recording it is null, closes
+    * any channel that is still open and stops and destroys both factories.
+    */
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+
+      if (jgroups_bind_addr == null)
+      {
+         System.clearProperty("jgroups.bind_addr");
+      }
+
+      closeIfOpen(channel1);
+      closeIfOpen(channel2);
+
+      stopAndDestroy(factory1);
+      stopAndDestroy(factory2);
+   }
+
+   /** Closes the channel unless it is null or already closed. */
+   private void closeIfOpen(Channel channel) throws Exception
+   {
+      if (channel == null || !channel.isOpen())
+      {
+         return;
+      }
+      channel.close();
+   }
+
+   /** Stops and then destroys the factory unless it is null. */
+   private void stopAndDestroy(JChannelFactory factory) throws Exception
+   {
+      if (factory == null)
+      {
+         return;
+      }
+      factory.stop();
+      factory.destroy();
+   }
+   
+   /** Leak test using createChannel(Object) with the "shared1"/"shared2" stack configs. */
+   public void testClassLoaderLeakObjectShared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { factory1.getConfig("shared1") };
+      Object[] forFactory2 = new Object[] { factory2.getConfig("shared2") };
+      classloaderLeakTest(OBJECT_ARG, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createChannel(Object) with the "unshared1"/"unshared2" stack configs. */
+   public void testClassLoaderLeakObjectUnshared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { factory1.getConfig("unshared1") };
+      Object[] forFactory2 = new Object[] { factory2.getConfig("unshared2") };
+      classloaderLeakTest(OBJECT_ARG, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createChannel(String) with the stack names "shared1"/"shared2". */
+   public void testClassLoaderLeakStringShared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "shared1" };
+      Object[] forFactory2 = new Object[] { "shared2" };
+      classloaderLeakTest(STRING_ARG, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createChannel(String) with the stack names "unshared1"/"unshared2". */
+   public void testClassLoaderLeakStringUnshared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "unshared1" };
+      Object[] forFactory2 = new Object[] { "unshared2" };
+      classloaderLeakTest(STRING_ARG, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createMultiplexerChannel(String, String) with "shared1"/"shared2". */
+   public void testClassLoaderLeakSimpleMuxShared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "shared1", "leaktest" };
+      Object[] forFactory2 = new Object[] { "shared2", "leaktest" };
+      classloaderLeakTest(SIMPLE_MUX, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createMultiplexerChannel(String, String) with "unshared1"/"unshared2". */
+   public void testClassLoaderLeakSimpleMuxUnshared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "unshared1", "leaktest" };
+      Object[] forFactory2 = new Object[] { "unshared2", "leaktest" };
+      classloaderLeakTest(SIMPLE_MUX, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using the four-argument createMultiplexerChannel with "shared1"/"shared2". */
+   public void testClassLoaderLeakComplexMuxShared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "shared1", "leaktest", Boolean.FALSE, null };
+      Object[] forFactory2 = new Object[] { "shared2", "leaktest", Boolean.FALSE, null };
+      classloaderLeakTest(COMPLEX_MUX, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using the four-argument createMultiplexerChannel with "unshared1"/"unshared2". */
+   public void testClassLoaderLeakComplexMuxUnshared() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "unshared1", "leaktest", Boolean.FALSE, null };
+      Object[] forFactory2 = new Object[] { "unshared2", "leaktest", Boolean.FALSE, null };
+      classloaderLeakTest(COMPLEX_MUX, forFactory1, forFactory2);
+   }
+   
+   /** Leak test using createChannel(String) with the stack names "nonconcurrent1"/"nonconcurrent2". */
+   public void testClassLoaderLeakNonConcurrent() throws Exception
+   {
+      Object[] forFactory1 = new Object[] { "nonconcurrent1" };
+      Object[] forFactory2 = new Object[] { "nonconcurrent2" };
+      classloaderLeakTest(STRING_ARG, forFactory1, forFactory2);
+   }
+   
+   /**
+    * Creates one channel per factory via the given factory method while a
+    * dedicated classloader is the thread context classloader, connects both
+    * channels to the "leaktest" cluster, then has several runner threads
+    * send messages through them. The shared handler records the first
+    * context classloader it sees on a non-runner thread that differs from
+    * the dedicated one; the test fails if any was recorded.
+    * 
+    * @param factoryMeth  the JChannelFactory method used to create the channels
+    * @param factory1Args arguments for the invocation on factory1
+    * @param factory2Args arguments for the invocation on factory2
+    */
+   private void classloaderLeakTest(Method factoryMeth, Object[] factory1Args, 
+                                    Object[] factory2Args) 
+      throws Exception
+   {
+      int numThreads = 8;
+      int numLoops = 100;
+      
+      // Fair semaphore: once the runners are parked in acquire(), the timed
+      // tryAcquire() below has to queue behind them. With a non-fair
+      // semaphore the test thread could take all permits straight back
+      // after release() and "pass" before any message had been sent.
+      Semaphore semaphore = new Semaphore(numThreads, true);
+      
+      ThreadGroup runnerGroup = new ThreadGroup("TestRunners");
+      
+      ClassLoader ours = Thread.currentThread().getContextClassLoader();
+      
+      // The classloader we want channel threads to use
+      ClassLoader cl = new ClassLoader(ours){};
+      ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(cl, semaphore, runnerGroup);
+      
+      MessageDispatcher[] dispatchers = new MessageDispatcher[2];
+      
+      Thread.currentThread().setContextClassLoader(cl);    
+      try
+      {
+         channel1 = (Channel) factoryMeth.invoke(factory1, factory1Args);
+         dispatchers[0] = new MessageDispatcher(channel1, handler, handler, handler);
+         
+         channel1.connect("leaktest");
+         assertEquals("No classloader leak on channel1 connect", null, handler.getLeakedClassLoader());
+         
+         channel2 = (Channel) factoryMeth.invoke(factory2, factory2Args);
+         dispatchers[1] = new MessageDispatcher(channel2, handler, handler, handler);
+         
+         channel2.connect("leaktest");
+         assertEquals("No classloader leak on channel2 connect", null, handler.getLeakedClassLoader());
+      }
+      finally
+      {
+         Thread.currentThread().setContextClassLoader(ours);
+      }
+      
+      log.info("Channels connected");
+      
+      ClassLoaderLeakRunner[] runners = new ClassLoaderLeakRunner[numThreads];      
+      
+      // Alternate the runners between the two dispatchers
+      for (int i = 0; i < runners.length; i++)
+      {
+         MessageDispatcher disp = dispatchers[i % 2];
+            
+         runners[i] = new ClassLoaderLeakRunner(disp, numLoops, runnerGroup, semaphore);         
+      }
+
+      // Hold every permit so the runners block until all have been started
+      semaphore.acquire(numThreads);
+      
+      for (int i = 0; i < runners.length; i++)
+      {
+         runners[i].start();
+      }
+      
+      semaphore.release(numThreads);
+      
+      try
+      {
+         // Succeeds once every runner has given its permit back, or once the
+         // handler has released a full set of permits after detecting a leak
+         assertTrue("messages received within 15 seconds", semaphore.tryAcquire(numThreads, 15, TimeUnit.SECONDS));
+         
+         log.info("Messages received");
+      }
+      finally
+      {
+         for (int i = 0; i < runners.length; i++)
+         {
+            runners[i].stop();
+         }
+      }
+      
+      log.info("Sender threads stopped");
+      
+      assertEquals("No classloader leak", null, handler.getLeakedClassLoader());
+   }
+
+   /**
+    * Sends a series of messages through a MessageDispatcher from its own
+    * daemon thread, using a private classloader as the thread context
+    * classloader while sending. One permit of the shared semaphore is held
+    * for the duration of the send loop.
+    */
+   private class ClassLoaderLeakRunner implements Runnable
+   {
+      private Thread thread;
+      private final MessageDispatcher dispatcher;
+      private final int numMsgs;
+      private final ThreadGroup threadGroup;
+      private final Semaphore semaphore;
+      // Written by the test thread, read by the runner thread (and vice
+      // versa for the exception), hence volatile.
+      private volatile boolean stopped;
+      private volatile Exception exception;
+      
+      /**
+       * @param dispatcher dispatcher used to send the messages
+       * @param numMsgs    maximum number of loop iterations
+       * @param group      thread group the runner thread is created in
+       * @param semaphore  semaphore from which one permit is held while sending
+       */
+      ClassLoaderLeakRunner(MessageDispatcher dispatcher, int numMsgs, ThreadGroup group, Semaphore semaphore)
+      {
+         this.dispatcher = dispatcher;
+         this.numMsgs = numMsgs;
+         this.threadGroup = group;
+         this.semaphore = semaphore;
+      }
+      
+      /**
+       * Acquires a permit, sends up to numMsgs message pairs (stopping early
+       * if stop() was called or the thread was interrupted), and finally
+       * releases the permit and restores the parent classloader as the
+       * thread context classloader. Any exception is stored for
+       * getException().
+       */
+      public void run()
+      {
+         boolean acquired = false;
+         ClassLoader cl = new ClassLoader(Thread.currentThread().getContextClassLoader()){};
+         try
+         {
+            semaphore.acquire();         
+            acquired = true;
+            
+            log.info(Thread.currentThread().getName() + " starting");        
+            
+            Thread.currentThread().setContextClassLoader(cl);
+            for (int i = 0; i < numMsgs && !stopped && !Thread.interrupted(); i++)
+            {
+               Message msg = new Message(null, null, String.valueOf(i));
+               // sending this way calls receive()
+               dispatcher.send(msg);
+               // sending this way calls handle()
+               dispatcher.castMessage(null, msg, GroupRequest.GET_ALL, 0, false);
+            }
+            
+            log.info(Thread.currentThread().getName() + " done");
+         }
+         catch (Exception e)
+         {
+            this.exception = e;
+         }
+         finally
+         {
+            if (acquired)
+               semaphore.release();
+            
+            Thread.currentThread().setContextClassLoader(cl.getParent());
+         }
+      }
+
+      /** @return the exception caught by run(), or null if there was none */
+      public Exception getException()
+      {
+         return exception;
+      }
+
+      /** Creates the daemon runner thread in the configured group and starts it. */
+      public void start()
+      {
+         thread = new Thread(this.threadGroup, this);
+         thread.setDaemon(true);
+         thread.start();
+      }
+      
+      /**
+       * Asks the runner to stop, waits up to 100 ms for its thread to end
+       * and interrupts the thread if it is still alive after that.
+       */
+      public void stop()
+      {
+         stopped = true;
+         if (thread != null && thread.isAlive())
+         {
+            try
+            {
+               thread.join(100);
+            }
+            catch (InterruptedException ignored)
+            {
+               // best effort: fall through and interrupt the runner below
+            }
+            if (thread.isAlive())
+               thread.interrupt();
+         }
+      }
+   }
+   
+   private class ClassLoaderLeakHandler 
+      implements MembershipListener, MessageListener, RequestHandler
+   {
+      private final Semaphore semaphore;
+      private final ClassLoader expected;
+      private final ThreadGroup runnerGroup;
+      private ClassLoader leakedClassLoader;
+      private final int numPermits;
+      
+      ClassLoaderLeakHandler(ClassLoader expected, Semaphore semaphore, ThreadGroup runnerGroup)
+      {
+         this.expected = expected;
+         this.semaphore = semaphore;
+         this.runnerGroup = runnerGroup;
+         this.numPermits = this.semaphore.availablePermits();
+      }
+
+      public Object handle(Message msg)
+      {
+         log.debug("handled(): " + msg.getObject());
+         checkClassLoader(true, "handle()");
+         return null;
+      }
+
+      public void block()
+      {
+         checkClassLoader(false, "block()");
+      }
+
+      public void suspect(Address suspected_mbr)
+      {
+         checkClassLoader(false, "suspect()");         
+      }
+
+      public void viewAccepted(View new_view)
+      {
+         checkClassLoader(false, "viewAccepted()");     
+         log.info("viewAccepted(): " + new_view);
+      }
+
+      public byte[] getState()
+      {
+         checkClassLoader(false, "getState()");  
+         return new byte[1];
+      }
+
+      public void receive(Message msg)
+      {
+         checkClassLoader(false, "receive()");  
+      }
+
+      public void setState(byte[] state)
+      {
+         checkClassLoader(false, "setState()");   
+      }
+
+      public ClassLoader getLeakedClassLoader()
+      {
+         return leakedClassLoader;
+      } 
+      
+      private void checkClassLoader(boolean fromHandle, String method)
+      {         
+         if (leakedClassLoader == null) // ignore msgs once we found a leak
+         {
+            // ignore runner threads that loop all the way back up 
+            if (Thread.currentThread().getThreadGroup().equals(runnerGroup))
+            {               
+               return; 
+            }
+            
+            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+            if (!expected.equals(tccl))               
+            {
+               leakedClassLoader = tccl;
+               semaphore.release(numPermits);
+               log.info("ClassLoader leaked in " + method + ": " + tccl + " leaked to " + Thread.currentThread().getName());
+            }
+         }         
+      }
+      
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java	                        (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java	2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.test;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.jboss.ha.framework.server.JChannelFactory;
+import org.jboss.logging.Logger;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.jgroups.mux.MuxChannel;
+import org.jgroups.protocols.TP;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.stack.Protocol;
+
+/**
+ * Basic tests of the AS-specific JChannelFactory
+ * @author Brian Stansberry
+ */
+public class JChannelFactoryUnitTestCase extends TestCase
+{
+   private static final Logger log = Logger.getLogger(JChannelFactoryUnitTestCase.class);
+   
+   private JChannelFactory factory1;
+   private JChannelFactory factory2;
+   private Channel channel1;
+   private Channel channel2;
+   private Channel channel3;
+   private Channel channel4;
+   private String jgroups_bind_addr;
+   private MBeanServer mbeanServer;
+   
+   /**
+    * Create a new JChannelFactoryUnitTestCase.
+    * 
+    * @param name the test name, passed on to the TestCase constructor
+    */
+   public JChannelFactoryUnitTestCase(String name)
+   {
+      super(name);
+   }
+
+   /**
+    * Records the initial value of the jgroups.bind_addr system property and
+    * creates and starts two channel factories from
+    * cluster/channelfactory/stacks.xml. Both use localhost as node address;
+    * factory1 uses naming service port 123, factory2 uses port 456 and the
+    * node name "node1".
+    */
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      // Store the value in the field rather than in a shadowing local:
+      // tearDown() consults the field to decide whether it may clear
+      // jgroups.bind_addr, and with a local the field was always null, so
+      // a bind address supplied by the user was wiped after the first test.
+      jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
+      if (jgroups_bind_addr == null)
+      {
+         // NOTE(review): tearDown() clears "jgroups.bind_addr", which suggests
+         // that property was meant to be set here -- confirm before changing.
+         System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
+      }
+      
+      factory1 = new JChannelFactory();
+      factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+      factory1.setAssignLogicalAddresses(false);
+      factory1.setNodeAddress(InetAddress.getByName("localhost"));
+      factory1.setNamingServicePort(123);
+      factory1.setExposeChannels(false);
+      factory1.create();
+      factory1.start();
+      factory2 = new JChannelFactory();
+      factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+      factory2.setAssignLogicalAddresses(false);
+      factory2.setNodeAddress(InetAddress.getByName("localhost"));
+      factory2.setNamingServicePort(456);
+      factory2.setNodeName("node1");
+      factory2.setExposeChannels(false);
+      factory2.create();
+      factory2.start();
+   }
+
+   /**
+    * Releases the MBean server if a test created one, clears
+    * jgroups.bind_addr if the field recording it is null, closes every
+    * channel that is still open and stops and destroys both factories.
+    */
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      if (mbeanServer != null)
+         MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      
+      if (jgroups_bind_addr == null)
+         System.clearProperty("jgroups.bind_addr");
+      
+      if (channel1 != null && channel1.isOpen())
+         channel1.close();
+      
+      if (channel2 != null && channel2.isOpen())
+         channel2.close();
+      
+      if (channel3 != null && channel3.isOpen())
+         channel3.close();
+      
+      if (channel4 != null && channel4.isOpen())
+         channel4.close();
+      
+      if (factory1 != null)
+      {
+         factory1.stop();
+         factory1.destroy();
+      }
+      if (factory2 != null)
+      {
+         factory2.stop();
+         // was factory1.destroy(): factory2 was never destroyed and
+         // factory1 was destroyed twice
+         factory2.destroy();
+      }
+   }
+   
+   /**
+    * Uses the "unshared1" and "unshared2" stacks, whose configuration is
+    * first asserted to contain no singleton_name. Checks that none of the
+    * created channels is a MuxChannel, and compares the transport protocol
+    * instances of the channels for identity.
+    */
+   public void testNoSingletonName() throws Exception
+   {
+      log.info("+++ testNoSingletonName()");      
+      
+      // Assert the stack is as expected
+      // (both configs are read from factory1)
+      String cfg1 = factory1.getConfig("unshared1");
+      assertTrue("no singleton_name in unshared1", cfg1.indexOf("singleton_name") < 0);
+      String cfg2 = factory1.getConfig("unshared2");
+      assertTrue("no singleton_name in unshared2", cfg2.indexOf("singleton_name") < 0);
+      
+      channel1 = factory1.createChannel("unshared1");
+      assertFalse(channel1 instanceof MuxChannel);
+      channel1.connect("test");      
+      TP tp1 = getTP((JChannel) channel1);
+      
+      channel2 = factory2.createChannel("unshared2");
+      assertFalse(channel2 instanceof MuxChannel);
+      channel2.connect("test");
+      TP tp2 = getTP((JChannel) channel2);
+      
+      // channels from different factories must not share a transport
+      assertNotSame(tp1, tp2);
+      
+      channel3 = factory1.createMultiplexerChannel("unshared1", "test");
+      assertFalse(channel3 instanceof MuxChannel);
+      channel3.connect("test");
+      TP tp3 = getTP((JChannel) channel3);
+      
+      // the channel from createMultiplexerChannel must not reuse the
+      // transport of either channel obtained through createChannel
+      assertNotSame(tp1, tp3);
+      assertNotSame(tp2, tp3);
+      
+      channel4 = factory1.createMultiplexerChannel("unshared1", "test2");
+      assertFalse(channel4 instanceof MuxChannel);
+      channel4.connect("test2");
+      TP tp4 = getTP((JChannel) channel4);
+      
+      // a second createMultiplexerChannel call on the same factory and stack
+      // is expected to yield the same transport instance
+      assertSame(tp3, tp4);
+   }
+   
+   /**
+    * Confirms that thread pool configurations are as expected following
+    * any massaging by the factory. Uses the "queues" stack for the
+    * configured-pool checks and the "nothreads1" stack for the no-pool check.
+    */
+   public void testThreadPoolConfig() throws Exception
+   {
+      log.info("+++ testThreadPoolConfig()");      
+      
+      channel1 = factory1.createChannel("queues");
+      channel1.connect("test");
+      TP tp1 = getTP((JChannel) channel1);
+      
+      // expected values presumably mirror the "queues" stack definition in
+      // stacks.xml -- that part of the file is not shown here
+      assertEquals(3000, tp1.getIncomingKeepAliveTime());
+      assertEquals(22, tp1.getIncomingMaxPoolSize());
+      assertEquals(2, tp1.getIncomingMinPoolSize());
+      assertEquals(750, tp1.getIncomingMaxQueueSize());
+      assertEquals(4000, tp1.getOOBKeepAliveTime());
+      assertEquals(12, tp1.getOOBMaxPoolSize());
+      assertEquals(3, tp1.getOOBMinPoolSize());
+      assertEquals(75, tp1.getOOBMaxQueueSize());
+      
+      // the rejection handler is only checked when the pool is a
+      // ThreadPoolExecutor; any other Executor passes the check
+      Executor exec = tp1.getDefaultThreadPool();
+      assertNotNull(exec);
+      if (exec instanceof ThreadPoolExecutor)
+      {
+         assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.DiscardPolicy);
+      }
+      
+      exec = tp1.getOOBThreadPool();
+      assertNotNull(exec);
+      if (exec instanceof ThreadPoolExecutor)
+      {
+         assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy);
+      }
+      
+      // Confirm that the no-pool config doesn't create a pool
+      channel2 = factory1.createChannel("nothreads1");
+      channel2.connect("test");
+      TP tp2 = getTP((JChannel) channel2);
+      
+      assertFalse(tp2.getDefaultThreadPool() instanceof ThreadPoolExecutor);
+      assertFalse(tp2.getOOBThreadPool() instanceof ThreadPoolExecutor);
+   }
+   
+   /**
+    * Registers both factories with a fresh MBeanServer and checks which
+    * channel and protocol MBeans exist after channel creation, after a
+    * channel is closed and after a factory is stopped and destroyed.
+    * factory1 exposes channels only; factory2 exposes channels and protocols.
+    */
+   public void testJmxHandling() throws Exception
+   {
+      log.info("+++ testJmxHandling()");
+      
+      mbeanServer = MBeanServerFactory.createMBeanServer("jchannelfactorytest");
+      
+      ObjectName factoryName1 = new ObjectName("jboss.test:service=TestChannelFactory1");
+      ObjectName factoryName2 = new ObjectName("jboss.test:service=TestChannelFactory2");
+      
+      // destroy the factories so we can start clean
+      factory1.stop();
+      factory1.destroy();
+      factory2.stop();
+      factory2.destroy();
+      
+      factory1.setExposeChannels(true);
+      factory1.setExposeProtocols(false);
+      
+      factory2.setExposeChannels(true);
+      factory2.setExposeProtocols(true);
+      
+      mbeanServer.registerMBean(factory1, factoryName1);
+      
+      // registration is expected to hand the server and domain to the factory
+      assertSame(mbeanServer, factory1.getServer());
+      assertEquals(factoryName1.getDomain(), factory1.getDomain());
+      
+      mbeanServer.registerMBean(factory2, factoryName2);
+      
+      factory1.create();
+      factory1.start();
+      factory2.create();
+      factory2.start();
+      
+      channel1 = factory1.createMultiplexerChannel("shared1", "shared");
+      ObjectName chName1 = new ObjectName("jboss.test:type=channel,cluster=shared");
+      assertTrue(chName1 + " registered", mbeanServer.isRegistered(chName1));
+      // Protocol MBeans use type=protocol (compare udpName2 below). The
+      // previous type=channel name could never be registered, which made
+      // this negative check pass regardless of the exposeProtocols setting.
+      ObjectName udpName1 = new ObjectName("jboss.test:type=protocol,cluster=shared,protocol=UDP");
+      assertFalse(udpName1 + " not registered", mbeanServer.isRegistered(udpName1));
+      
+      channel2 = factory2.createMultiplexerChannel("unshared1", "unshared");
+      ObjectName chName2 = new ObjectName("jboss.test:type=channel,cluster=unshared");
+      assertTrue(chName2 + " registered", mbeanServer.isRegistered(chName2));
+      ObjectName udpName2 = new ObjectName("jboss.test:type=protocol,cluster=unshared,protocol=UDP");
+      assertTrue(udpName2 + " registered", mbeanServer.isRegistered(udpName2));
+      
+      channel1.connect("shared");
+      assertTrue(chName1 + " still registered", mbeanServer.isRegistered(chName1));
+      assertTrue(chName2 + " still registered", mbeanServer.isRegistered(chName2));
+      
+      log.info("closing channel 1");
+      
+      // closing a channel must unregister only that channel's MBean
+      channel1.close();
+      assertFalse(chName1 + " unregistered", mbeanServer.isRegistered(chName1));
+      assertTrue(chName2 + " not unregistered", mbeanServer.isRegistered(chName2));
+      assertTrue(udpName2 + " not unregistered", mbeanServer.isRegistered(udpName2));
+      
+      log.info("stopping factory2");
+      
+      // stopping the factory must unregister its remaining MBeans
+      factory2.stop();
+      factory2.destroy();
+      assertFalse(chName2 + " unregistered", mbeanServer.isRegistered(chName2));
+      assertFalse(udpName2 + " unregistered", mbeanServer.isRegistered(udpName2));
+   }
+   
+   /**
+    * Checks the additional data attached to a channel's local address:
+    * none while assignLogicalAddresses is false (as configured in setUp),
+    * and a node identifier once it has been switched to true.
+    */
+   public void testLogicalAddressAssignment() throws Exception
+   {
+      log.info("+++ testLogicalAddressAssignment()");
+      
+      // logical address assignment is still disabled here
+      channel1 = factory1.createChannel("shared1");
+      channel1.connect("shared");
+      IpAddress addr = (IpAddress) channel1.getLocalAddress();
+      assertEquals(null, addr.getAdditionalData());
+
+      factory1.setAssignLogicalAddresses(true);
+      factory2.setAssignLogicalAddresses(true);
+      
+      // factory1 has no node name set in setUp; the expected value combines
+      // its node address (localhost) and naming service port (123)
+      channel2 = factory1.createChannel("shared2");
+      channel2.connect("shared");
+      addr = (IpAddress) channel2.getLocalAddress();
+      byte[] addlData = addr.getAdditionalData();
+      assertNotNull(addlData);
+      assertEquals("127.0.0.1:123", new String(addlData));
+
+      // factory2 was given the explicit node name "node1" in setUp
+      channel3 = factory2.createChannel("unshared1");
+      channel3.connect("unshared");
+      addr = (IpAddress) channel3.getLocalAddress();
+      addlData = addr.getAdditionalData();
+      assertNotNull(addlData);
+      assertEquals("node1",  new String(addlData));
+   }
+   
+   /**
+    * Returns the last entry of the channel's protocol list, cast to TP.
+    */
+   private TP getTP(JChannel channel)
+   {
+      List<Protocol> stackProtocols = channel.getProtocolStack().getProtocols();
+      int lastIndex = stackProtocols.size() - 1;
+      Protocol last = stackProtocols.get(lastIndex);
+      return (TP) last;
+   }
+   
+   
+}

Added: trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml
===================================================================
--- trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml	                        (rev 0)
+++ trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml	2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,404 @@
+<!--
+  JGroups protocol stack definitions used by the JChannelFactory unit tests.
+  Version: $Id:jgroups-channelfactory-stacks.xml 71313 2008-03-26 19:46:59Z bstansberry at jboss.com $
+-->
+<protocol_stacks>
+
+    <stack name="shared1"
+           description="shared stack1">
+        <config>
+          <UDP
+             singleton_name="shared1"
+             mcast_port="${jgroups.udp.mcast_port:35688}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="true"
+
+		     thread_pool.enabled="true"
+		     thread_pool.min_threads="1"
+		     thread_pool.max_threads="200"
+		     thread_pool.keep_alive_time="5000"
+           thread_pool.queue_enabled="false"
+           thread_pool.queue_max_size="1000"
+		     thread_pool.rejection_policy="Run"
+		
+		     oob_thread_pool.enabled="true"
+		     oob_thread_pool.min_threads="1"
+		     oob_thread_pool.max_threads="8"
+		     oob_thread_pool.keep_alive_time="5000"
+		     oob_thread_pool.queue_enabled="false"
+		     oob_thread_pool.queue_max_size="100"
+		     oob_thread_pool.rejection_policy="Run"/>
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+    
+    <stack name="shared2"
+           description="shared stack2">
+        <config>
+          <UDP
+             singleton_name="shared2"
+             mcast_port="${jgroups.udp.mcast_port:35688}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="true"
+
+           thread_pool.enabled="true"
+           thread_pool.min_threads="1"
+           thread_pool.max_threads="200"
+           thread_pool.keep_alive_time="5000"
+           thread_pool.queue_enabled="false"
+           thread_pool.queue_max_size="1000"
+           thread_pool.rejection_policy="Run"
+      
+           oob_thread_pool.enabled="true"
+           oob_thread_pool.min_threads="1"
+           oob_thread_pool.max_threads="8"
+           oob_thread_pool.keep_alive_time="5000"
+           oob_thread_pool.queue_enabled="false"
+           oob_thread_pool.queue_max_size="100"
+           oob_thread_pool.rejection_policy="Run"/>
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+    
+    <stack name="unshared1"
+           description="unshared stack 1">
+        <config>
+          <UDP
+             mcast_port="${jgroups.udp.mcast_port:35689}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.112.112.112}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="true"
+
+           thread_pool.enabled="true"
+           thread_pool.min_threads="1"
+           thread_pool.max_threads="200"
+           thread_pool.keep_alive_time="5000"
+           thread_pool.queue_enabled="false"
+           thread_pool.queue_max_size="1000"
+           thread_pool.rejection_policy="Run"
+      
+           oob_thread_pool.enabled="true"
+           oob_thread_pool.min_threads="1"
+           oob_thread_pool.max_threads="8"
+           oob_thread_pool.keep_alive_time="5000"
+           oob_thread_pool.queue_enabled="false"
+           oob_thread_pool.queue_max_size="100"
+           oob_thread_pool.rejection_policy="Run"/>
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+    
+    <stack name="unshared2"
+           description="unshared stack 2">
+        <config>
+          <UDP
+             mcast_port="${jgroups.udp.mcast_port:35689}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.112.112.112}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="true"
+
+           thread_pool.enabled="true"
+           thread_pool.min_threads="1"
+           thread_pool.max_threads="200"
+           thread_pool.keep_alive_time="5000"
+           thread_pool.queue_enabled="false"
+           thread_pool.queue_max_size="1000"
+           thread_pool.rejection_policy="Run"
+      
+           oob_thread_pool.enabled="true"
+           oob_thread_pool.min_threads="1"
+           oob_thread_pool.max_threads="8"
+           oob_thread_pool.keep_alive_time="5000"
+           oob_thread_pool.queue_enabled="false"
+           oob_thread_pool.queue_max_size="100"
+           oob_thread_pool.rejection_policy="Run"/>
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+    
+    <!--  Here we don't use the concurrent stack, so an incoming packet handler
+          thread carries the messages -->
+    <stack name="nonconcurrent1"
+           description="no concurrent stack1">
+        <config>
+          <UDP
+             singleton_name="nonconcurrent1"
+             mcast_port="${jgroups.udp.mcast_port:35799}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.114.114.114}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="false"
+
+             thread_pool.enabled="false"
+      
+             oob_thread_pool.enabled="false"/>  
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+    
+    <stack name="nonconcurrent2"
+           description="no concurrent stack2">
+        <config>
+          <UDP
+             singleton_name="nonconcurrent2"
+             mcast_port="${jgroups.udp.mcast_port:35799}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.114.114.114}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="false"
+
+             thread_pool.enabled="false"
+      
+             oob_thread_pool.enabled="false"/>  
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+
+    <stack name="queues" description="to check queue configuration">
+        <config>
+          <UDP
+             singleton_name="queues"
+             mcast_port="${jgroups.udp.mcast_port:35688}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="true"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             enable_bundling="false"
+                 
+             use_concurrent_stack="true"
+
+           thread_pool.enabled="true"
+           thread_pool.min_threads="2"
+           thread_pool.max_threads="22"
+           thread_pool.keep_alive_time="3000"
+           thread_pool.queue_enabled="true"
+           thread_pool.queue_max_size="750"
+           thread_pool.rejection_policy="Discard"
+      
+           oob_thread_pool.enabled="true"
+           oob_thread_pool.min_threads="3"
+           oob_thread_pool.max_threads="12"
+           oob_thread_pool.keep_alive_time="4000"
+           oob_thread_pool.queue_enabled="true"
+           oob_thread_pool.queue_max_size="75"
+           oob_thread_pool.rejection_policy="Abort"/>
+          <PING timeout="1000" num_initial_members="1"/>
+          <MERGE2 max_interval="100000" min_interval="20000"/>
+          <FD_SOCK/>
+          <FD timeout="10000" max_tries="5" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500"/>
+          <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   shun="true"
+                   view_bundling="true"
+                   view_ack_collection_timeout="5000"/>
+          <FC max_credits="2000000" min_threshold="0.10"/>
+          <FRAG2 frag_size="60000"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER/>
+          <pbcast.FLUSH timeout="0"/>
+        </config>
+    </stack>
+
+</protocol_stacks>
+
+




More information about the jboss-cvs-commits mailing list