[jboss-cvs] JBossAS SVN: r90169 - trunk/cluster/src/main/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 12 17:10:54 EDT 2009


Author: bstansberry at jboss.com
Date: 2009-06-12 17:10:54 -0400 (Fri, 12 Jun 2009)
New Revision: 90169

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
Log:
[JBAS-7009] AS JChannelFactory shouldn't subclass the JGroups version
[JBAS-7014] Fix addSingletonName handling
[JBAS-7015] JChannelFactory should always add singleton_name

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2009-06-12 21:09:45 UTC (rev 90168)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2009-06-12 21:10:54 UTC (rev 90169)
@@ -30,12 +30,11 @@
 import java.rmi.dgc.VMID;
 import java.rmi.server.UID;
 import java.security.AccessController;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import javax.management.MBeanRegistration;
@@ -46,10 +45,8 @@
 
 import org.jboss.bootstrap.spi.util.ServerConfigUtil;
 import org.jboss.logging.Logger;
-import org.jboss.managed.api.ManagedOperation.Impact;
 import org.jboss.managed.api.annotation.ManagementComponent;
 import org.jboss.managed.api.annotation.ManagementObject;
-import org.jboss.managed.api.annotation.ManagementOperation;
 import org.jboss.managed.api.annotation.ManagementProperties;
 import org.jboss.managed.api.annotation.ManagementProperty;
 import org.jboss.managed.api.annotation.ViewUse;
@@ -64,8 +61,8 @@
 import org.jgroups.JChannel;
 import org.jgroups.conf.ConfiguratorFactory;
 import org.jgroups.conf.ProtocolData;
+import org.jgroups.conf.ProtocolParameter;
 import org.jgroups.conf.ProtocolStackConfigurator;
-import org.jgroups.conf.ProtocolParameter;
 import org.jgroups.conf.XmlConfigurator;
 import org.jgroups.jmx.JmxConfigurator;
 import org.jgroups.protocols.TP;
@@ -85,8 +82,8 @@
 import org.w3c.dom.NodeList;
 
 /**
- * Extension to the JGroups JChannelFactory that supports a number of 
- * JBoss AS-specific behaviors:
+ * Implementation of the JGroups <code>ChannelFactory</code> that supports a 
+ * number of JBoss AS-specific behaviors:
  * <p>
  * <ul>
  * <li>Passing a config event to newly created channels containing 
@@ -99,6 +96,7 @@
  * <li>Configures the channel's thread pools and thread factories to ensure
  * that application thread context classloaders don't leak to the channel
  * threads.</li>
+ * <li>Exposes a ProfileService ManagementView interface.</li>
  * </ul>
  * </p>
  * 
@@ -107,13 +105,19 @@
  * 
  * @version $Revision$
  */
- at ManagementObject(componentType=@ManagementComponent(type="MCBean", subtype="JChannelFactory"),
-      properties=ManagementProperties.CLASS_AND_EXPLICIT)
-public class JChannelFactory //extends org.jgroups.JChannelFactory
+ at ManagementObject(componentType=@ManagementComponent(type="MCBean", subtype="ChannelFactory"),
+      name="JChannelFactory", properties=ManagementProperties.CLASS_AND_EXPLICIT,
+      isRuntime=true)
+public class JChannelFactory
       implements ChannelFactory, JChannelFactoryMBean, MBeanRegistration
 {
    private static final Logger log = Logger.getLogger(JChannelFactory.class);
    
+   /** 
+    * Prefix prepended to the protocol stack name to create a synthetic
+    * transport protocol <code>singleton_name</code> value for channels
+    * that don't configure a <code>singleton_name</code>. 
+    */
    public static final String UNSHARED_TRANSPORT_NAME_BASE = "unnamed_";
    
    private static final int CREATED = ServiceMBean.CREATED;
@@ -137,9 +141,11 @@
    private boolean manageNewThreadClassLoader = true;
    private boolean manageReleasedThreadClassLoader = false;
    private boolean addMissingSingletonName = true;
-   private boolean domainSet;
    private final ContextClassLoaderSwitcher classLoaderSwitcher;
-   private final Set<String> registeredChannels = new HashSet<String>();
+   private final Map<Channel, ChannelInfo> registeredChannels = 
+      new ConcurrentHashMap<Channel, ChannelInfo>(16, 0.75f, 2);
+   
+   private ChannelCloseListener closeListener = new ChannelCloseListener();
 
    /**
     * Map<String,ProtocolStackConfigurator>. Hashmap which maps stack names to JGroups
@@ -147,7 +153,8 @@
     * configs. This is (re-)populated whenever a setMultiplexerConfig() method
     * is called
     */
-   private final Map<String,ProtocolStackConfigurator> stacks = Collections.synchronizedMap(new HashMap<String, ProtocolStackConfigurator>());
+   private final Map<String,ProtocolStackConfigurator> stacks = 
+      new ConcurrentHashMap<String, ProtocolStackConfigurator>(16, 0.75f, 2);
 
    /**
     * The MBeanServer to expose JMX management data with (no management data
@@ -157,6 +164,7 @@
 
    /** To expose the channels and protocols */
    private String domain = "jgroups";
+   private boolean domainSet = false;
 
    /** Whether or not to expose channels via JMX */
    private boolean expose_channels=true;
@@ -164,8 +172,25 @@
    /** Whether to expose the factory only, or all protocols as well */
    private boolean expose_protocols=true;
 
+   /**
+    * Parses the contents of <code>input</code> into a map of the
+    * protocol stack configurations contained in the XML.
+    * 
+    * @param input stream which must contain XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    *              
+    * @return a map of the protocol stack configurations contained in the XML
+    * 
+    * @throws IllegalArgumentException if <code>input</code> is <code>null</code>
+    * @throws Exception
+    */
    public static Map<String, ProtocolStackConfigurator> parse(InputStream input) throws Exception 
    {
+      if (input == null)
+      {
+         throw new IllegalArgumentException("null input");
+      }
+      
       DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();
       factory.setValidating(false); //for now
       DocumentBuilder builder=factory.newDocumentBuilder();
@@ -178,20 +203,30 @@
       return parse(configElement);
    }
    
+   /**
+    * Parses the contents of <code>root</code> into a map of the
+    * protocol stack configurations contained in the XML.
+    * 
+    * @param root document root node for XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    *              
+    * @return a map of the protocol stack configurations contained in the XML
+    * 
+    * @throws IllegalArgumentException if <code>input</code> is <code>null</code>
+    * @throws Exception
+    */
    public static Map<String, ProtocolStackConfigurator> parse(Element root) throws Exception 
    {
-      /**
-        * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...
-        * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.
-        * If somebody wants to improve this, please be my guest.
-        */
+      if (root == null)
+      {
+         throw new IllegalArgumentException("null root");
+      }
+      
       String root_name = root.getNodeName();
       if (!PROTOCOL_STACKS.equals(root_name.trim().toLowerCase()))
       {
-         String error = "XML protocol stack configuration does not start with a '<config>' element; "
-               + "maybe the XML configuration needs to be converted to the new format ?\n"
-               + "use 'java org.jgroups.conf.XmlConfigurator <old XML file> -new_format' to do so";
-         throw new IOException("invalid XML configuration: " + error);
+         throw new IOException("Invalid XML configuration: configuration does not start with a '" + 
+                        PROTOCOL_STACKS + "' element");
       }
 
       Map<String, ProtocolStackConfigurator> result = new HashMap<String, ProtocolStackConfigurator>();
@@ -207,7 +242,7 @@
          String tmp = stack.getNodeName();
          if (!STACK.equals(tmp.trim().toLowerCase()))
          {
-            throw new IOException("invalid configuration: didn't find a \"" + STACK + "\" element under \""
+            throw new IOException("Invalid configuration: didn't find a \"" + STACK + "\" element under \""
                   + PROTOCOL_STACKS + "\"");
          }
 
@@ -226,8 +261,10 @@
             Element cfg = (Element) tmp_config;
             tmp = cfg.getNodeName();
             if (!CONFIG.equals(tmp))
-               throw new IOException("invalid configuration: didn't find a \"" + CONFIG + "\" element under \"" + STACK
-                     + "\"");
+            {
+               throw new IOException("Invalid configuration: didn't find a \"" + 
+                     CONFIG + "\" element under \"" + STACK + "\"");
+            }
 
             XmlConfigurator conf = XmlConfigurator.getInstance(cfg);
             // fixes http://jira.jboss.com/jira/browse/JGRP-290
@@ -239,7 +276,50 @@
 
       return result;
    }
+   
+   public static ProtocolData[] getProtocolData(ProtocolStackConfigurator config)
+   {
+      ProtocolData[] result = null;
+      try
+      {
+         result = config.getProtocolStack();
+      }
+      catch (UnsupportedOperationException e)
+      {
+         String s = config.getProtocolStackString();
+         String[] prots = s.split(":");
+         result = new ProtocolData[prots.length];
+         for (int i = 0; i < prots.length; i++)
+         {
+            ProtocolParameter[] params = null;
+            int paren = prots[i].indexOf('(');
+            String name = paren > - 1 ? prots[i].substring(0, paren) : prots[1];
+            if (paren > -1 && paren < prots[1].length() - 2)
+            {
+               String unsplit = prots[i].substring(paren + 1, prots[i].length() -1);
+               String[] split = unsplit.split(";");
+                params = new ProtocolParameter[split.length];
+               for (int j = 0; j < split.length; j++)
+               {
+                  String[] keyVal = split[j].split("=");
+                  params[j] = new ProtocolParameter(keyVal[0], keyVal[1]);
+               }
+            }
+            else
+            {
+               params = new ProtocolParameter[0];
+            }
+            
+            result[i] = new ProtocolData(name, null, name, params);
+         }
+      }
+      
+      return result == null ? new ProtocolData[0] : result;
+   }
 
+   /**
+    * Creates a new JChannelFactory.
+    */
    @SuppressWarnings("unchecked")
    public JChannelFactory()
    {
@@ -254,65 +334,75 @@
       throw new ChannelException("No-arg createChannel() is not supported");
    }
 
+   /**
+    * Creates a channel by passing <code>properties</code> to the 
+    * <code>org.jgroups.JChannel</code> constructor.
+    * 
+    * @param properties protocol stack configuration object; can be <code>null</code>
+    *                   in which case a default stack will be used
+    * 
+    * @return the channel
+    */
    public Channel createChannel(Object properties) throws ChannelException
    {
       checkStarted();
-      
-      @SuppressWarnings("deprecation")      
-      Channel channel = new JChannel(properties);
-      
-      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
+
+      if (properties == null)
+         properties = JChannel.DEFAULT_PROTOCOL_STACK;
+
+      ProtocolStackConfigurator config = null;
+
+      try
       {
-         fixChannelThreadManagement(channel);
+         @SuppressWarnings("deprecation")
+         ProtocolStackConfigurator c = ConfiguratorFactory.getStackConfigurator(properties);
+         config = c;
       }
-      
-      if (assignLogicalAddresses)
+      catch (Exception x)
       {
-         setChannelUniqueId(channel);
+         throw new ChannelException("unable to load protocol stack", x);
       }
-      
-      // can't register in JMX as we don't have a channel name
-      
+
+      JChannel channel = initializeChannel(config, null, false);
+
+      try
+      {
+         registerChannel(channel, null, null, getProtocolData(config));
+      }
+      catch (ChannelException ce)
+      {
+         throw ce;
+      }
+      catch (Exception e)
+      {
+         throw new ChannelException("unable to register channel", e);
+      }
+
       return channel;
    }
 
    /**
     * Create a {@link Channel} using the specified stack. Channel will use a 
-    * shared transport if the <code>singleton-name</code> attribute is
-    * set on the stack's transport protocol.
+    * shared transport.
     * 
-    * @param stack_name the name of the stack
-    * @return the channel
+    * @param stack_name 
+    *            The name of the stack to be used. All stacks are defined in
+    *            the configuration with which the factory is configured (see
+    *            {@link #setMultiplexerConfig(Object)} for example. If
+    *            clients attempt to create a Channel for an undefined stack 
+    *            name an exception will be thrown.
     * 
+    * @return an implementation of Channel configured with a shared transport.
+    * 
+    * @throws IllegalArgumentException if <code>stack_name</code> is 
+    * <code>null</code> or {@link #getConfig(String)} returns <code>null</code>
+    * when <code>stack_name</code> is used.
+    * 
     * @throws Exception
     */
    public Channel createChannel(String stack_name) throws Exception
    {
-      checkStarted();
-      
-      String props=stack_name != null? getConfig(stack_name) : null;
-      if (props == null)
-      {
-         log.warn("No protocol stack found with name " + stack_name +
-                  "; creating default channel");
-         return createChannel();
-      }
-      
-      JChannel channel = new JChannel(props);
-      
-      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
-      {
-         fixChannelThreadManagement(channel);
-      }
-      
-      if (assignLogicalAddresses)
-      {
-         setChannelUniqueId(channel);
-      }
-      
-      // can't register in JMX as we don't have a channel name
-      
-      return channel;
+      return createChannelFromRegisteredStack(stack_name, null, false);
    }
    
    /**
@@ -342,12 +432,12 @@
     *            {@link #setMultiplexerConfig(Object)} for example. If
     *            clients attempt to create a Channel for an undefined stack 
     *            name an Exception will be thrown.
-    * @param id  Only used if the transport protocol configuration for the
-    *            specified stack does not include the <code>singleton_name</code>
-    *            attribute; then it is used to create a synthetic singleton-name
-    *            for the channel's protocol stack.
+    * @param id  Only used if {@link #isExposeChannels()} returns <code>true</code>,
+    *            in which case, if not <code>null</code>, is used as part of
+    *            the <code>ObjectName</code> for the JMX mbeans that represent
+    *            the channel and its protocols. Can be <code>null</code>.
     *            
-    * @return An implementation of Channel configured with a shared transport.
+    * @return an implementation of Channel configured with a shared transport.
     *         
     * @throws IllegalStateException if the specified protocol stack does not
     *                               declare a <code>singleton_name</code> and
@@ -357,57 +447,8 @@
     */
    public Channel createMultiplexerChannel(String stack_name, String id) throws Exception
    {
-      checkStarted();
-      
-      String configStr = getConfig(stack_name);
-      
-      if (configStr == null)
-         throw new IllegalStateException("Unknown stack_name " + stack_name);
-      
-      ProtocolStackConfigurator config = ConfiguratorFactory.getStackConfigurator(configStr);
-      Map<String, String> tpProps = getTransportProperties(config);
-      
-      if (!tpProps.containsKey(Global.SINGLETON_NAME))
-      {
-         if (addMissingSingletonName)
-         {
-            String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
-            
-            log.warn("Config for " + stack_name + " does not include " +
-                      "singleton_name; adding a name of " + singletonName +
-                      ". You should configure a singleton_name for this stack.");
-            
-            config = addSingletonName(config, singletonName);
-            log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
-            tpProps = getTransportProperties(config);                       
-         }
-         else
-         {
-            throw new IllegalStateException("Config for " + stack_name + " does not include " +
-                      "singleton_name and MuxChannels are not supported.");
-         }
-      }
-      
-      JChannel channel = new JChannel(config);
-      
-      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
-      {
-         fixChannelThreadManagement(channel);
-      }
-      
-      if (assignLogicalAddresses)
-      {
-         setChannelUniqueId(channel);
-      }
-      
-      if (isExposeChannels() && id != null && id.length() > 0)
-      {
-         registerChannel(channel, id);
-      }
-      
-      return channel;
+      return createChannelFromRegisteredStack(stack_name, id, true);
    }  
-
    
    /**
     * Creates and returns a shared transport Channel configured with the specified 
@@ -434,27 +475,73 @@
       return createMultiplexerChannel(stack_name, id);
    }
    
+   /**
+    * {@link #parse(Element) Parses <code>properties</code>} and then adds
+    * the resulting protocol stack configurations to the set available for use.
+    * Same as 
+    * {@link #setMultiplexerConfig(Element, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+    * 
+    * @param properties document root node for XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    */
    public void setMultiplexerConfig(Element properties) throws Exception
    {
       setMultiplexerConfig(properties, true);
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use. Same as 
+    * {@link #setMultiplexerConfig(File, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+    * 
+    * @param properties file which must contain XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    */
    public void setMultiplexerConfig(File properties) throws Exception
    {
-      setMultiplexerConfig(properties, true);
-      
+      setMultiplexerConfig(properties, true);      
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use. Same as 
+    * {@link #setMultiplexerConfig(Object, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+    * 
+    * @param properties object that can be {@link ConfiguratorFactory#getConfigStream(Object) converted into a stream}
+    *                   which must contain XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    */
    public void setMultiplexerConfig(Object properties) throws Exception
    {
       setMultiplexerConfig(properties, true);
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use. Same as 
+    * {@link #setMultiplexerConfig(String, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+    * 
+    * @param properties string that can be {@link ConfiguratorFactory#getConfigStream(String) converted into a stream}
+    *                   which must contain XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    */
    public void setMultiplexerConfig(String properties) throws Exception
    {
       setMultiplexerConfig(properties, true);
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use. Same as 
+    * {@link #setMultiplexerConfig(URL, boolean) <code>setMultiplexerConfig(properties, true</code>}.
+    * 
+    * @param properties URL which must contain XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    */
    public void setMultiplexerConfig(URL properties) throws Exception
    {
       setMultiplexerConfig(properties, true);
@@ -462,38 +549,63 @@
 
    // -------------------------------------------------------------  Properties
 
-   public MBeanServer getServer() {
+   /**
+    * Gets the MBeanServer to use to register mbeans for channels and protocols
+    * we create.
+    * 
+    * @return the MBeanServer, or <code>null</code> if one isn't registered
+    */
+   public MBeanServer getServer() 
+   {
        return server;
    }
 
-   public void setServer(MBeanServer server) {
+   /**
+    * Sets the MBeanServer to use to register mbeans for channels and protocols
+    * we create.
+    * 
+    * @param server the MBeanServer. May be <code>null</code>
+    */
+   public void setServer(MBeanServer server) 
+   {
        this.server=server;
    }
 
-   public String getDomain() {
-       return domain;
+   /**
+    * Gets the domain portion of the JMX ObjectName to use when registering channels and protocols
+    * 
+    * @return the domain. Will not return <code>null</code> after {@link #create()}
+    *         has been invoked.
+    */
+   public String getDomain() 
+   {
+       return domain == null ? "jgroups" : domain;
    }
    
    @ManagementProperty(use={ViewUse.CONFIGURATION}, description="The domain portion of the JMX ObjectName to use when registering channels and protocols")
    public void setDomain(String domain)
    {
       this.domain = domain;
-      domainSet = true;
+      this.domainSet = true;
    }
 
-   public boolean isExposeChannels() {
+   public boolean isExposeChannels() 
+   {
        return expose_channels;
    }
 
-   public void setExposeChannels(boolean expose_channels) {
+   public void setExposeChannels(boolean expose_channels) 
+   {
        this.expose_channels=expose_channels;
    }
 
-   public boolean isExposeProtocols() {
+   public boolean isExposeProtocols() 
+   {
        return expose_protocols;
    }
 
-   public void setExposeProtocols(boolean expose_protocols) {
+   public void setExposeProtocols(boolean expose_protocols) 
+   {      
        this.expose_protocols=expose_protocols;
        if (expose_protocols)
           this.expose_channels=true;
@@ -685,7 +797,8 @@
     * @return <code>true</code> if synthetic singleton names should be created.
     *         Default is <code>true</code>.
     */
-   @ManagementProperty(use={ViewUse.CONFIGURATION}, description="Whether this factory should create a synthetic singleton name attribute for a channel's transport protocol if one isn't configured")
+   @ManagementProperty(use={ViewUse.CONFIGURATION}, 
+         description="Whether this factory should create a synthetic singleton name attribute for a channel's transport protocol if one isn't configured")
    public boolean getAddMissingSingletonName()
    {
       return addMissingSingletonName;
@@ -706,107 +819,129 @@
    
    // -------------------------------------------------------------  Public
 
+   /**
+    * {@link #parse(Element) Parses <code>properties</code>} and then adds
+    * the resulting protocol stack configurations to the set available for use.
+    * 
+    * @param properties document root node for XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    * @param replace <code>true</code> if a configuration with the same
+    *                stack name as an already registered configuration should
+    *                replace that configuration; <code>false</code> if it
+    *                should be discarded.
+    */
    public void setMultiplexerConfig(Element properties, boolean replace) throws Exception
    {
       Map<String, ProtocolStackConfigurator> map = parse(properties);
       
+      for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
+      {
+         addConfig(entry.getKey(), entry.getValue(), replace);
+      }
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use.
+    * 
+    * @param properties file which must contain XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    * @param replace <code>true</code> if a configuration with the same
+    *                stack name as an already registered configuration should
+    *                replace that configuration; <code>false</code> if it
+    *                should be discarded.
+    */
    public void setMultiplexerConfig(File properties, boolean replace) throws Exception
    {
       InputStream input=ConfiguratorFactory.getConfigStream(properties);      
       addConfigs(input, properties.toString(), replace);
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use. 
+    * 
+    * @param properties object that can be {@link ConfiguratorFactory#getConfigStream(Object) converted into a stream}
+    *                   which must contain XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    * @param replace <code>true</code> if a configuration with the same
+    *                stack name as an already registered configuration should
+    *                replace that configuration; <code>false</code> if it
+    *                should be discarded.
+    */
    public void setMultiplexerConfig(Object properties, boolean replace) throws Exception
    {
       InputStream input=ConfiguratorFactory.getConfigStream(properties);
       addConfigs(input, properties.toString(), replace);   
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use.
+    * 
+    * @param properties string that can be {@link ConfiguratorFactory#getConfigStream(String) converted into a stream}
+    *                   which must contain XML content in the JGroups 
+    *                   <code>stacks.xml</code> format
+    * @param replace <code>true</code> if a configuration with the same
+    *                stack name as an already registered configuration should
+    *                replace that configuration; <code>false</code> if it
+    *                should be discarded.
+    */
    public void setMultiplexerConfig(String properties, boolean replace) throws Exception
    {
       InputStream input=ConfiguratorFactory.getConfigStream(properties);      
       addConfigs(input, properties, replace);
    }
 
+   /**
+    * {@link #parse(InputStream) Parses} an input stream created from 
+    * <code>properties</code> and then adds the resulting protocol stack 
+    * configurations to the set available for use.
+    * 
+    * @param properties URL which must contain XML content in the JGroups 
+    *              <code>stacks.xml</code> format
+    * @param replace <code>true</code> if a configuration with the same
+    *                stack name as an already registered configuration should
+    *                replace that configuration; <code>false</code> if it
+    *                should be discarded.
+    */
    public void setMultiplexerConfig(URL url, boolean replace) throws Exception
    {
       InputStream input=ConfiguratorFactory.getConfigStream(url);      
       addConfigs(input, url.toString(), replace);
    }
-
-   private void addConfigs(InputStream input, String source, boolean replace) throws Exception
-   {
-      if(input == null)
-      {
-         throw new FileNotFoundException(source);
-      }
-      
-      Map<String, ProtocolStackConfigurator> map = null;
-      try {
-          map = parse(input);
-      }
-      catch(Exception ex) {
-          throw new Exception("failed parsing " + source, ex);
-      }
-      finally {
-          Util.close(input);
-      }
-      
-      for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
-      {
-         addConfig(entry.getKey(), entry.getValue(), replace);
-      }
-   }
    
    // ---------------------------------------------------  JChannelFactoryMBean
-   
 
-
-   private boolean addConfig(String st_name, ProtocolStackConfigurator val, boolean replace)
-   {
-      boolean added = replace;
-      if(replace) {
-         stacks.put(st_name, val);
-         if(log.isTraceEnabled())
-             log.trace("added config '" + st_name + "'");
-     }
-     else {
-         if(!stacks.containsKey(st_name)) {
-             stacks.put(st_name, val);
-             if(log.isTraceEnabled())
-                 log.trace("added config '" + st_name + "'");
-             replace = true;
-         }
-         else {
-             if(log.isTraceEnabled())
-                 log.trace("didn't add config '" + st_name + " because one of the same name already existed");
-         }
-     }
-      return replace;
-      
-   }
-
+   /**
+    * {@inheritDoc}
+    */
    public void clearConfigurations()
    {
       this.stacks.clear();
    }
 
+   /**
+    * {@inheritDoc}
+    */
    public String dumpChannels()
    {
       return "";
    }
 
+   /**
+    * {@inheritDoc}
+    */
    public String dumpConfiguration()
    {
       return stacks.keySet().toString();
    }
 
    /**
-    * Returns the stack configuration as a string (to be fed into new JChannel()). Throws an exception
-    * if the stack_name is not found. One of the setMultiplexerConfig() methods had to be called beforehand
-    * @return The protocol stack config as a plain string
+    * {@inheritDoc}
     */
    public String getConfig(String stack_name) throws Exception
    {
@@ -817,7 +952,7 @@
    }
 
    /**
-    * @return Returns all configurations
+    * {@inheritDoc}
     */
    public String getMultiplexerConfig()
    {
@@ -829,6 +964,9 @@
       return sb.toString();
    }
 
+   /**
+    * {@inheritDoc}
+    */
    public boolean removeConfig(String stack_name)
    {
       return stack_name != null && this.stacks.remove(stack_name) != null;
@@ -836,6 +974,9 @@
    
    // -------------------------------------------------------------  Lifecycle
 
+   /**
+    * {@inheritDoc}
+    */
    public void create() throws Exception
    {
 
@@ -862,22 +1003,9 @@
       log.debug("Created JChannelFactory");
    }
 
-   protected void createService() throws Exception
-   {
-      if(expose_channels) 
-      {
-         if(server == null)
-            server=Util.getMBeanServer();
-         if(server == null)
-         {
-            throw new Exception("No MBeanServer found; JChannelFactory needs to be run with an MBeanServer present, " +
-                      "e.g. inside JBoss or JDK 5, or with ExposeChannels set to false");
-         }
-         if(domain == null)
-            domain="jgroups";
-      }
-   }
-
+   /**
+    * {@inheritDoc}
+    */
    public void start() throws Exception
    {
       if (state == STARTING || state == STARTED || state == STOPPING)
@@ -911,11 +1039,9 @@
       
    }
 
-   protected void startService() throws Exception
-   {
-      // no-op
-   }
-
+   /**
+    * {@inheritDoc}
+    */
    public void stop()
    {
       if (state != STARTED)
@@ -942,11 +1068,9 @@
       log.debug("Stopped JChannelFactory");
    }
 
-   protected void stopService() throws Exception
-   {
-      // no-op
-   }
-
+   /**
+    * {@inheritDoc}
+    */
    public void destroy()
    {
       if (state == DESTROYED)
@@ -975,33 +1099,18 @@
       log.debug("Destroyed JChannelFactory");
    }
 
-   protected void destroyService()
-   {
-      Set<String> toUnregister = null;
-      synchronized (registeredChannels)
-      {
-         toUnregister = new HashSet<String>(registeredChannels);
-      }
-      
-      for (String channelId : toUnregister)
-      {
-         unregister(channelId);
-      }
-   }
-
    // ------------------------------------------------------- MBeanRegistration
 
    public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception
    {
       setServer(server);
-      if (!domainSet || getDomain() == null)
+      if (!this.domainSet || this.domain == null)
       {
          setDomain(name.getDomain());
       }
       return name;
    }
 
-
    public void postRegister(Boolean registrationDone)
    {
       if (registrationDone != null && registrationDone.booleanValue()
@@ -1011,7 +1120,6 @@
       }
    }
 
-
    public void preDeregister() throws Exception
    { 
    }
@@ -1022,6 +1130,8 @@
       if (state == ServiceMBean.DESTROYED)
          state = ServiceMBean.UNREGISTERED;
    }
+
+   // --------------------------------------------------------------- Protected
    
    /**
     * Gets the classloader that channel threads should be set to if
@@ -1037,6 +1147,42 @@
       return getClass().getClassLoader();
    }
 
+   protected void createService() throws Exception
+   {
+      if(expose_channels) 
+      {
+         if(server == null)
+         {
+            throw new Exception("No MBeanServer found; JChannelFactory needs to " +
+            		"be run with an MBeanServer present, or with ExposeChannels " +
+            		"set to false");
+         }
+         
+         if(domain == null)
+         {
+            domain="jgroups";
+         }
+      }
+   }
+
+   protected void startService() throws Exception
+   {
+      // no-op
+   }
+
+   protected void stopService() throws Exception
+   {
+      // no-op
+   }
+
+   protected void destroyService()
+   {  
+      for (Channel ch : registeredChannels.keySet())
+      {
+         unregisterChannel(ch);
+      }
+   }
+
    // ----------------------------------------------------------------- Private
 
 
@@ -1046,6 +1192,142 @@
          throw new IllegalStateException("Cannot use factory; state is " + getStateString());
    }
    
+   private void addConfigs(InputStream input, String source, boolean replace) throws Exception
+   {
+      if(input == null)
+      {
+         throw new FileNotFoundException(source);
+      }
+      
+      Map<String, ProtocolStackConfigurator> map = null;
+      try 
+      {
+          map = parse(input);
+      }
+      catch(Exception ex) 
+      {
+          throw new Exception("failed parsing " + source, ex);
+      }
+      finally 
+      {
+          Util.close(input);
+      }
+      
+      for (Map.Entry<String, ProtocolStackConfigurator> entry : map.entrySet())
+      {
+         addConfig(entry.getKey(), entry.getValue(), replace);
+      }
+   }
+
+   private boolean addConfig(String st_name, ProtocolStackConfigurator val, boolean replace)
+   {
+      boolean added = replace;
+      if (replace)
+      {
+         stacks.put(st_name, val);
+         if (log.isTraceEnabled())
+            log.trace("added config '" + st_name + "'");
+      }
+      else
+      {
+         if (!stacks.containsKey(st_name))
+         {
+            stacks.put(st_name, val);
+            if (log.isTraceEnabled())
+               log.trace("added config '" + st_name + "'");
+            added = true;
+         }
+         else
+         {
+            if (log.isTraceEnabled())
+               log.trace("didn't add config '" + st_name + " because one of the same name already existed");
+         }
+      }
+      return added;      
+   }
+
+   /**
+    * Creates a channel from one of the known stack configurations
+    * FIXME Comment this
+    * 
+    * @param stack_name the name of the stack config
+    * @param id optional id for the channel
+    * @param forceSingletonStack <code>true</code> if a singleton_name must be
+    *             either configured, or addMissingSingletonName must be true
+    *             
+    * @return the channel
+    * 
+    * @throws IllegalStateException if stack_name is unknown or forceSingletonStack
+    *           is true but a singleton_name couldn't be configured
+    *           
+    * @throws Exception
+    */
+   private Channel createChannelFromRegisteredStack(String stack_name, String id, boolean forceSingletonStack) throws Exception
+   {
+      checkStarted();
+      
+      ProtocolStackConfigurator config = stacks.get(stack_name);
+      
+      if (config == null)
+         throw new IllegalStateException("Unknown stack_name " + stack_name);
+      
+      JChannel channel = initializeChannel(config, stack_name, forceSingletonStack);
+      
+      registerChannel(channel, id, stack_name, getProtocolData(config));      
+      
+      return channel;
+   }
+
+   /**
+    * Construct a JChannel from the given config and then do post-construction
+    * processing like fixing up thread managment or setting a unique id.
+    * 
+    * @param config the config
+    * 
+    * @return the channel
+    * 
+    * @throws ChannelException
+    */
+   private JChannel initializeChannel(ProtocolStackConfigurator config, String stack_name,
+         boolean forceSingletonStack) throws ChannelException
+   {  
+      Map<String, String> tpProps = getTransportProperties(config);
+   
+      if (!tpProps.containsKey(Global.SINGLETON_NAME))
+      {
+         if (addMissingSingletonName && stack_name != null)
+         {
+            String singletonName = UNSHARED_TRANSPORT_NAME_BASE + stack_name;
+            
+            log.warn("Config for " + stack_name + " does not include " +
+                      "singleton_name; adding a name of " + singletonName +
+                      ". You should configure a singleton_name for this stack.");
+            
+            config = addSingletonName(config, singletonName);
+            log.debug("Stack config after adding singleton_name is " + config.getProtocolStackString());
+            tpProps = getTransportProperties(config);                       
+         }
+         else if (forceSingletonStack)
+         {
+            throw new IllegalStateException("Config for " + stack_name + " does not include " +
+                      "singleton_name and MuxChannels are not supported.");
+         }
+      }
+      JChannel channel = new JChannel(config);
+      
+      if (manageNewThreadClassLoader || manageReleasedThreadClassLoader)
+      {
+         fixChannelThreadManagement(channel);
+      }
+      
+      if (assignLogicalAddresses)
+      {
+         setChannelUniqueId(channel);
+      }
+      
+      return channel;
+   }
+   
    private void setChannelUniqueId(Channel channel)
    {
       IpAddress address = (IpAddress) channel.getLocalAddress();
@@ -1134,31 +1416,15 @@
    private Map<String, String> getTransportProperties(ProtocolStackConfigurator config)
    {
       Map<String, String> tpProps = null;
-      try
+      ProtocolData[] protocols= getProtocolData(config);
+      ProtocolData transport=protocols[0];
+      @SuppressWarnings("unchecked")
+      Map<String,ProtocolParameter> tmp=transport.getParameters();
+      tpProps = new HashMap<String,String>();
+      for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
       {
-         ProtocolData[] protocols=config.getProtocolStack();
-         ProtocolData transport=protocols[0];
-         Map<String,ProtocolParameter> tmp=transport.getParameters();
-         tpProps = new HashMap<String,String>();
-         for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
-             tpProps.put(entry.getKey(), entry.getValue().getValue());
+          tpProps.put(entry.getKey(), entry.getValue().getValue());
       }
-      catch (UnsupportedOperationException uoe)
-      {
-         // JGroups version hasn't implemented ProtocolStackConfigurator.getProtocolStack()
-         // So we do it ourselves
-         String configStr = config.getProtocolStackString();
-         String tpConfigStr = configStr.substring(configStr.indexOf('(') + 1, configStr.indexOf(')'));
-         String[] params = tpConfigStr.split(";");
-         tpProps = new HashMap<String, String>();
-         for (String param : params)
-         {
-            String[] keyVal = param.split("=");
-            if (keyVal.length != 2)
-               throw new IllegalStateException("Invalid parameter " + param + " in stack " + configStr);
-            tpProps.put(keyVal[0], keyVal[1]);
-         }
-      }
       
       return tpProps;
    }
@@ -1171,12 +1437,8 @@
       {
          ProtocolData[] protocols=orig.getProtocolStack();
          ProtocolData transport=protocols[0];
-         Map<String,ProtocolParameter> tmp=transport.getParameters();
-         Map<String,String> tpProps = new HashMap<String,String>();
-         for(Map.Entry<String,ProtocolParameter> entry: tmp.entrySet())
-             tpProps.put(entry.getKey(), entry.getValue().getValue());
-         tpProps.put(Global.SINGLETON_NAME, singletonName);
-         // we've now updated the state of orig; just return it
+         ProtocolParameter singletonParam = new ProtocolParameter(Global.SINGLETON_NAME, singletonName);
+         transport.override(new ProtocolParameter[]{ singletonParam});
          result = orig;
       }
       catch (UnsupportedOperationException uoe)
@@ -1359,40 +1621,124 @@
       classLoaderSwitcher.setContextClassLoader(thread, classLoader);
    }
    
-   private void registerChannel(JChannel ch, String channelId) throws Exception 
+   private void registerChannel(JChannel ch, String channelId, String stackName, ProtocolData[] config) throws Exception
    {
-      if(getServer() != null)
+      // Register for channel closed notification so we can unregister
+      ch.addChannelListener(closeListener);
+      
+      ObjectName chName = null;
+      List<ObjectName> protNames = null;
+      List<ObjectName> allNames = registerInJmx(ch, channelId);
+      if (allNames != null && allNames.size() > 0)
       {
-         // Register for channel closed notification so we can unregister
-         JmxDeregistrationChannelListener listener = new JmxDeregistrationChannelListener(channelId);
-         ch.addChannelListener(listener);
-         JmxConfigurator.registerChannel(ch, getServer(), getDomain(), channelId, isExposeProtocols());
-         synchronized (registeredChannels)
+         chName = allNames.get(0);
+         if (allNames.size() > 1)
          {
-            registeredChannels.add(channelId);
+            protNames = allNames.subList(1, allNames.size());
          }
       }
+      
+      ChannelInfo info = new ChannelInfo(channelId, stackName, ch, config, chName, protNames);
+      registeredChannels.put(ch, info);
    }
    
-   private void unregister(String channelId) 
+   private List<ObjectName> registerInJmx(JChannel ch, String channelId) throws Exception 
    {
-      if(getServer() != null && registeredChannels.contains(channelId))
+      List<ObjectName> allNames = null;
+      
+      if(isExposeChannels() && getServer() != null && channelId != null && channelId.length() > 0)
       {
-         String oname = getDomain() + ":type=channel,cluster=" + channelId;
-         try
+         allNames = new ArrayList<ObjectName>();
+         ObjectName channelName = new ObjectName(getDomain() + ":type=channel,cluster=" + channelId);
+         getServer().registerMBean(new org.jgroups.jmx.JChannel(ch), channelName);
+         allNames.add(channelName);
+         if (isExposeProtocols())
          {
-            getServer().unregisterMBean(new ObjectName(oname));
-            oname = getDomain() + ":type=protocol,cluster=" + channelId + ",*";
-            JmxConfigurator.unregister(getServer(), oname);
-            synchronized (registeredChannels)
+            String baseName = getDomain() + ":type=protocol,cluster=" + channelId;
+            ProtocolStack stack=ch.getProtocolStack();
+            List<Protocol> protocols=stack.getProtocols();
+            
+            for(Protocol prot : protocols)
             {
-               registeredChannels.remove(channelId);
+                org.jgroups.jmx.Protocol p=null;
+                try {
+                   String prot_name = prot.getClass().getName();
+                   String clname = prot_name.replaceFirst("org.jgroups.", "org.jgroups.jmx.");
+                   Class<?> cl = Util.loadClass(clname, JmxConfigurator.class);
+                   if (cl != null)
+                   {
+                      p = (org.jgroups.jmx.Protocol) cl.newInstance();
+                   }
+                }
+                catch(ClassNotFoundException e) 
+                {
+                   // ignore;
+                }
+                catch(Throwable e) {
+                    log.error("failed creating a JMX wrapper instance for " + prot, e);
+                    p = null;
+                }
+                if(p == null)
+                {
+                   // Use default
+                   p = new org.jgroups.jmx.Protocol(prot);
+                }
+                ObjectName prot_name=new ObjectName(baseName + ",protocol=" + prot.getName());
+                server.registerMBean(p, prot_name);
+                allNames.add(prot_name);
             }
          }
+      }
+      
+      return allNames;
+   }
+   
+   private void unregisterChannel(Channel ch)
+   {
+      ChannelInfo info = registeredChannels.remove(ch);
+      if (info == null)
+      {
+         log.warn("Unknown channel " + ch.getClusterName());
+      }
+      else
+      {
+         unregisterFromJmx(info);
+      }
+      
+      ch.removeChannelListener(closeListener);
+   }
+   
+   private void unregisterFromJmx(ChannelInfo info) 
+   {
+      ObjectName oname = info.getChannelObjectName();
+      MBeanServer mbs = getServer();
+      if(info != null && mbs != null)
+      {
+         try
+         {
+            mbs.unregisterMBean(oname);
+         }
          catch(Exception e)
          {
             log.error("failed unregistering " + oname, e);
          }
+         
+         List<ObjectName> onames = info.getProtocolObjectNames();
+         if (onames != null)
+         {
+            for (ObjectName protName : onames)
+            {
+               try
+               {
+                  mbs.unregisterMBean(protName);
+               }
+               catch(Exception e)
+               {
+                  log.error("failed unregistering " + protName, e);
+               }
+               
+            }
+         }
       }
    }
    
@@ -1432,18 +1778,11 @@
       
    }
 
-   private class JmxDeregistrationChannelListener extends ChannelListenerAdapter
+   private class ChannelCloseListener extends ChannelListenerAdapter
    {
-      private final String channelId;
-      
-      JmxDeregistrationChannelListener(String channelId)
-      {
-         this.channelId = channelId;
-      }
-      
       public void channelClosed(Channel channel) 
       {
-         unregister(channelId);
+         unregisterChannel(channel);
       }            
    }
    




More information about the jboss-cvs-commits mailing list