[jboss-cvs] JBossAS SVN: r90169 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 12 17:10:54 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-06-12 17:10:54 -0400 (Fri, 12 Jun 2009)
New Revision: 90169
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
Log:
[JBAS-7009] AS JChannelFactory shouldn't subclass the JGroups version
[JBAS-7014] Fix addSingletonName handling
[JBAS-7015] JChannelFactory should always add singleton_name
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2009-06-12 21:09:45 UTC (rev 90168)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2009-06-12 21:10:54 UTC (rev 90169)
@@ -30,12 +30,11 @@
import java.rmi.dgc.VMID;
import java.rmi.server.UID;
import java.security.AccessController;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.management.MBeanRegistration;
@@ -46,10 +45,8 @@
import org.jboss.bootstrap.spi.util.ServerConfigUtil;
import org.jboss.logging.Logger;
-import org.jboss.managed.api.ManagedOperation.Impact;
import org.jboss.managed.api.annotation.ManagementComponent;
import org.jboss.managed.api.annotation.ManagementObject;
-import org.jboss.managed.api.annotation.ManagementOperation;
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
import org.jboss.managed.api.annotation.ViewUse;
@@ -64,8 +61,8 @@
import org.jgroups.JChannel;
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.conf.ProtocolData;
+import org.jgroups.conf.ProtocolParameter;
import org.jgroups.conf.ProtocolStackConfigurator;
-import org.jgroups.conf.ProtocolParameter;
import org.jgroups.conf.XmlConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.protocols.TP;
@@ -85,8 +82,8 @@
import org.w3c.dom.NodeList;
/**
- * Extension to the JGroups JChannelFactory that supports a number of
- * JBoss AS-specific behaviors:
+ * Implementation of the JGroups <code>ChannelFactory</code> that supports a
+ * number of JBoss AS-specific behaviors:
* <p>
* <ul>
* <li>Passing a config event to newly created channels containing
@@ -99,6 +96,7 @@
* <li>Configures the channel's thread pools and thread factories to ensure
* that application thread context classloaders don't leak to the channel
* threads.</li>
+ * <li>Exposes a ProfileService ManagementView interface.</li>
* </ul>
* </p>
*
@@ -107,13 +105,19 @@
*
* @version $Revision$
*/
- at ManagementObject(componentType=@ManagementComponent(type="MCBean", subtype="JChannelFactory"),
- properties=ManagementProperties.CLASS_AND_EXPLICIT)
-public class JChannelFactory //extends org.jgroups.JChannelFactory
+ at ManagementObject(componentType=@ManagementComponent(type="MCBean", subtype="ChannelFactory"),
+ name="JChannelFactory", properties=ManagementProperties.CLASS_AND_EXPLICIT,
+ isRuntime=true)
+public class JChannelFactory
implements ChannelFactory, JChannelFactoryMBean, MBeanRegistration
{
private static final Logger log = Logger.getLogger(JChannelFactory.class);
+ /**
+ * Prefix prepended to the protocol stack name to create a synthetic
+ * transport protocol <code>singleton_name</code> value for channels
+ * that don't configure a <code>singleton_name</code>.
+ */
public static final String UNSHARED_TRANSPORT_NAME_BASE = "unnamed_";
private static final int CREATED = ServiceMBean.CREATED;
@@ -137,9 +141,11 @@
private boolean manageNewThreadClassLoader = true;
private boolean manageReleasedThreadClassLoader = false;
private boolean addMissingSingletonName = true;
- private boolean domainSet;
private final ContextClassLoaderSwitcher classLoaderSwitcher;
- private final Set<String> registeredChannels = new HashSet<String>();
+ private final Map<Channel, ChannelInfo> registeredChannels =
+ new ConcurrentHashMap<Channel, ChannelInfo>(16, 0.75f, 2);
+
+ private ChannelCloseListener closeListener = new ChannelCloseListener();
/**
* Map<String,ProtocolStackConfigurator>. Hashmap which maps stack names to JGroups
@@ -147,7 +153,8 @@
* configs. This is (re-)populated whenever a setMultiplexerConfig() method
* is called
*/
- private final Map<String,ProtocolStackConfigurator> stacks = Collections.synchronizedMap(new HashMap<String, ProtocolStackConfigurator>());
+ private final Map<String,ProtocolStackConfigurator> stacks =
+ new ConcurrentHashMap<String, ProtocolStackConfigurator>(16, 0.75f, 2);
/**
* The MBeanServer to expose JMX management data with (no management data
@@ -157,6 +164,7 @@
/** To expose the channels and protocols */
private String domain = "jgroups";
+ private boolean domainSet = false;
/** Whether or not to expose channels via JMX */
private boolean expose_channels=true;
@@ -164,8 +172,25 @@
/** Whether to expose the factory only, or all protocols as well */
private boolean expose_protocols=true;
+ /**
+ * Parses the contents of <code>input</code> into a map of the
+ * protocol stack configurations contained in the XML.
+ *
+ * @param input stream which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ *
+ * @return a map of the protocol stack configurations contained in the XML
+ *
+ * @throws IllegalArgumentException if <code>input</code> is <code>null</code>
+ * @throws Exception
+ */
public static Map<String, ProtocolStackConfigurator> parse(InputStream input) throws Exception
{
+ if (input == null)
+ {
+ throw new IllegalArgumentException("null input");
+ }
+
DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();
factory.setValidating(false); //for now
DocumentBuilder builder=factory.newDocumentBuilder();
@@ -178,20 +203,30 @@
return parse(configElement);
}
+ /**
+ * Parses the contents of <code>root</code> into a map of the
+ * protocol stack configurations contained in the XML.
+ *
+ * @param root document root node for XML content in the JGroups
+ * <code>stacks.xml</code> format
+ *
+ * @return a map of the protocol stack configurations contained in the XML
+ *
+ * @throws IllegalArgumentException if <code>input</code> is <code>null</code>
+ * @throws Exception
+ */
public static Map<String, ProtocolStackConfigurator> parse(Element root) throws Exception
{
- /**
- * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...
- * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.
- * If somebody wants to improve this, please be my guest.
- */
+ if (root == null)
+ {
+ throw new IllegalArgumentException("null root");
+ }
+
String root_name = root.getNodeName();
if (!PROTOCOL_STACKS.equals(root_name.trim().toLowerCase()))
{
- String error = "XML protocol stack configuration does not start with a '<config>' element; "
- + "maybe the XML configuration needs to be converted to the new format ?\n"
- + "use 'java org.jgroups.conf.XmlConfigurator <old XML file> -new_format' to do so";
- throw new IOException("invalid XML configuration: " + error);
+ throw new IOException("Invalid XML configuration: configuration does not start with a '" +
+ PROTOCOL_STACKS + "' element");
}
Map<String, ProtocolStackConfigurator> result = new HashMap<String, ProtocolStackConfigurator>();
@@ -207,7 +242,7 @@
String tmp = stack.getNodeName();
if (!STACK.equals(tmp.trim().toLowerCase()))
{
- throw new IOException("invalid configuration: didn't find a \"" + STACK + "\" element under \""
+ throw new IOException("Invalid configuration: didn't find a \"" + STACK + "\" element under \""
+ PROTOCOL_STACKS + "\"");
}
@@ -226,8 +261,10 @@
Element cfg = (Element) tmp_config;
tmp = cfg.getNodeName();
if (!CONFIG.equals(tmp))
- throw new IOException("invalid configuration: didn't find a \"" + CONFIG + "\" element under \"" + STACK
- + "\"");
+ {
+ throw new IOException("Invalid configuration: didn't find a \"" +
+ CONFIG + "\" element under \"" + STACK + "\"");
+ }
XmlConfigurator conf = XmlConfigurator.getInstance(cfg);
// fixes http://jira.jboss.com/jira/browse/JGRP-290
@@ -239,7 +276,50 @@
return result;
}
+
+ public static ProtocolData[] getProtocolData(ProtocolStackConfigurator config)
+ {
+ ProtocolData[] result = null;
+ try
+ {
+ result = config.getProtocolStack();
+ }
+ catch (UnsupportedOperationException e)
+ {
+ String s = config.getProtocolStackString();
+ String[] prots = s.split(":");
+ result = new ProtocolData[prots.length];
+ for (int i = 0; i < prots.length; i++)
+ {
+ ProtocolParameter[] params = null;
+ int paren = prots[i].indexOf('(');
+ String name = paren > - 1 ? prots[i].substring(0, paren) : prots[1];
+ if (paren > -1 && paren < prots[1].length() - 2)
+ {
+ String unsplit = prots[i].substring(paren + 1, prots[i].length() -1);
+ String[] split = unsplit.split(";");
+ params = new ProtocolParameter[split.length];
+ for (int j = 0; j < split.length; j++)
+ {
+ String[] keyVal = split[j].split("=");
+ params[j] = new ProtocolParameter(keyVal[0], keyVal[1]);
+ }
+ }
+ else
+ {
+ params = new ProtocolParameter[0];
+ }
+
+ result[i] = new ProtocolData(name, null, name, params);
+ }
+ }
+
+ return result == null ? new ProtocolData[0] : result;
+ }
+ /**
+ * Creates a new JChannelFactory.
+ */
@SuppressWarnings("unchecked")
public JChannelFactory()
{
@@ -254,65 +334,75 @@
throw new ChannelException("No-arg createChannel() is not supported");
}
+ /**
+ * Creates a channel by passing <code>properties</code> to the
+ * <code>org.jgroups.JChannel</code> constructor.
+ *
+ * @param properties protocol stack configuration object; can be <code>null</code>
+ * in which case a default stack will be used
+ *
+ * @return the channel
+ */
public Channel createChannel(Object properties) throws ChannelException
{
checkStarted();
-
- @SuppressWarnings("deprecation")
- Channel channel = new JChannel(properties);
-
- if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
+
+ if (properties == null)
+ properties = JChannel.DEFAULT_PROTOCOL_STACK;
+
+ ProtocolStackConfigurator config = null;
+
+ try
{
- fixChannelThreadManagement(channel);
+ @SuppressWarnings("deprecation")
+ ProtocolStackConfigurator c = ConfiguratorFactory.getStackConfigurator(properties);
+ config = c;
}
-
- if (assignLogicalAddresses)
+ catch (Exception x)
{
- setChannelUniqueId(channel);
+ throw new ChannelException("unable to load protocol stack", x);
}
-
- // can't register in JMX as we don't have a channel name
-
+
+ JChannel channel = initializeChannel(config, null, false);
+
+ try
+ {
+ registerChannel(channel, null, null, getProtocolData(config));
+ }
+ catch (ChannelException ce)
+ {
+ throw ce;
+ }
+ catch (Exception e)
+ {
+ throw new ChannelException("unable to register channel", e);
+ }
+
return channel;
}
/**
* Create a {@link Channel} using the specified stack. Channel will use a
- * shared transport if the <code>singleton-name</code> attribute is
- * set on the stack's transport protocol.
+ * shared transport.
*
- * @param stack_name the name of the stack
- * @return the channel
+ * @param stack_name
+ * The name of the stack to be used. All stacks are defined in
+ * the configuration with which the factory is configured (see
+ * {@link #setMultiplexerConfig(Object)} for example. If
+ * clients attempt to create a Channel for an undefined stack
+ * name an exception will be thrown.
*
+ * @return an implementation of Channel configured with a shared transport.
+ *
+ * @throws IllegalArgumentException if <code>stack_name</code> is
+ * <code>null</code> or {@link #getConfig(String)} returns <code>null</code>
+ * when <code>stack_name</code> is used.
+ *
* @throws Exception
*/
public Channel createChannel(String stack_name) throws Exception
{
- checkStarted();
-
- String props=stack_name != null? getConfig(stack_name) : null;
- if (props == null)
- {
- log.warn("No protocol stack found with name " + stack_name +
- "; creating default channel");
- return createChannel();
- }
-
- JChannel channel = new JChannel(props);
-
- if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
- {
- fixChannelThreadManagement(channel);
- }
-
- if (assignLogicalAddresses)
- {
- setChannelUniqueId(channel);
- }
-
- // can't register in JMX as we don't have a channel name
-
- return channel;
+ return createChannelFromRegisteredStack(stack_name, null, false);
}
/**
@@ -342,12 +432,12 @@
* {@link #setMultiplexerConfig(Object)} for example. If
* clients attempt to create a Channel for an undefined stack
* name an Exception will be thrown.
- * @param id Only used if the transport protocol configuration for the
- * specified stack does not include the <code>singleton_name</code>
- * attribute; then it is used to create a synthetic singleton-name
- * for the channel's protocol stack.
+ * @param id Only used if {@link #isExposeChannels()} returns <code>true</code>,
+ * in which case, if not <code>null</code>, is used as part of
+ * the <code>ObjectName</code> for the JMX mbeans that represent
+ * the channel and its protocols. Can be <code>null</code>.
*
- * @return An implementation of Channel configured with a shared transport.
+ * @return an implementation of Channel configured with a shared transport.
*
* @throws IllegalStateException if the specified protocol stack does not
* declare a <code>singleton_name</code> and
@@ -357,57 +447,8 @@
*/
public Channel createMultiplexerChannel(String stack_name, String id) throws Exception
{
- checkStarted();
-
- String configStr = getConfig(stack_name);
-
- if (configStr == null)
- throw new IllegalStateException("Unknown stack_name " + stack_name);
-
- ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
- Map<String, String> tpProps = getTransportProperties(config);
-
- if (!tpProps.containsKey(Global.SINGLETON_NAME))
- {
- if (addMissingSingletonName)
- {
- String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
-
- log.warn("Config for " + stack_name + " does not include " +
- "singleton_name; adding a name of " + singletonName +
- ". You should configure a singleton_name for this stack.");
-
- config = addSingletonName(config, singletonName);
- log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
- tpProps = getTransportProperties(config);
- }
- else
- {
- throw new IllegalStateException("Config for " + stack_name + " does not include " +
- "singleton_name and MuxChannels are not supported.");
- }
- }
-
- JChannel channel = new JChannel(config);
-
- if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
- {
- fixChannelThreadManagement(channel);
- }
-
- if (assignLogicalAddresses)
- {
- setChannelUniqueId(channel);
- }
-
- if (isExposeChannels() && id != null && id.length() > 0)
- {
- registerChannel(channel, id);
- }
-
- return channel;
+ return createChannelFromRegisteredStack(stack_name, id, true);
}
-
/**
* Creates and returns a shared transport Channel configured with the specified
@@ -434,27 +475,73 @@
return createMultiplexerChannel(stack_name, id);
}
+ /**
+ * {@link #parse(Element) Parses <code>properties</code>} and then adds
+ * the resulting protocol stack configurations to the set available for use.
+ * Same as
+ * {@link #setMultiplexerConfig(Element, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+ *
+ * @param properties document root node for XML content in the JGroups
+ * <code>stacks.xml</code> format
+ */
public void setMultiplexerConfig(Element properties) throws Exception
{
setMultiplexerConfig(properties, true);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use. Same as
+ * {@link #setMultiplexerConfig(File, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+ *
+ * @param properties file which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ */
public void setMultiplexerConfig(File properties) throws Exception
{
- setMultiplexerConfig(properties, true);
-
+ setMultiplexerConfig(properties, true);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use. Same as
+ * {@link #setMultiplexerConfig(Object, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+ *
+ * @param properties object that can be {@link ConfiguratorFactory#getConfigStream(Object) converted into a stream}
+ * which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ */
public void setMultiplexerConfig(Object properties) throws Exception
{
setMultiplexerConfig(properties, true);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use. Same as
+ * {@link #setMultiplexerConfig(String, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+ *
+ * @param properties string that can be {@link ConfiguratorFactory#getConfigStream(String) converted into a stream}
+ * which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ */
public void setMultiplexerConfig(String properties) throws Exception
{
setMultiplexerConfig(properties, true);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use. Same as
+ * {@link #setMultiplexerConfig(URL, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+ *
+ * @param properties URL which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ */
public void setMultiplexerConfig(URL properties) throws Exception
{
setMultiplexerConfig(properties, true);
@@ -462,38 +549,63 @@
// ------------------------------------------------------------- Properties
- public MBeanServer getServer() {
+ /**
+ * Gets the MBeanServer to use to register mbeans for channels and protocols
+ * we create.
+ *
+ * @return the MBeanServer, or <code>null</code> if one isn't registered
+ */
+ public MBeanServer getServer()
+ {
return server;
}
- public void setServer(MBeanServer server) {
+ /**
+ * Sets the MBeanServer to use to register mbeans for channels and protocols
+ * we create.
+ *
+ * @param server the MBeanServer. May be <code>null</code>
+ */
+ public void setServer(MBeanServer server)
+ {
this.server=server;
}
- public String getDomain() {
- return domain;
+ /**
+ * Gets the domain portion of the JMX ObjectName to use when registering channels and protocols
+ *
+ * @return the domain. Will not return <code>null</code> after {@link #create()}
+ * has been invoked.
+ */
+ public String getDomain()
+ {
+ return domain == null ? "jgroups" : domain;
}
@ManagementProperty(use={ViewUse.CONFIGURATION}, description="The domain portion of the JMX ObjectName to use when registering channels and protocols")
public void setDomain(String domain)
{
this.domain = domain;
- domainSet = true;
+ this.domainSet = true;
}
- public boolean isExposeChannels() {
+ public boolean isExposeChannels()
+ {
return expose_channels;
}
- public void setExposeChannels(boolean expose_channels) {
+ public void setExposeChannels(boolean expose_channels)
+ {
this.expose_channels=expose_channels;
}
- public boolean isExposeProtocols() {
+ public boolean isExposeProtocols()
+ {
return expose_protocols;
}
- public void setExposeProtocols(boolean expose_protocols) {
+ public void setExposeProtocols(boolean expose_protocols)
+ {
this.expose_protocols=expose_protocols;
if (expose_protocols)
this.expose_channels=true;
@@ -685,7 +797,8 @@
* @return <code>true</code> if synthetic singleton names should be created.
* Default is <code>true</code>.
*/
- @ManagementProperty(use={ViewUse.CONFIGURATION}, description="Whether this factory should create a synthetic singleton name attribute for a channel's transport protocol if one isn't configured")
+ @ManagementProperty(use={ViewUse.CONFIGURATION},
+ description="Whether this factory should create a synthetic singleton name attribute for a channel's transport protocol if one isn't configured")
public boolean getAddMissingSingletonName()
{
return addMissingSingletonName;
@@ -706,107 +819,129 @@
// ------------------------------------------------------------- Public
+ /**
+ * {@link #parse(Element) Parses <code>properties</code>} and then adds
+ * the resulting protocol stack configurations to the set available for use.
+ *
+ * @param properties document root node for XML content in the JGroups
+ * <code>stacks.xml</code> format
+ * @param replace <code>true</code> if a configuration with the same
+ * stack name as an already registered configuration should
+ * replace that configuration; <code>false</code> if it
+ * should be discarded.
+ */
public void setMultiplexerConfig(Element properties, boolean replace) throws Exception
{
Map<String, ProtocolStackConfigurator> map = parse(properties);
+ for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
+ {
+ addConfig(entry.getKey(), entry.getValue(), replace);
+ }
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use.
+ *
+ * @param properties file which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ * @param replace <code>true</code> if a configuration with the same
+ * stack name as an already registered configuration should
+ * replace that configuration; <code>false</code> if it
+ * should be discarded.
+ */
public void setMultiplexerConfig(File properties, boolean replace) throws Exception
{
InputStream input=ConfiguratorFactory.getConfigStream(properties);
addConfigs(input, properties.toString(), replace);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use.
+ *
+ * @param properties object that can be {@link ConfiguratorFactory#getConfigStream(Object) converted into a stream}
+ * which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ * @param replace <code>true</code> if a configuration with the same
+ * stack name as an already registered configuration should
+ * replace that configuration; <code>false</code> if it
+ * should be discarded.
+ */
public void setMultiplexerConfig(Object properties, boolean replace) throws Exception
{
InputStream input=ConfiguratorFactory.getConfigStream(properties);
addConfigs(input, properties.toString(), replace);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use.
+ *
+ * @param properties string that can be {@link ConfiguratorFactory#getConfigStream(String) converted into a stream}
+ * which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ * @param replace <code>true</code> if a configuration with the same
+ * stack name as an already registered configuration should
+ * replace that configuration; <code>false</code> if it
+ * should be discarded.
+ */
public void setMultiplexerConfig(String properties, boolean replace) throws Exception
{
InputStream input=ConfiguratorFactory.getConfigStream(properties);
addConfigs(input, properties, replace);
}
+ /**
+ * {@link #parse(InputStream) Parses} an input stream created from
+ * <code>properties</code> and then adds the resulting protocol stack
+ * configurations to the set available for use.
+ *
+ * @param properties URL which must contain XML content in the JGroups
+ * <code>stacks.xml</code> format
+ * @param replace <code>true</code> if a configuration with the same
+ * stack name as an already registered configuration should
+ * replace that configuration; <code>false</code> if it
+ * should be discarded.
+ */
public void setMultiplexerConfig(URL url, boolean replace) throws Exception
{
InputStream input=ConfiguratorFactory.getConfigStream(url);
addConfigs(input, url.toString(), replace);
}
-
- private void addConfigs(InputStream input, String source, boolean replace) throws Exception
- {
- if(input == null)
- {
- throw new FileNotFoundException(source);
- }
-
- Map<String, ProtocolStackConfigurator> map = null;
- try {
- map = parse(input);
- }
- catch(Exception ex) {
- throw new Exception("failed parsing " + source, ex);
- }
- finally {
- Util.close(input);
- }
-
- for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
- {
- addConfig(entry.getKey(), entry.getValue(), replace);
- }
- }
// --------------------------------------------------- JChannelFactoryMBean
-
-
- private boolean addConfig(String st_name, ProtocolStackConfigurator val, boolean replace)
- {
- boolean added = replace;
- if(replace) {
- stacks.put(st_name, val);
- if(log.isTraceEnabled())
- log.trace("added config '" + st_name + "'");
- }
- else {
- if(!stacks.containsKey(st_name)) {
- stacks.put(st_name, val);
- if(log.isTraceEnabled())
- log.trace("added config '" + st_name + "'");
- replace = true;
- }
- else {
- if(log.isTraceEnabled())
- log.trace("didn't add config '" + st_name + " because one of the same name already existed");
- }
- }
- return replace;
-
- }
-
+ /**
+ * {@inheritDoc}
+ */
public void clearConfigurations()
{
this.stacks.clear();
}
+ /**
+ * {@inheritDoc}
+ */
public String dumpChannels()
{
return "";
}
+ /**
+ * {@inheritDoc}
+ */
public String dumpConfiguration()
{
return stacks.keySet().toString();
}
/**
- * Returns the stack configuration as a string (to be fed into new JChannel()). Throws an exception
- * if the stack_name is not found. One of the setMultiplexerConfig() methods had to be called beforehand
- * @return The protocol stack config as a plain string
+ * {@inheritDoc}
*/
public String getConfig(String stack_name) throws Exception
{
@@ -817,7 +952,7 @@
}
/**
- * @return Returns all configurations
+ * {@inheritDoc}
*/
public String getMultiplexerConfig()
{
@@ -829,6 +964,9 @@
return sb.toString();
}
+ /**
+ * {@inheritDoc}
+ */
public boolean removeConfig(String stack_name)
{
return stack_name != null && this.stacks.remove(stack_name) != null;
@@ -836,6 +974,9 @@
// ------------------------------------------------------------- Lifecycle
+ /**
+ * {@inheritDoc}
+ */
public void create() throws Exception
{
@@ -862,22 +1003,9 @@
log.debug("Created JChannelFactory");
}
- protected void createService() throws Exception
- {
- if(expose_channels)
- {
- if(server == null)
- server=Util.getMBeanServer();
- if(server == null)
- {
- throw new Exception("No MBeanServer found; JChannelFactory needs to be run with an MBeanServer present, " +
- "e.g. inside JBoss or JDK 5, or with ExposeChannels set to false");
- }
- if(domain == null)
- domain="jgroups";
- }
- }
-
+ /**
+ * {@inheritDoc}
+ */
public void start() throws Exception
{
if (state == STARTING || state == STARTED || state == STOPPING)
@@ -911,11 +1039,9 @@
}
- protected void startService() throws Exception
- {
- // no-op
- }
-
+ /**
+ * {@inheritDoc}
+ */
public void stop()
{
if (state != STARTED)
@@ -942,11 +1068,9 @@
log.debug("Stopped JChannelFactory");
}
- protected void stopService() throws Exception
- {
- // no-op
- }
-
+ /**
+ * {@inheritDoc}
+ */
public void destroy()
{
if (state == DESTROYED)
@@ -975,33 +1099,18 @@
log.debug("Destroyed JChannelFactory");
}
- protected void destroyService()
- {
- Set<String> toUnregister = null;
- synchronized (registeredChannels)
- {
- toUnregister = new HashSet<String>(registeredChannels);
- }
-
- for (String channelId : toUnregister)
- {
- unregister(channelId);
- }
- }
-
// ------------------------------------------------------- MBeanRegistration
public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception
{
setServer(server);
- if (!domainSet || getDomain() == null)
+ if (!this.domainSet || this.domain == null)
{
setDomain(name.getDomain());
}
return name;
}
-
public void postRegister(Boolean registrationDone)
{
if (registrationDone != null && registrationDone.booleanValue()
@@ -1011,7 +1120,6 @@
}
}
-
public void preDeregister() throws Exception
{
}
@@ -1022,6 +1130,8 @@
if (state == ServiceMBean.DESTROYED)
state = ServiceMBean.UNREGISTERED;
}
+
+ // --------------------------------------------------------------- Protected
/**
* Gets the classloader that channel threads should be set to if
@@ -1037,6 +1147,42 @@
return getClass().getClassLoader();
}
+ protected void createService() throws Exception
+ {
+ if(expose_channels)
+ {
+ if(server == null)
+ {
+ throw new Exception("No MBeanServer found; JChannelFactory needs to " +
+ "be run with an MBeanServer present, or with ExposeChannels " +
+ "set to false");
+ }
+
+ if(domain == null)
+ {
+ domain="jgroups";
+ }
+ }
+ }
+
+ protected void startService() throws Exception
+ {
+ // no-op
+ }
+
+ protected void stopService() throws Exception
+ {
+ // no-op
+ }
+
+ protected void destroyService()
+ {
+ for (Channel ch : registeredChannels.keySet())
+ {
+ unregisterChannel(ch);
+ }
+ }
+
// ----------------------------------------------------------------- Private
@@ -1046,6 +1192,142 @@
throw new IllegalStateException("Cannot use factory; state is " + getStateString());
}
+ private void addConfigs(InputStream input, String source, boolean replace) throws Exception
+ {
+ if(input == null)
+ {
+ throw new FileNotFoundException(source);
+ }
+
+ Map<String, ProtocolStackConfigurator> map = null;
+ try
+ {
+ map = parse(input);
+ }
+ catch(Exception ex)
+ {
+ throw new Exception("failed parsing " + source, ex);
+ }
+ finally
+ {
+ Util.close(input);
+ }
+
+ for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
+ {
+ addConfig(entry.getKey(), entry.getValue(), replace);
+ }
+ }
+
+ private boolean addConfig(String st_name, ProtocolStackConfigurator val, boolean replace)
+ {
+ boolean added = replace;
+ if (replace)
+ {
+ stacks.put(st_name, val);
+ if (log.isTraceEnabled())
+ log.trace("added config '" + st_name + "'");
+ }
+ else
+ {
+ if (!stacks.containsKey(st_name))
+ {
+ stacks.put(st_name, val);
+ if (log.isTraceEnabled())
+ log.trace("added config '" + st_name + "'");
+ added = true;
+ }
+ else
+ {
+ if (log.isTraceEnabled())
+ log.trace("didn't add config '" + st_name + " because one of the same name already existed");
+ }
+ }
+ return added;
+ }
+
+ /**
+ * Creates a channel from one of the known stack configurations
+ * FIXME Comment this
+ *
+ * @param stack_name the name of the stack config
+ * @param id optional id for the channel
+ * @param forceSingletonStack <code>true</code> if a singleton_name must be
+ * either configured, or addMissingSingletonName must be true
+ *
+ * @return the channel
+ *
+ * @throws IllegalStateException if stack_name is unknown or forceSingletonStack
+ * is true but a singleton_name couldn't be configured
+ *
+ * @throws Exception
+ */
+ private Channel createChannelFromRegisteredStack(String stack_name, String id, boolean forceSingletonStack) throws Exception
+ {
+ checkStarted();
+
+ ProtocolStackConfigurator config = stacks.get(stack_name);
+
+ if (config == null)
+ throw new IllegalStateException("Unknown stack_name " + stack_name);
+
+ JChannel channel = initializeChannel(config, stack_name, forceSingletonStack);
+
+ registerChannel(channel, id, stack_name, getProtocolData(config));
+
+ return channel;
+ }
+
+ /**
+ * Construct a JChannel from the given config and then do post-construction
+ * processing like fixing up thread managment or setting a unique id.
+ *
+ * @param config the config
+ *
+ * @return the channel
+ *
+ * @throws ChannelException
+ */
+ private JChannel initializeChannel(ProtocolStackConfigurator config, String stack_name,
+ boolean forceSingletonStack) throws ChannelException
+ {
+ Map<String, String> tpProps = getTransportProperties(config);
+
+ if (!tpProps.containsKey(Global.SINGLETON_NAME))
+ {
+ if (addMissingSingletonName && stack_name != null)
+ {
+ String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
+
+ log.warn("Config for " + stack_name + " does not include " +
+ "singleton_name; adding a name of " + singletonName +
+ ". You should configure a singleton_name for this stack.");
+
+ config = addSingletonName(config, singletonName);
+ log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
+ tpProps = getTransportProperties(config);
+ }
+ else if (forceSingletonStack)
+ {
+ throw new IllegalStateException("Config for " + stack_name + " does not include " +
+ "singleton_name and MuxChannels are not supported.");
+ }
+ }
+ JChannel channel = new JChannel(config);
+
+ if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
+ {
+ fixChannelThreadManagement(channel);
+ }
+
+ if (assignLogicalAddresses)
+ {
+ setChannelUniqueId(channel);
+ }
+
+ return channel;
+ }
+
private void setChannelUniqueId(Channel channel)
{
IpAddress address = (IpAddress) channel.getLocalAddress();
@@ -1134,31 +1416,15 @@
private Map<String, String> getTransportProperties(ProtocolStackConfigurator config)
{
Map<String, String> tpProps = null;
- try
+ ProtocolData[] protocols= getProtocolData(config);
+ ProtocolData transport=protocols[0];
+ @SuppressWarnings("unchecked")
+ Map<String,ProtocolParameter> tmp=transport.getParameters();
+ tpProps = new HashMap<String,String>();
+ for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
{
- ProtocolData[] protocols=config.getProtocolStack();
- ProtocolData transport=protocols[0];
- Map<String,ProtocolParameter> tmp=transport.getParameters();
- tpProps = new HashMap<String,String>();
- for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
- tpProps.put(entry.getKey(), entry.getValue().getValue());
+ tpProps.put(entry.getKey(), entry.getValue().getValue());
}
- catch (UnsupportedOperationException uoe)
- {
- // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
- // So we do it ourselves
- String configStr = config.getProtocolStackString();
- String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
- String[] params = tpConfigStr.split(";");
- tpProps = new HashMap<String, String>();
- for (String param : params)
- {
- String[] keyVal = param.split("=");
- if (keyVal.length != 2)
- throw new IllegalStateException("Invalid parameter " + param + " in stack " + configStr);
- tpProps.put(keyVal[0], keyVal[1]);
- }
- }
return tpProps;
}
@@ -1171,12 +1437,8 @@
{
ProtocolData[] protocols=orig.getProtocolStack();
ProtocolData transport=protocols[0];
- Map<String,ProtocolParameter> tmp=transport.getParameters();
- Map<String,String> tpProps = new HashMap<String,String>();
- for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
- tpProps.put(entry.getKey(), entry.getValue().getValue());
- tpProps.put(Global.SINGLETON_NAME, singletonName);
- // we've now updated the state of orig; just return it
+ ProtocolParameter singletonParam = new ProtocolParameter(Global.SINGLETON_NAME, singletonName);
+ transport.override(new ProtocolParameter[]{ singletonParam});
result = orig;
}
catch (UnsupportedOperationException uoe)
@@ -1359,40 +1621,124 @@
classLoaderSwitcher.setContextClassLoader(thread, classLoader);
}
- private void registerChannel(JChannel ch, String channelId) throws Exception
+ private void registerChannel(JChannel ch, String channelId, String stackName, ProtocolData[] config) throws Exception
{
- if(getServer() != null)
+ // Register for channel closed notification so we can unregister
+ ch.addChannelListener(closeListener);
+
+ ObjectName chName = null;
+ List<ObjectName> protNames = null;
+ List<ObjectName> allNames = registerInJmx(ch, channelId);
+ if (allNames != null && allNames.size() > 0)
{
- // Register for channel closed notification so we can unregister
- JmxDeregistrationChannelListener listener = new JmxDeregistrationChannelListener(channelId);
- ch.addChannelListener(listener);
- JmxConfigurator.registerChannel(ch, getServer(), getDomain(), channelId, isExposeProtocols());
- synchronized (registeredChannels)
+ chName = allNames.get(0);
+ if (allNames.size() > 1)
{
- registeredChannels.add(channelId);
+ protNames = allNames.subList(1, allNames.size());
}
}
+
+ ChannelInfo info = new ChannelInfo(channelId, stackName, ch, config, chName, protNames);
+ registeredChannels.put(ch, info);
}
- private void unregister(String channelId)
+ private List<ObjectName> registerInJmx(JChannel ch, String channelId) throws Exception
{
- if(getServer() != null && registeredChannels.contains(channelId))
+ List<ObjectName> allNames = null;
+
+ if(isExposeChannels() && getServer() != null && channelId != null && channelId.length() > 0)
{
- String oname = getDomain() + ":type=channel,cluster=" + channelId;
- try
+ allNames = new ArrayList<ObjectName>();
+ ObjectName channelName = new ObjectName(getDomain() + ":type=channel,cluster=" + channelId);
+ getServer().registerMBean(new org.jgroups.jmx.JChannel(ch), channelName);
+ allNames.add(channelName);
+ if (isExposeProtocols())
{
- getServer().unregisterMBean(new ObjectName(oname));
- oname = getDomain() + ":type=protocol,cluster=" + channelId + ",*";
- JmxConfigurator.unregister(getServer(), oname);
- synchronized (registeredChannels)
+ String baseName = getDomain() + ":type=protocol,cluster=" + channelId;
+ ProtocolStack stack=ch.getProtocolStack();
+ List<Protocol> protocols=stack.getProtocols();
+
+ for(Protocol prot : protocols)
{
- registeredChannels.remove(channelId);
+ org.jgroups.jmx.Protocol p=null;
+ try {
+ String prot_name = prot.getClass().getName();
+ String clname = prot_name.replaceFirst("org.jgroups.", "org.jgroups.jmx.");
+ Class<?> cl = Util.loadClass(clname, JmxConfigurator.class);
+ if (cl != null)
+ {
+ p = (org.jgroups.jmx.Protocol) cl.newInstance();
+ }
+ }
+ catch(ClassNotFoundException e)
+ {
+ // ignore;
+ }
+ catch(Throwable e) {
+ log.error("failed creating a JMX wrapper instance for " + prot, e);
+ p = null;
+ }
+ if(p == null)
+ {
+ // Use default
+ p = new org.jgroups.jmx.Protocol(prot);
+ }
+ ObjectName prot_name=new ObjectName(baseName + ",protocol=" + prot.getName());
+ server.registerMBean(p, prot_name);
+ allNames.add(prot_name);
}
}
+ }
+
+ return allNames;
+ }
+
+ private void unregisterChannel(Channel ch)
+ {
+ ChannelInfo info = registeredChannels.remove(ch);
+ if (info == null)
+ {
+ log.warn("Unknown channel " + ch.getClusterName());
+ }
+ else
+ {
+ unregisterFromJmx(info);
+ }
+
+ ch.removeChannelListener(closeListener);
+ }
+
+ private void unregisterFromJmx(ChannelInfo info)
+ {
+ ObjectName oname = info.getChannelObjectName();
+ MBeanServer mbs = getServer();
+ if(info != null && mbs != null)
+ {
+ try
+ {
+ mbs.unregisterMBean(oname);
+ }
catch(Exception e)
{
log.error("failed unregistering " + oname, e);
}
+
+ List<ObjectName> onames = info.getProtocolObjectNames();
+ if (onames != null)
+ {
+ for (ObjectName protName : onames)
+ {
+ try
+ {
+ mbs.unregisterMBean(protName);
+ }
+ catch(Exception e)
+ {
+ log.error("failed unregistering " + protName, e);
+ }
+
+ }
+ }
}
}
@@ -1432,18 +1778,11 @@
}
- private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
+ private class ChannelCloseListener extends ChannelListenerAdapter
{
- private final String channelId;
-
- JmxDeregistrationChannelListener(String channelId)
- {
- this.channelId = channelId;
- }
-
public void channelClosed(Channel channel)
{
- unregister(channelId);
+ unregisterChannel(channel);
}
}
More information about the jboss-cvs-commits
mailing list