[jboss-cvs] JBossAS SVN: r72317 - in trunk: cluster/src/resources/jgroups and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 16 16:48:54 EDT 2008
Author: bstansberry at jboss.com
Date: 2008-04-16 16:48:54 -0400 (Wed, 16 Apr 2008)
New Revision: 72317
Added:
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
trunk/testsuite/src/resources/cluster/channelfactory/
trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
trunk/testsuite/local.properties
Log:
[JBAS-5414] Ensure app classloaders don't leak to JGroups threads
[JBAS-5442] AS ChannelFactory never returns MuxChannel
[JBAS-5451] Fix JMX registration handling by ChannelFactory
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server
___________________________________________________________________
Name: svn:ignore
+ DelegatingChannel.java
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-04-16 20:48:54 UTC (rev 72317)
@@ -267,20 +267,23 @@
{
logHistory ("Starting partition");
- log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
- " using stack " + getMultiplexerStack());
-
- channel = createMuxChannel(config);
+ if (channel == null || !channel.isOpen())
+ {
+ log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
+ " using stack " + getMultiplexerStack());
+
+ channel = createMuxChannel(config);
+
+ channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+ channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+ }
- channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
- channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
-
log.info("Initializing partition " + getPartitionName());
logHistory ("Initializing partition " + getPartitionName());
dispatcher = new RpcHandler(channel, null, null, new Object(), config.getDeadlockDetection());
- // Subscribe to events generated by the org.jgroups. protocol stack
+ // Subscribe to events generated by the channel
log.debug("setMembershipListener");
dispatcher.setMembershipListener(this);
log.debug("setMessageListener");
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-04-16 20:48:54 UTC (rev 72317)
@@ -22,18 +22,49 @@
package org.jboss.ha.framework.server;
+import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.rmi.dgc.VMID;
import java.rmi.server.UID;
+import java.security.AccessController;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanRegistration;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBean;
import org.jboss.system.server.ServerConfigUtil;
+import org.jboss.util.StringPropertyReplacer;
+import org.jboss.util.loading.ContextClassLoaderSwitcher;
import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListenerAdapter;
import org.jgroups.Event;
import org.jgroups.Global;
-import org.jgroups.mux.MuxChannel;
+import org.jgroups.JChannel;
+import org.jgroups.conf.ConfiguratorFactory;
+import org.jgroups.conf.ProtocolData;
+import org.jgroups.conf.ProtocolStackConfigurator;
+import org.jgroups.jmx.JmxConfigurator;
+import org.jgroups.protocols.TP;
import org.jgroups.stack.IpAddress;
+import org.jgroups.stack.Protocol;
+import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.ThreadNamingPattern;
+import org.jgroups.util.Util;
/**
* Extension to the JGroups JChannelFactory that supports the addition
@@ -44,10 +75,12 @@
* @version $Revision: 69425 $
*/
public class JChannelFactory extends org.jgroups.JChannelFactory
- implements JChannelFactoryMBean
+ implements JChannelFactoryMBean, MBeanRegistration
{
protected static Logger log = Logger.getLogger(JChannelFactory.class);
+ private static final String UNSHARED_TRANSPORT_NAME_BASE = "unnamed_";
+
private static final int CREATED = ServiceMBean.CREATED;
private static final int STARTING = ServiceMBean.STARTING;
private static final int STARTED = ServiceMBean.STARTED;
@@ -59,9 +92,49 @@
private InetAddress nodeAddress;
private String nodeName;
private int namingServicePort = -1;
- private int state;
+ private int state = ServiceMBean.UNREGISTERED;
private boolean assignLogicalAddresses = true;
+ private boolean manageChannelThreadPools = true;
+ private boolean addMissingSingletonName = true;
+ private boolean domainSet;
+ private final ContextClassLoaderSwitcher classLoaderSwitcher;
+ private final Set<String> registeredChannels = new HashSet<String>();
+ public JChannelFactory()
+ {
+ this.classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
+ }
+
+
+ @Override
+ public Channel createChannel() throws ChannelException
+ {
+ // Use type 'Object' so we call createChannel(Object) not createChannel(String)
+// Object stack = JChannel.DEFAULT_PROTOCOL_STACK;
+// return createChannel(stack);
+ throw new UnsupportedOperationException("No-arg createChannel() is not supported");
+ }
+
+ @Override
+ public Channel createChannel(Object properties) throws ChannelException
+ {
+ Channel channel = super.createChannel(properties);
+
+ if (manageChannelThreadPools)
+ {
+ fixChannelThreadPools(channel);
+ }
+
+ if (assignLogicalAddresses)
+ {
+ setChannelUniqueId(channel);
+ }
+
+ // can't register in JMX as we don't have a channel name
+
+ return channel;
+ }
+
/**
* {@inheritDoc}
*
@@ -74,22 +147,30 @@
* the first time a channel is created.
*/
@Override
- public Channel createMultiplexerChannel(String stack_name, String id, boolean register_for_state_transfer, String substate_id) throws Exception
+ public Channel createChannel(String stack_name) throws Exception
{
- // Due to problems with coordinated state transfer, we suppress register_for_state_transfer usage.
- // TODO revert to normal if coordinated state transfer is fixed
-// Channel channel = super.createMultiplexerChannel(stack_name, id, register_for_state_transfer, substate_id);
- Channel channel = createSharedTransportChannel(stack_name);
+ String props=stack_name != null? getConfig(stack_name) : null;
+ if (props == null)
+ {
+ log.warn("No protocol stack found with name " + stack_name +
+ "; creating default channel");
+ return createChannel();
+ }
- if (channel == null)
+ JChannel channel = new JChannel(props);
+
+ if (manageChannelThreadPools)
{
- log.debug("Config for " + stack_name + " does not include singleton_name; creating MuxChannel. Config is " + getConfig(stack_name));
- channel = super.createMultiplexerChannel(stack_name, id, false, null);
+ fixChannelThreadPools(channel);
}
if (assignLogicalAddresses)
+ {
setChannelUniqueId(channel);
+ }
+ // can't register in JMX as we don't have a channel name
+
return channel;
}
@@ -107,9 +188,80 @@
@Override
public Channel createMultiplexerChannel(String stack_name, String id) throws Exception
{
- return createMultiplexerChannel(stack_name, id, false, null);
+ String configStr = getConfig(stack_name);
+
+ if (configStr == null)
+ throw new IllegalStateException("Unknown stack_name " + stack_name);
+
+ ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
+ Map tpProps = getTransportProperties(config);
+
+ if (!tpProps.containsKey(Global.SINGLETON_NAME))
+ {
+ if (addMissingSingletonName)
+ {
+ String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
+
+ log.warn("Config for " + stack_name + " does not include " +
+ "singleton_name; adding a name of " + singletonName +
+ ". You should configure a singleton_name for this stack.");
+
+ config = addSingletonName(config, singletonName);
+ log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
+ tpProps = getTransportProperties(config);
+ }
+ else
+ {
+ throw new IllegalStateException("Config for " + stack_name + " does not include " +
+ "singleton_name and MuxChannels are not supported.");
+ }
+ }
+
+ JChannel channel = new JChannel(config);
+
+ if (manageChannelThreadPools)
+ {
+ fixChannelThreadPools(channel, tpProps);
+ }
+
+ if (assignLogicalAddresses)
+ {
+ setChannelUniqueId(channel);
+ }
+
+ if (isExposeChannels() && id != null && id.length() > 0)
+ {
+ registerChannel(channel, id);
+ }
+
+ return channel;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * Overrides the superclass version by generating a unique node id
+ * and passing it down the Channel as additional_data and by suppressing the
+ * <code>register_for_state_transfer</code> param.
+ * <p/>
+ * The additional data is the value of {@link #getNodeName()} which either
+ * can be injected as part of configuration or will be calculated
+ * the first time a channel is created.
+ */
+ @Override
+ public Channel createMultiplexerChannel(String stack_name, String id, boolean register_for_state_transfer, String substate_id) throws Exception
+ {
+ return createMultiplexerChannel(stack_name, id);
}
-
+
+ @Override
+ public void setDomain(String domain)
+ {
+ super.setDomain(domain);
+ domainSet = true;
+ }
+
public InetAddress getNodeAddress()
{
return nodeAddress;
@@ -175,6 +327,26 @@
this.assignLogicalAddresses = logicalAddresses;
}
+ public boolean getManageChannelThreadPools()
+ {
+ return manageChannelThreadPools;
+ }
+
+ public void setManageChannelThreadPools(boolean manageChannelThreadPools)
+ {
+ this.manageChannelThreadPools = manageChannelThreadPools;
+ }
+
+ public boolean getAddMissingSingletonName()
+ {
+ return addMissingSingletonName;
+ }
+
+ public void setAddMissingSingletonName(boolean addMissingSingletonName)
+ {
+ this.addMissingSingletonName = addMissingSingletonName;
+ }
+
@Override
public void create() throws Exception
{
@@ -200,7 +372,6 @@
}
log.debug("Created JChannelFactory");
- super.create();
}
@Override
@@ -283,7 +454,13 @@
try
{
- super.destroy();
+ // DON'T call super.destroy() as that may deregister the JMX proxy
+ // to this pojo service, leading to ugliness when the proxy is destroyed
+
+ for (String channelId : registeredChannels)
+ {
+ unregister(channelId);
+ }
}
catch (Throwable t)
{
@@ -292,46 +469,46 @@
state = DESTROYED;
log.debug("Destroyed JChannelFactory");
}
-
- /**
- * Checks whether the transport protocol configuration for <code>stack_name</code>
- * supports the shared use of the protocol between channels, creating a
- * returning a channel if it does.
- * <p>
- * Determines whether a shared transport is possible by looking for the
- * {@link Global#SINGLETON_NAME} parameter in the configuration of the
- * first protocol in the string returned by {@link #getConfig(String)}.
- * </p>
- *
- * @param stack_name the name of the protocol stack
- * @return a channel configured per <code>stack_name</code>, or <code>null</code>
- * if <code>stack_name</code> is not a register protocol stack or
- * its transport protocol configuration does not support sharing.
- *
- * @throws Exception
- */
- public Channel createSharedTransportChannel(String stack_name) throws Exception
+
+ // ------------------------------------------------------- MBeanRegistration
+
+ public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception
{
- Channel ch = null;
- String config = getConfig(stack_name);
- if (config != null)
+ setServer(server);
+ if (!domainSet || getDomain() == null)
{
- // Format of string is
- // PROTOCOL1(param1=value1;param2=value2):PROTOCOL2(param1=value1;param2=value2):
- // PROTOCOL1 is always the transport protocol.
- // We look for the singleton_name param in the transport protocol
- int tp_idx = config.indexOf("):");
- int singleton_idx = config.indexOf(Global.SINGLETON_NAME);
- if (singleton_idx > -1 && singleton_idx < tp_idx)
- {
- ch = createChannel(stack_name);
- }
- }
- return ch;
+ setDomain(name.getDomain());
+ }
+ return name;
}
- private void setChannelUniqueId(Channel channel) throws Exception
+
+ public void postRegister(Boolean registrationDone)
{
+ if (registrationDone != null && registrationDone.booleanValue()
+ && state == ServiceMBean.UNREGISTERED)
+ {
+ state = ServiceMBean.REGISTERED;
+ }
+ }
+
+
+ public void preDeregister() throws Exception
+ {
+ }
+
+ public void postDeregister()
+ {
+ setServer(null);
+ if (state == ServiceMBean.DESTROYED)
+ state = ServiceMBean.UNREGISTERED;
+ }
+
+ // ----------------------------------------------------------------- Private
+
+
+ private void setChannelUniqueId(Channel channel)
+ {
IpAddress address = (IpAddress) channel.getLocalAddress();
if (address == null)
{
@@ -349,14 +526,8 @@
}
else if (address.getAdditionalData() == null)
{
- Channel testee = channel;
- if (channel instanceof MuxChannel)
+ if (channel.isConnected())
{
- testee = ((MuxChannel) channel).getChannel();
- }
-
- if (testee.isConnected())
- {
throw new IllegalStateException("Underlying JChannel was " +
"connected before additional_data was set");
}
@@ -374,7 +545,7 @@
return ServiceMBean.states[state];
}
- private String generateUniqueNodeName () throws Exception
+ private String generateUniqueNodeName ()
{
// we first try to find a simple meaningful name:
// 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
@@ -420,4 +591,571 @@
return hostIP + ":" + uid;
}
+ private void fixChannelThreadPools(Channel channel) throws ChannelException
+ {
+ if (channel instanceof JChannel)
+ {
+ JChannel jchannel = (JChannel) channel;
+ String configStr = jchannel.getProperties();
+ ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
+ Map tpProps = getTransportProperties(config);
+ fixChannelThreadPools(jchannel, tpProps);
+ }
+ else
+ {
+ log.debug("Cannot fix thread pools for unknown Channel type " + channel.getClass().getName());
+ }
+ }
+
+ private Map getTransportProperties(ProtocolStackConfigurator config)
+ {
+ Map tpProps = null;
+ try
+ {
+ ProtocolData[] protocols=config.getProtocolStack();
+ ProtocolData transport=protocols[0];
+ tpProps = transport.getParameters();
+ }
+ catch (UnsupportedOperationException uoe)
+ {
+ // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
+ // So we do it ourselves
+ String configStr = config.getProtocolStackString();
+ String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
+ String[] params = tpConfigStr.split(";");
+ tpProps = new HashMap();
+ for (String param : params)
+ {
+ String[] keyVal = param.split("=");
+ if (keyVal.length != 2)
+ throw new IllegalStateException("Invalid parameter " + param + " in stack " + configStr);
+ tpProps.put(keyVal[0], keyVal[1]);
+ }
+ }
+
+ return tpProps;
+ }
+
+ private ProtocolStackConfigurator addSingletonName(ProtocolStackConfigurator orig, String singletonName)
+ throws ChannelException
+ {
+ ProtocolStackConfigurator result = null;
+ try
+ {
+ ProtocolData[] protocols=orig.getProtocolStack();
+ ProtocolData transport=protocols[0];
+ Map tpProps = transport.getParameters();
+ tpProps.put(Global.SINGLETON_NAME, singletonName);
+ // we've now updated the state of orig; just return it
+ result = orig;
+ }
+ catch (UnsupportedOperationException uoe)
+ {
+ // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
+ // So we do things manually via string manipulation
+ String config = orig.getProtocolStackString();
+ int idx = config.indexOf('(') + 1;
+ StringBuilder builder = new StringBuilder(config.substring(0, idx));
+ builder.append(Global.SINGLETON_NAME);
+ builder.append('=');
+ builder.append(singletonName);
+ builder.append(';');
+ builder.append(config.substring(idx));
+
+ result = ConfiguratorFactory.getStackConfigurator(builder.toString());
+ }
+
+ return result;
+ }
+
+ private void fixChannelThreadPools(JChannel channel, Map props)
+ {
+ if (!channel.isConnected())
+ {
+ TransportThreadPoolProperties tpProps = new TransportThreadPoolProperties(props);
+
+ ProtocolStack stack = channel.getProtocolStack();
+ List<Protocol> protocols = stack.getProtocols();
+ for (int i = protocols.size() - 1; i >= 0; i--)
+ {
+ if (protocols.get(i) instanceof TP)
+ {
+ TP tp = (TP) protocols.get(i);
+ fixChannelThreadPools(tp, stack, tpProps);
+ break;
+ }
+ }
+ }
+ }
+
+ private void fixChannelThreadPools(TP tp, ProtocolStack stack, TransportThreadPoolProperties tpProps)
+ {
+ ClassLoader tccl = classLoaderSwitcher.getContextClassLoader();
+ if (tccl == null)
+ {
+ tccl = getClass().getClassLoader();
+ }
+ WeakReference<ClassLoader> tcclRef = new WeakReference(tccl);
+
+ if (tpProps.thread_pool_enabled && tp.getDefaultThreadPool() == null)
+ {
+ BlockingQueue<Runnable> thread_pool_queue = null;
+ if (tpProps.thread_pool_queue_enabled)
+ {
+ thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.thread_pool_queue_max_size);
+ }
+ else
+ {
+ thread_pool_queue=new SynchronousQueue<Runnable>();
+ }
+ ExecutorService thread_pool = createThreadPool(tpProps.thread_pool_min_threads,
+ tpProps.thread_pool_max_threads,
+ tpProps.thread_pool_keep_alive_time,
+ tpProps.thread_pool_rejection_policy,
+ thread_pool_queue,
+ "Incoming",
+ tpProps.thread_naming_pattern,
+ tcclRef);
+ tp.setDefaultThreadPool(thread_pool);
+
+ log.debug("Fixed default thread pool for " + tp);
+ }
+
+ if (tpProps.oob_thread_pool_enabled && tp.getOOBThreadPool() == null)
+ {
+ BlockingQueue<Runnable> oob_thread_pool_queue = null;
+ if(tpProps.oob_thread_pool_queue_enabled)
+ {
+ oob_thread_pool_queue=new LinkedBlockingQueue<Runnable>(tpProps.oob_thread_pool_queue_max_size);
+ }
+ else
+ {
+ oob_thread_pool_queue=new SynchronousQueue<Runnable>();
+ }
+ ExecutorService oob_thread_pool = createThreadPool(tpProps.oob_thread_pool_min_threads,
+ tpProps.oob_thread_pool_max_threads,
+ tpProps.oob_thread_pool_keep_alive_time,
+ tpProps.oob_thread_pool_rejection_policy,
+ oob_thread_pool_queue,
+ "OOB",
+ tpProps.thread_naming_pattern,
+ tcclRef);
+
+ tp.setOOBThreadPool(oob_thread_pool);
+
+ log.debug("Fixed OOB thread pool for " + tp);
+ }
+
+ ThreadFactory stackFactoryBase = ProtocolStack.newThreadFactory(Util.getGlobalThreadGroup(),"",false);
+ ThreadFactory stackFactory = new ClassLoaderControlThreadFactory(stackFactoryBase, tcclRef);
+ stack.setThreadFactory(stackFactory);
+
+ log.debug("Fixed thread factory for " + stack);
+
+ ThreadFactory timerFactoryBase = ProtocolStack.newThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(), "Timers"), "Timer", true);
+ ThreadFactory timerFactory = new ClassLoaderControlThreadFactory(timerFactoryBase, tcclRef);
+ stack.setTimerThreadFactory(timerFactory);
+
+ log.debug("Fixed timer thread factory for " + stack);
+ }
+
+ private ExecutorService createThreadPool(int min_threads, int max_threads,
+ long keep_alive_time, String rejection_policy,
+ BlockingQueue<Runnable> queue, final String thread_name,
+ ThreadNamingPattern thread_naming_pattern,
+ WeakReference<ClassLoader> tcclRef)
+ {
+ ThreadPoolExecutor pool=new RestoreTCCLThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue, tcclRef);
+
+ ThreadGroup pool_thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "Thread Pools");
+ // FIXME: replace with call to ProtocolStack.newIDThreadFactory(...)
+ // when we integrate JGroups 2.6.3 or later
+ ThreadFactory base = new IdThreadFactory(new DefaultThreadFactory(pool_thread_group, thread_name, false),
+ thread_naming_pattern);
+ ThreadFactory threadFactory = new ClassLoaderControlThreadFactory(base, tcclRef);
+ pool.setThreadFactory(threadFactory);
+
+ if(rejection_policy != null) {
+ if(rejection_policy.equals("abort"))
+ pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+ else if(rejection_policy.equals("discard"))
+ pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ else if(rejection_policy.equals("discardoldest"))
+ pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
+ else
+ pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ return pool;
+ }
+
+ /**
+ * Sets the context class loader on <code>thread</code> to the classloader
+ * in effect when this factory was instantiated.
+ *
+ * @param thread the thread to set
+ */
+ private void setDefaultThreadContextClassLoader(Thread thread, WeakReference<ClassLoader> classLoaderRef)
+ {
+ ClassLoader cl = classLoaderRef.get();
+ if (cl == null)
+ cl = getClass().getClassLoader();
+ classLoaderSwitcher.setContextClassLoader(thread, cl);
+ }
+
+ private void registerChannel(JChannel ch, String channelId) throws Exception
+ {
+ if(getServer() != null)
+ {
+ // Register for channel closed notification so we can unregister
+ JmxDeregistrationChannelListener listener = new JmxDeregistrationChannelListener(channelId);
+ ch.addChannelListener(listener);
+ JmxConfigurator.registerChannel(ch, getServer(), getDomain(), channelId, isExposeProtocols());
+ synchronized (registeredChannels)
+ {
+ registeredChannels.add(channelId);
+ }
+ }
+ }
+
+ private void unregister(String channelId)
+ {
+ if(getServer() != null)
+ {
+ String oname = getDomain() + ":type=channel,cluster=" + channelId;
+ try
+ {
+ getServer().unregisterMBean(new ObjectName(oname));
+ oname = getDomain() + ":type=protocol,cluster=" + channelId + ",*";
+ JmxConfigurator.unregister(getServer(), oname);
+ synchronized (registeredChannels)
+ {
+ registeredChannels.remove(channelId);
+ }
+ }
+ catch(Exception e)
+ {
+ log.error("failed unregistering " + oname, e);
+ }
+
+
+ }
+ }
+
+ /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+ static class DefaultThreadFactory implements ThreadFactory
+ {
+ private final ThreadGroup group;
+ private final String baseName;
+ private final boolean createDaemons;
+
+ public DefaultThreadFactory(ThreadGroup group,
+ String baseName,
+ boolean createDaemons)
+ {
+ this.group = group;
+ this.baseName = baseName;
+ this.createDaemons = createDaemons;
+ }
+
+ public Thread newThread(Runnable r, String name)
+ {
+ return newThread(group, r, name);
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ return newThread(group, r, baseName);
+ }
+
+ public Thread newThread(ThreadGroup group, Runnable r, String name)
+ {
+ Thread thread = new Thread(group, r, name);
+ thread.setDaemon(createDaemons);
+ return thread;
+ }
+ }
+
+ /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+ protected static class PatternedThreadFactory implements ThreadFactory
+ {
+ protected final ThreadFactory f;
+ protected ThreadNamingPattern pattern;
+
+ public PatternedThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
+ {
+ f = factory;
+ this.pattern = pattern;
+ }
+
+ public void setThreadNamingPattern(ThreadNamingPattern pattern)
+ {
+ this.pattern = pattern;
+ }
+
+ public Thread newThread(Runnable r, String name)
+ {
+ Thread newThread = f.newThread(r, name);
+ renameThread(newThread);
+ return newThread;
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ Thread newThread = f.newThread(r);
+ renameThread(newThread);
+ return newThread;
+ }
+
+ public Thread newThread(ThreadGroup group, Runnable r, String name)
+ {
+ Thread newThread = f.newThread(group, r, name);
+ renameThread(newThread);
+ return newThread;
+ }
+
+ protected void renameThread(Thread new_thread)
+ {
+ if(pattern!=null)
+ pattern.renameThread(new_thread);
+ }
+ }
+
+ /** This is a direct copy from the pre-2.6.3 JGroups code base -- keep it in sync!! */
+ private static class IdThreadFactory extends PatternedThreadFactory
+ {
+ short current_id=0;
+
+ public IdThreadFactory(ThreadFactory factory, ThreadNamingPattern pattern)
+ {
+ super(factory, pattern);
+ }
+
+ protected void renameThread(Thread new_thread)
+ {
+ if(pattern != null)
+ {
+ short id;
+ synchronized(this)
+ {
+ id=++current_id;
+ }
+ pattern.renameThread(new_thread.getName() + "-" + id, new_thread);
+ }
+ }
+ }
+
+ private class ClassLoaderControlThreadFactory implements ThreadFactory
+ {
+ private final ThreadFactory f;
+ private final WeakReference<ClassLoader> classLoaderRef;
+
+ public ClassLoaderControlThreadFactory(ThreadFactory factory, WeakReference<ClassLoader> classLoaderRef)
+ {
+ this.f = factory;
+ this.classLoaderRef = classLoaderRef;
+ }
+
+
+ public Thread newThread(Runnable r, String name)
+ {
+ Thread newThread = f.newThread(r, name);
+ setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+ return newThread;
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ Thread newThread = f.newThread(r);
+ setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+ return newThread;
+ }
+
+ public Thread newThread(ThreadGroup group, Runnable r, String name)
+ {
+ Thread newThread = f.newThread(group, r, name);
+ setDefaultThreadContextClassLoader(newThread, classLoaderRef);
+ return newThread;
+ }
+ }
+
+ private class RestoreTCCLThreadPoolExecutor extends ThreadPoolExecutor
+ {
+ private final WeakReference<ClassLoader> classLoaderRef;
+
+ public RestoreTCCLThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ WeakReference<ClassLoader> classLoaderRef)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ this.classLoaderRef = classLoaderRef;
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t)
+ {
+ try
+ {
+ super.afterExecute(r, t);
+ }
+ finally
+ {
+ setDefaultThreadContextClassLoader(Thread.currentThread(), classLoaderRef);
+ }
+ }
+ }
+
+ // Yikes! We recreate all the TP.setProperties stuff here. :(
+ private class TransportThreadPoolProperties
+ {
+ private ThreadNamingPattern thread_naming_pattern = new ThreadNamingPattern("cl");
+ private boolean oob_thread_pool_enabled=true;
+ private int oob_thread_pool_min_threads=2;
+ private int oob_thread_pool_max_threads=10;
+ private long oob_thread_pool_keep_alive_time=30000;
+ private boolean oob_thread_pool_queue_enabled=true;
+ private int oob_thread_pool_queue_max_size=500;
+ private String oob_thread_pool_rejection_policy="Run";
+
+ private boolean thread_pool_enabled=true;
+ private int thread_pool_min_threads=2;
+ private int thread_pool_max_threads=10;
+ private long thread_pool_keep_alive_time=30000;
+ private boolean thread_pool_queue_enabled=true;
+ private int thread_pool_queue_max_size=500;
+ private String thread_pool_rejection_policy="Run";
+
+ TransportThreadPoolProperties(Map props)
+ {
+ String str = (String) props.get("thread_naming_pattern");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_naming_pattern=new ThreadNamingPattern(str);
+ }
+
+ // ================= OOB thread pool =============================
+
+ str = (String) props.get("oob_thread_pool.enabled");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_enabled=Boolean.valueOf(str).booleanValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.min_threads");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_min_threads=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.max_threads");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_max_threads=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.keep_alive_time");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_keep_alive_time=Long.valueOf(str).longValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.queue_enabled");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.queue_max_size");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_queue_max_size=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("oob_thread_pool.rejection_policy");
+ if(str != null)
+ {
+ str = str.toLowerCase().trim();
+ str = StringPropertyReplacer.replaceProperties(str);
+ oob_thread_pool_rejection_policy = str;
+ }
+
+ // ================= Regular thread pool =============================
+
+ str = (String) props.get("thread_pool.enabled");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_enabled=Boolean.valueOf(str).booleanValue();
+ }
+
+ str = (String) props.get("thread_pool.min_threads");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_min_threads=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("thread_pool.max_threads");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_max_threads=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("thread_pool.keep_alive_time");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_keep_alive_time=Long.valueOf(str).longValue();
+ }
+
+ str = (String) props.get("thread_pool.queue_enabled");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue();
+ }
+
+ str = (String) props.get("thread_pool.queue_max_size");
+ if(str != null)
+ {
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_queue_max_size=Integer.valueOf(str).intValue();
+ }
+
+ str = (String) props.get("thread_pool.rejection_policy");
+ if(str != null)
+ {
+ str=str.toLowerCase().trim();
+ str = StringPropertyReplacer.replaceProperties(str);
+ thread_pool_rejection_policy=str;
+ }
+ }
+ }
+
+
+
+ private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
+ {
+ private final String channelId;
+
+ JmxDeregistrationChannelListener(String channelId)
+ {
+ this.channelId = channelId;
+ }
+
+ public void channelClosed(Channel channel)
+ {
+ unregister(channelId);
+ }
+ }
+
}
Modified: trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml
===================================================================
--- trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml 2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/cluster/src/resources/jgroups/jgroups-channelfactory-beans.xml 2008-04-16 20:48:54 UTC (rev 72317)
@@ -15,7 +15,12 @@
<property name="server"><inject bean="JMXKernel" property="mbeanServer"/></property>
<property name="domain">jboss.jgroups</property>
<property name="exposeChannels">true</property>
- <property name="exposeProtocols">true</property>
+ <property name="exposeProtocols">true</property>
+
+ <property name="manageChannelThreadPools">true</property>
+
+ <!-- Whether or not to assign a unique name to nodes -->
+ <property name="assignLogicalAddresses">true</property>
<!-- The address used to determine the node name -->
<property name="nodeAddress">${jboss.bind.address}</property>
Modified: trunk/testsuite/local.properties
===================================================================
--- trunk/testsuite/local.properties 2008-04-16 20:46:57 UTC (rev 72316)
+++ trunk/testsuite/local.properties 2008-04-16 20:48:54 UTC (rev 72317)
@@ -49,12 +49,12 @@
# (even at the same machine). Actually what we needed are just node0 and node1
# ips and the rest are optional.
#
-#node0=${env.MYTESTIP_1}
+node0=${env.MYTESTIP_1}
#node0.http.url=http://192.168.1.103:8080
#node0.jndiurl=jnp://192.168.1.103:1099
#node0.hajndi.url=jnp://192.168.1.103:1100
-#node1=${env.MYTESTIP_2}
+node1=${env.MYTESTIP_2}
#node1.http.url=http://192.168.1.113:8080
#node1.jndiurl=jnp://192.168.1.113:1099
#node1.hajndi.url=jnp://192.168.1.113:1100
Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryClassLoaderLeakTestCase.java 2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,448 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.test;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.ha.framework.server.JChannelFactory;
+import org.jboss.logging.Logger;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+
+/**
+ * Tests classloader leak handling of {@link JChannelFactory}.
+ *
+ * @author Brian Stansberry
+ */
+public class JChannelFactoryClassLoaderLeakTestCase extends TestCase
+{
+ private static final Logger log = Logger.getLogger(JChannelFactoryClassLoaderLeakTestCase.class);
+
+ private static Method OBJECT_ARG = null;
+ private static Method STRING_ARG = null;
+ private static Method SIMPLE_MUX = null;
+ private static Method COMPLEX_MUX = null;
+
+ static
+ {
+ Class clazz = JChannelFactory.class;
+ try
+ {
+ OBJECT_ARG = clazz.getDeclaredMethod("createChannel", new Class[] { Object.class });
+ STRING_ARG = clazz.getDeclaredMethod("createChannel", new Class[] { String.class });
+ SIMPLE_MUX = clazz.getDeclaredMethod("createMultiplexerChannel", new Class[] { String.class, String.class });
+ COMPLEX_MUX = clazz.getDeclaredMethod("createMultiplexerChannel", new Class[] { String.class, String.class, boolean.class, String.class });
+ }
+ catch (NoSuchMethodException nsme)
+ {
+ log.error("Reflection failure", nsme);
+ }
+ }
+
+ private JChannelFactory factory1;
+ private JChannelFactory factory2;
+ private Channel channel1;
+ private Channel channel2;
+ private String jgroups_bind_addr;
+
+ /**
+ * Create a new JChannelFactoryUnitTestCase.
+ *
+ * @param name
+ */
+ public JChannelFactoryClassLoaderLeakTestCase(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ if (COMPLEX_MUX == null)
+ throw new IllegalStateException("Reflection failed in class init; see logs");
+
+ super.setUp();
+
+ String jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
+ if (jgroups_bind_addr == null)
+ {
+ System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
+ }
+
+ factory1 = new JChannelFactory();
+ factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+ factory1.setAssignLogicalAddresses(false);
+ factory1.setExposeChannels(false);
+ factory1.create();
+ factory1.start();
+ factory2 = new JChannelFactory();
+ factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+ factory2.setAssignLogicalAddresses(false);
+ factory2.setExposeChannels(false);
+ factory2.create();
+ factory2.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (jgroups_bind_addr == null)
+ System.clearProperty("jgroups.bind_addr");
+
+ if (channel1 != null && channel1.isOpen())
+ channel1.close();
+
+ if (channel2 != null && channel2.isOpen())
+ channel2.close();
+
+ if (factory1 != null)
+ {
+ factory1.stop();
+ factory1.destroy();
+ }
+ if (factory2 != null)
+ {
+ factory2.stop();
+ factory2.destroy();
+ }
+ }
+
+ public void testClassLoaderLeakObjectShared() throws Exception
+ {
+ Object[] args1 = { factory1.getConfig("shared1") };
+ Object[] args2 = { factory2.getConfig("shared2") };
+ classloaderLeakTest(OBJECT_ARG, args1, args2);
+ }
+
+ public void testClassLoaderLeakObjectUnshared() throws Exception
+ {
+ Object[] args1 = { factory1.getConfig("unshared1") };
+ Object[] args2 = { factory2.getConfig("unshared2") };
+ classloaderLeakTest(OBJECT_ARG, args1, args2);
+ }
+
+ public void testClassLoaderLeakStringShared() throws Exception
+ {
+ Object[] args1 = { "shared1" };
+ Object[] args2 = { "shared2" };
+ classloaderLeakTest(STRING_ARG, args1, args2);
+ }
+
+ public void testClassLoaderLeakStringUnshared() throws Exception
+ {
+ Object[] args1 = { "unshared1" };
+ Object[] args2 = { "unshared2" };
+ classloaderLeakTest(STRING_ARG, args1, args2);
+ }
+
+ public void testClassLoaderLeakSimpleMuxShared() throws Exception
+ {
+ Object[] args1 = { "shared1", "leaktest" };
+ Object[] args2 = { "shared2", "leaktest" };
+ classloaderLeakTest(SIMPLE_MUX, args1, args2);
+ }
+
+ public void testClassLoaderLeakSimpleMuxUnshared() throws Exception
+ {
+ Object[] args1 = { "unshared1", "leaktest" };
+ Object[] args2 = { "unshared2", "leaktest" };
+ classloaderLeakTest(SIMPLE_MUX, args1, args2);
+ }
+
+ public void testClassLoaderLeakComplexMuxShared() throws Exception
+ {
+ Object[] args1 = { "shared1", "leaktest", Boolean.FALSE, null };
+ Object[] args2 = { "shared2", "leaktest", Boolean.FALSE, null };
+ classloaderLeakTest(COMPLEX_MUX, args1, args2);
+ }
+
+ public void testClassLoaderLeakComplexMuxUnshared() throws Exception
+ {
+ Object[] args1 = { "unshared1", "leaktest", Boolean.FALSE, null };
+ Object[] args2 = { "unshared2", "leaktest", Boolean.FALSE, null };
+ classloaderLeakTest(COMPLEX_MUX, args1, args2);
+ }
+
+ public void testClassLoaderLeakNonConcurrent() throws Exception
+ {
+ Object[] args1 = { "nonconcurrent1" };
+ Object[] args2 = { "nonconcurrent2" };
+ classloaderLeakTest(STRING_ARG, args1, args2);
+ }
+
+ private void classloaderLeakTest(Method factoryMeth, Object[] factory1Args,
+ Object[] factory2Args)
+ throws Exception
+ {
+ int numThreads = 8;
+ int numLoops = 100;
+
+ Semaphore semaphore = new Semaphore(numThreads);
+
+ ThreadGroup runnerGroup = new ThreadGroup("TestRunners");
+
+ ClassLoader ours = Thread.currentThread().getContextClassLoader();
+
+ // The classloader we want channel threads to use
+ ClassLoader cl = new ClassLoader(ours){};
+ ClassLoaderLeakHandler handler = new ClassLoaderLeakHandler(cl, semaphore, runnerGroup);
+
+ MessageDispatcher[] dispatchers = new MessageDispatcher[2];
+
+ Thread.currentThread().setContextClassLoader(cl);
+ try
+ {
+ channel1 = (Channel) factoryMeth.invoke(factory1, factory1Args);
+ dispatchers[0] = new MessageDispatcher(channel1, handler, handler, handler);
+
+ channel1.connect("leaktest");
+ assertEquals("No classloader leak on channel1 connect", null, handler.getLeakedClassLoader());
+
+ channel2 = (Channel) factoryMeth.invoke(factory2, factory2Args);
+ dispatchers[1] = new MessageDispatcher(channel2, handler, handler, handler);
+
+ channel2.connect("leaktest");
+ assertEquals("No classloader leak on channel2 connect", null, handler.getLeakedClassLoader());
+ }
+ finally
+ {
+ Thread.currentThread().setContextClassLoader(ours);
+ }
+
+ log.info("Channels connected");
+
+ ClassLoaderLeakRunner[] runners = new ClassLoaderLeakRunner[numThreads];
+
+ for (int i = 0; i < runners.length; i++)
+ {
+ MessageDispatcher disp = dispatchers[i % 2];
+
+ runners[i] = new ClassLoaderLeakRunner(disp, numLoops, runnerGroup, semaphore);
+ }
+
+ semaphore.acquire(numThreads);
+
+ for (int i = 0; i < runners.length; i++)
+ {
+ runners[i].start();
+ }
+
+ semaphore.release(numThreads);
+
+ try
+ {
+ assertTrue("messages received within 15 seconds", semaphore.tryAcquire(numThreads, 15, TimeUnit.SECONDS));
+
+ log.info("Messages received");
+ }
+ finally
+ {
+ for (int i = 0; i < runners.length; i++)
+ {
+ runners[i].stop();
+ }
+ }
+
+ log.info("Sender threads stopped");
+
+ assertEquals("No classloader leak", null, handler.getLeakedClassLoader());
+ }
+
+ private class ClassLoaderLeakRunner implements Runnable
+ {
+ private Thread thread;
+ private final MessageDispatcher dispatcher;
+ private final int numMsgs;
+ private final ThreadGroup threadGroup;
+ private final Semaphore semaphore;
+ private boolean stopped;
+ private Exception exception;
+
+ ClassLoaderLeakRunner(MessageDispatcher dispatcher, int numMsgs, ThreadGroup group, Semaphore semaphore)
+ {
+ this.dispatcher = dispatcher;
+ this.numMsgs = numMsgs;
+ this.threadGroup = group;
+ this.semaphore = semaphore;
+ }
+
+ public void run()
+ {
+ boolean acquired = false;
+ ClassLoader cl = new ClassLoader(Thread.currentThread().getContextClassLoader()){};
+ try
+ {
+ semaphore.acquire();
+ acquired = true;
+
+ log.info(Thread.currentThread().getName() + " starting");
+
+ Thread.currentThread().setContextClassLoader(cl);
+ for (int i = 0; i < numMsgs && !stopped && !Thread.interrupted(); i++)
+ {
+ Message msg = new Message(null, null, String.valueOf(i));
+ // sending this way calls receive()
+ dispatcher.send(msg);
+ // sending this way calls handle()
+ dispatcher.castMessage(null, msg, GroupRequest.GET_ALL, 0, false);
+ }
+
+ log.info(Thread.currentThread().getName() + " done");
+ }
+ catch (Exception e)
+ {
+ this.exception = e;
+ }
+ finally
+ {
+ if (acquired)
+ semaphore.release();
+
+ Thread.currentThread().setContextClassLoader(cl.getParent());
+ }
+ }
+
+ public Exception getException()
+ {
+ return exception;
+ }
+
+ public void start()
+ {
+ thread = new Thread(this.threadGroup, this);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void stop()
+ {
+ stopped = true;
+ if (thread != null && thread.isAlive())
+ {
+ try
+ {
+ thread.join(100);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ if (thread.isAlive())
+ thread.interrupt();
+ }
+ }
+ }
+
+ private class ClassLoaderLeakHandler
+ implements MembershipListener, MessageListener, RequestHandler
+ {
+ private final Semaphore semaphore;
+ private final ClassLoader expected;
+ private final ThreadGroup runnerGroup;
+ private ClassLoader leakedClassLoader;
+ private final int numPermits;
+
+ ClassLoaderLeakHandler(ClassLoader expected, Semaphore semaphore, ThreadGroup runnerGroup)
+ {
+ this.expected = expected;
+ this.semaphore = semaphore;
+ this.runnerGroup = runnerGroup;
+ this.numPermits = this.semaphore.availablePermits();
+ }
+
+ public Object handle(Message msg)
+ {
+ log.debug("handled(): " + msg.getObject());
+ checkClassLoader(true, "handle()");
+ return null;
+ }
+
+ public void block()
+ {
+ checkClassLoader(false, "block()");
+ }
+
+ public void suspect(Address suspected_mbr)
+ {
+ checkClassLoader(false, "suspect()");
+ }
+
+ public void viewAccepted(View new_view)
+ {
+ checkClassLoader(false, "viewAccepted()");
+ log.info("viewAccepted(): " + new_view);
+ }
+
+ public byte[] getState()
+ {
+ checkClassLoader(false, "getState()");
+ return new byte[1];
+ }
+
+ public void receive(Message msg)
+ {
+ checkClassLoader(false, "receive()");
+ }
+
+ public void setState(byte[] state)
+ {
+ checkClassLoader(false, "setState()");
+ }
+
+ public ClassLoader getLeakedClassLoader()
+ {
+ return leakedClassLoader;
+ }
+
+ private void checkClassLoader(boolean fromHandle, String method)
+ {
+ if (leakedClassLoader == null) // ignore msgs once we found a leak
+ {
+ // ignore runner threads that loop all the way back up
+ if (Thread.currentThread().getThreadGroup().equals(runnerGroup))
+ {
+ return;
+ }
+
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ if (!expected.equals(tccl))
+ {
+ leakedClassLoader = tccl;
+ semaphore.release(numPermits);
+ log.info("ClassLoader leaked in " + method + ": " + tccl + " leaked to " + Thread.currentThread().getName());
+ }
+ }
+ }
+
+ }
+}
Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/JChannelFactoryUnitTestCase.java 2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.test;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.jboss.ha.framework.server.JChannelFactory;
+import org.jboss.logging.Logger;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.jgroups.mux.MuxChannel;
+import org.jgroups.protocols.TP;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.stack.Protocol;
+
+/**
+ * Basic tests of the AS-specific JChannelFactory
+ * @author Brian Stansberry
+ */
+public class JChannelFactoryUnitTestCase extends TestCase
+{
+ private static final Logger log = Logger.getLogger(JChannelFactoryUnitTestCase.class);
+
+ private JChannelFactory factory1;
+ private JChannelFactory factory2;
+ private Channel channel1;
+ private Channel channel2;
+ private Channel channel3;
+ private Channel channel4;
+ private String jgroups_bind_addr;
+ private MBeanServer mbeanServer;
+
+ /**
+ * Create a new JChannelFactoryUnitTestCase.
+ *
+ * @param name
+ */
+ public JChannelFactoryUnitTestCase(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ String jgroups_bind_addr = System.getProperty("jgroups.bind_addr");
+ if (jgroups_bind_addr == null)
+ {
+ System.setProperty("jbosstest.cluster.node0", System.getProperty("jbosstest.cluster.node0", "localhost"));
+ }
+
+ factory1 = new JChannelFactory();
+ factory1.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+ factory1.setAssignLogicalAddresses(false);
+ factory1.setNodeAddress(InetAddress.getByName("localhost"));
+ factory1.setNamingServicePort(123);
+ factory1.setExposeChannels(false);
+ factory1.create();
+ factory1.start();
+ factory2 = new JChannelFactory();
+ factory2.setMultiplexerConfig("cluster/channelfactory/stacks.xml");
+ factory2.setAssignLogicalAddresses(false);
+ factory2.setNodeAddress(InetAddress.getByName("localhost"));
+ factory2.setNamingServicePort(456);
+ factory2.setNodeName("node1");
+ factory2.setExposeChannels(false);
+ factory2.create();
+ factory2.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (mbeanServer != null)
+ MBeanServerFactory.releaseMBeanServer(mbeanServer);
+
+ if (jgroups_bind_addr == null)
+ System.clearProperty("jgroups.bind_addr");
+
+ if (channel1 != null && channel1.isOpen())
+ channel1.close();
+
+ if (channel2 != null && channel2.isOpen())
+ channel2.close();
+
+ if (channel3 != null && channel3.isOpen())
+ channel3.close();
+
+ if (channel4 != null && channel4.isOpen())
+ channel4.close();
+
+ if (factory1 != null)
+ {
+ factory1.stop();
+ factory1.destroy();
+ }
+ if (factory2 != null)
+ {
+ factory2.stop();
+ factory1.destroy();
+ }
+ }
+
+ public void testNoSingletonName() throws Exception
+ {
+ log.info("+++ testNoSingletonName()");
+
+ // Assert the stack is as expected
+ String cfg1 = factory1.getConfig("unshared1");
+ assertTrue("no singleton_name in unshared1", cfg1.indexOf("singleton_name") < 0);
+ String cfg2 = factory1.getConfig("unshared2");
+ assertTrue("no singleton_name in unshared2", cfg2.indexOf("singleton_name") < 0);
+
+ channel1 = factory1.createChannel("unshared1");
+ assertFalse(channel1 instanceof MuxChannel);
+ channel1.connect("test");
+ TP tp1 = getTP((JChannel) channel1);
+
+ channel2 = factory2.createChannel("unshared2");
+ assertFalse(channel2 instanceof MuxChannel);
+ channel2.connect("test");
+ TP tp2 = getTP((JChannel) channel2);
+
+ assertNotSame(tp1, tp2);
+
+ channel3 = factory1.createMultiplexerChannel("unshared1", "test");
+ assertFalse(channel3 instanceof MuxChannel);
+ channel3.connect("test");
+ TP tp3 = getTP((JChannel) channel3);
+
+ assertNotSame(tp1, tp3);
+ assertNotSame(tp2, tp3);
+
+ channel4 = factory1.createMultiplexerChannel("unshared1", "test2");
+ assertFalse(channel4 instanceof MuxChannel);
+ channel4.connect("test2");
+ TP tp4 = getTP((JChannel) channel4);
+
+ assertSame(tp3, tp4);
+ }
+
+ /**
+ * Confirms that thread pool configurations are as expected following
+ * any massaging by the factory
+ */
+ public void testThreadPoolConfig() throws Exception
+ {
+ log.info("+++ testThreadPoolConfig()");
+
+ channel1 = factory1.createChannel("queues");
+ channel1.connect("test");
+ TP tp1 = getTP((JChannel) channel1);
+
+ assertEquals(3000, tp1.getIncomingKeepAliveTime());
+ assertEquals(22, tp1.getIncomingMaxPoolSize());
+ assertEquals(2, tp1.getIncomingMinPoolSize());
+ assertEquals(750, tp1.getIncomingMaxQueueSize());
+ assertEquals(4000, tp1.getOOBKeepAliveTime());
+ assertEquals(12, tp1.getOOBMaxPoolSize());
+ assertEquals(3, tp1.getOOBMinPoolSize());
+ assertEquals(75, tp1.getOOBMaxQueueSize());
+
+ Executor exec = tp1.getDefaultThreadPool();
+ assertNotNull(exec);
+ if (exec instanceof ThreadPoolExecutor)
+ {
+ assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.DiscardPolicy);
+ }
+
+ exec = tp1.getOOBThreadPool();
+ assertNotNull(exec);
+ if (exec instanceof ThreadPoolExecutor)
+ {
+ assertTrue(((ThreadPoolExecutor) exec).getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy);
+ }
+
+ // Confirm that the no-pool config doesn't create a pool
+ channel2 = factory1.createChannel("nothreads1");
+ channel2.connect("test");
+ TP tp2 = getTP((JChannel) channel2);
+
+ assertFalse(tp2.getDefaultThreadPool() instanceof ThreadPoolExecutor);
+ assertFalse(tp2.getOOBThreadPool() instanceof ThreadPoolExecutor);
+ }
+
+ public void testJmxHandling() throws Exception
+ {
+ log.info("+++ testJmxHandling()");
+
+ mbeanServer = MBeanServerFactory.createMBeanServer("jchannelfactorytest");
+
+ ObjectName factoryName1 = new ObjectName("jboss.test:service=TestChannelFactory1");
+ ObjectName factoryName2 = new ObjectName("jboss.test:service=TestChannelFactory2");
+
+ // destroy the factories so we can start clean
+ factory1.stop();
+ factory1.destroy();
+ factory2.stop();
+ factory2.destroy();
+
+ factory1.setExposeChannels(true);
+ factory1.setExposeProtocols(false);
+
+ factory2.setExposeChannels(true);
+ factory2.setExposeProtocols(true);
+
+ mbeanServer.registerMBean(factory1, factoryName1);
+
+ assertSame(mbeanServer, factory1.getServer());
+ assertEquals(factoryName1.getDomain(), factory1.getDomain());
+
+ mbeanServer.registerMBean(factory2, factoryName2);
+
+ factory1.create();
+ factory1.start();
+ factory2.create();
+ factory2.start();
+
+ channel1 = factory1.createMultiplexerChannel("shared1", "shared");
+ ObjectName chName1 = new ObjectName("jboss.test:type=channel,cluster=shared");
+ assertTrue(chName1 + " registered", mbeanServer.isRegistered(chName1));
+ ObjectName udpName1 = new ObjectName("jboss.test:type=channel,cluster=shared,protocol=UDP");
+ assertFalse(udpName1 + " not registered", mbeanServer.isRegistered(udpName1));
+
+ channel2 = factory2.createMultiplexerChannel("unshared1", "unshared");
+ ObjectName chName2 = new ObjectName("jboss.test:type=channel,cluster=unshared");
+ assertTrue(chName2 + " registered", mbeanServer.isRegistered(chName2));
+ ObjectName udpName2 = new ObjectName("jboss.test:type=protocol,cluster=unshared,protocol=UDP");
+ assertTrue(udpName2 + " registered", mbeanServer.isRegistered(udpName2));
+
+ channel1.connect("shared");
+ assertTrue(chName1 + " still registered", mbeanServer.isRegistered(chName1));
+ assertTrue(chName2 + " still registered", mbeanServer.isRegistered(chName2));
+
+ log.info("closing channel 1");
+
+ channel1.close();
+ assertFalse(chName1 + " unregistered", mbeanServer.isRegistered(chName1));
+ assertTrue(chName2 + " not unregistered", mbeanServer.isRegistered(chName2));
+ assertTrue(udpName2 + " not unregistered", mbeanServer.isRegistered(udpName2));
+
+ log.info("stopping factory2");
+
+ factory2.stop();
+ factory2.destroy();
+ assertFalse(chName2 + " unregistered", mbeanServer.isRegistered(chName2));
+ assertFalse(udpName2 + " unregistered", mbeanServer.isRegistered(udpName2));
+ }
+
+ public void testLogicalAddressAssignment() throws Exception
+ {
+ log.info("+++ testLogicalAddressAssignment()");
+
+ channel1 = factory1.createChannel("shared1");
+ channel1.connect("shared");
+ IpAddress addr = (IpAddress) channel1.getLocalAddress();
+ assertEquals(null, addr.getAdditionalData());
+
+ factory1.setAssignLogicalAddresses(true);
+ factory2.setAssignLogicalAddresses(true);
+
+ channel2 = factory1.createChannel("shared2");
+ channel2.connect("shared");
+ addr = (IpAddress) channel2.getLocalAddress();
+ byte[] addlData = addr.getAdditionalData();
+ assertNotNull(addlData);
+ assertEquals("127.0.0.1:123", new String(addlData));
+
+ channel3 = factory2.createChannel("unshared1");
+ channel3.connect("unshared");
+ addr = (IpAddress) channel3.getLocalAddress();
+ addlData = addr.getAdditionalData();
+ assertNotNull(addlData);
+ assertEquals("node1", new String(addlData));
+ }
+
+ private TP getTP(JChannel channel)
+ {
+ List<Protocol> protocols = channel.getProtocolStack().getProtocols();
+ return (TP) protocols.get(protocols.size() -1);
+ }
+
+
+}
Added: trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml
===================================================================
--- trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml (rev 0)
+++ trunk/testsuite/src/resources/cluster/channelfactory/stacks.xml 2008-04-16 20:48:54 UTC (rev 72317)
@@ -0,0 +1,404 @@
+<!--
+ JGroups protocol stacks definitions used by the JChannelFactory unit tests.
+ Version: $Id:jgroups-channelfactory-stacks.xml 71313 2008-03-26 19:46:59Z bstansberry at jboss.com $
+-->
+<protocol_stacks>
+
+ <stack name="shared1"
+ description="shared stack1">
+ <config>
+ <UDP
+ singleton_name="shared1"
+ mcast_port="${jgroups.udp.mcast_port:35688}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="200"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="1000"
+ thread_pool.rejection_policy="Run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <stack name="shared2"
+ description="shared stack2">
+ <config>
+ <UDP
+ singleton_name="shared2"
+ mcast_port="${jgroups.udp.mcast_port:35688}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="200"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="1000"
+ thread_pool.rejection_policy="Run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <stack name="unshared1"
+ description="unshared stack 1">
+ <config>
+ <UDP
+ mcast_port="${jgroups.udp.mcast_port:35689}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.112.112.112}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="200"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="1000"
+ thread_pool.rejection_policy="Run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <stack name="unshared2"
+ description="unshared stack 2">
+ <config>
+ <UDP
+ mcast_port="${jgroups.udp.mcast_port:35689}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.112.112.112}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="200"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="1000"
+ thread_pool.rejection_policy="Run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="Run"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <!-- Here we don't use the concurrent stack, so an incoming packet handler
+ thread carries message -->
+ <stack name="nonconcurrent1"
+ description="no concurrent stack1">
+ <config>
+ <UDP
+ singleton_name="nonconcurrent1"
+ mcast_port="${jgroups.udp.mcast_port:35799}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.114.114.114}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="false"
+
+ thread_pool.enabled="false"
+
+ oob_thread_pool.enabled="false"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <stack name="nonconcurrent2"
+ description="no concurrent stack2">
+ <config>
+ <UDP
+ singleton_name="nonconcurrent2"
+ mcast_port="${jgroups.udp.mcast_port:35799}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.114.114.114}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="false"
+
+ thread_pool.enabled="false"
+
+ oob_thread_pool.enabled="false"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+ <stack name="queues" description="to check queue configuration">
+ <config>
+ <UDP
+ singleton_name="queues"
+ mcast_port="${jgroups.udp.mcast_port:35688}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.111.111.111}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="true"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ enable_bundling="false"
+
+ use_concurrent_stack="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="2"
+ thread_pool.max_threads="22"
+ thread_pool.keep_alive_time="3000"
+ thread_pool.queue_enabled="true"
+ thread_pool.queue_max_size="750"
+ thread_pool.rejection_policy="Discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="3"
+ oob_thread_pool.max_threads="12"
+ oob_thread_pool.keep_alive_time="4000"
+ oob_thread_pool.queue_enabled="true"
+ oob_thread_pool.queue_max_size="75"
+ oob_thread_pool.rejection_policy="Abort"/>
+ <PING timeout="1000" num_initial_members="1"/>
+ <MERGE2 max_interval="100000" min_interval="20000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500"/>
+ <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ shun="true"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" min_threshold="0.10"/>
+ <FRAG2 frag_size="60000"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+ </config>
+ </stack>
+
+</protocol_stacks>
+
+
More information about the jboss-cvs-commits
mailing list