[Jboss-cvs] JBossAS SVN: r56087 - trunk/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 18 19:03:05 EDT 2006


Author: bstansberry at jboss.com
Date: 2006-08-18 19:03:05 -0400 (Fri, 18 Aug 2006)
New Revision: 56087

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java
Log:
[JBAS-3531] Inject ClusterTreeCache into ClusterPartition
[JBAS-3532] Inject DS into ClusterPartition

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-08-18 23:02:09 UTC (rev 56086)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-08-18 23:03:05 UTC (rev 56087)
@@ -34,7 +34,10 @@
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
+import org.jboss.cache.TreeCacheMBean;
+import org.jboss.ha.framework.interfaces.DistributedState;
 import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.mx.util.MBeanProxyExt;
 import org.jboss.naming.NamingServiceMBean;
 import org.jboss.system.ServiceMBean;
 import org.jboss.system.ServiceMBeanSupport;
@@ -68,41 +71,29 @@
 
    // Attributes ----------------------------------------------------
 
-   protected JChannelFactoryMBean multiplexer;
-   protected String stackName = "fc-fast-minimalthreads";
-   protected String partitionName = ServerConfigUtil.getDefaultPartitionName();
-   protected String jgProps =
-      "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=64;" +
-      "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
-      "PING(timeout=2000;num_initial_members=3):" +
-      "MERGE2(min_interval=5000;max_interval=10000):" +
-      "FD:" +
-      "VERIFY_SUSPECT(timeout=1500):" +
-      "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):" +
-      "UNICAST(timeout=600,1200,2400):" +
-      "pbcast.STABLE(desired_avg_gossip=20000):" +
-      "FRAG(down_thread=false;up_thread=false;frag_size=8192):" +
-      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
-      "shun=false;print_local_addr=true):" +
-      "pbcast.STATE_TRANSFER";
+   private   TreeCacheMBean cache;
+   private   ObjectName multiplexerObjectName;
+   private   JChannelFactoryMBean multiplexer;
+   private   String stackName;
+   private   String partitionName = ServerConfigUtil.getDefaultPartitionName();
+   private   DistributedState dsManager;
+   private   HAPartitionImpl partition;
+   private   boolean deadlock_detection = false;
+   private   boolean allow_sync_events = false;
+   private   JChannel channel;
+   private   Debugger debugger=null;
+   private   boolean use_debugger=false;
 
-   protected HAPartitionImpl partition;
-   protected boolean deadlock_detection = false;
-   protected boolean allow_sync_events = false;
-   protected JChannel channel;
-   protected Debugger debugger=null;
-   protected boolean use_debugger=false;
+   private   String nodeName = null;
+   private   InetAddress nodeAddress = null;
 
-   protected String nodeName = null;
-   protected InetAddress nodeAddress = null;
-
    /** Number of milliseconds to wait until state has been transferred. Increase this value for large states
     * 0 = wait forever
     */
-   protected long state_transfer_timeout=60000;
+   private   long state_transfer_timeout=60000;
 
 
-   protected long method_call_timeout=60000;
+   private   long method_call_timeout=60000;
 
    // Static --------------------------------------------------------
 
@@ -122,57 +113,6 @@
       partitionName = newName;
    }
 
-   public String getPartitionProperties()
-   {
-      return jgProps;
-   }
-
-   public void setPartitionProperties(String newProps)
-   {
-      jgProps = newProps;
-   }
-
-   /** Convert a list of elements to the JG property string
-    */
-   public void setPartitionConfig(Element config)
-   {
-      StringBuffer buffer = new StringBuffer();
-      NodeList stack = config.getChildNodes();
-      int length = stack.getLength();
-      for(int s = 0; s < length; s ++)
-      {
-         Node node = stack.item(s);
-         if( node.getNodeType() != Node.ELEMENT_NODE )
-            continue;
-
-         Element tag = (Element) node;
-         String protocol = tag.getTagName();
-         buffer.append(protocol);
-         NamedNodeMap attrs = tag.getAttributes();
-         int attrLength = attrs.getLength();
-         if( attrLength > 0 )
-            buffer.append('(');
-         for(int a = 0; a < attrLength; a ++)
-         {
-            Attr attr = (Attr) attrs.item(a);
-            String name = attr.getName();
-            String value = attr.getValue();
-            buffer.append(name);
-            buffer.append('=');
-            buffer.append(value);
-            if( a < attrLength-1 )
-               buffer.append(';');
-         }
-         if( attrLength > 0 )
-            buffer.append(')');
-         buffer.append(':');
-      }
-      // Remove the trailing ':'
-      buffer.setLength(buffer.length()-1);
-      this.jgProps = buffer.toString();
-      log.debug("Setting JGProps from xml to: "+jgProps);
-   }
-
    /**
     * Uniquely identifies this node. MUST be unique accros the whole cluster!
     * Cannot be changed once the partition has been started
@@ -280,23 +220,49 @@
 
    public JChannelFactoryMBean getMultiplexer()
    {
+      if (multiplexer == null && cache != null && server != null)
+      {
+         multiplexer = (JChannelFactoryMBean) MBeanProxyExt.create(JChannelFactoryMBean.class,
+                                                                   multiplexerObjectName);
+      }
       return multiplexer;
    }
 
-   public void setMultiplexer(JChannelFactoryMBean factory)
-   {
-      multiplexer = factory;
-   }
-
    public String getMultiplexerStack()
    {
       return stackName;
    }
+   
+   public DistributedState getDistributedState()
+   {
+      return dsManager;
+   }
 
-   public void setMultiplexerStack(String name)
+   public void setDistributedState(DistributedState state)
    {
-      stackName = name;
+      this.dsManager = state;
    }
+   
+   public TreeCacheMBean getTreeCache()
+   {
+      return cache;
+   }
+   
+   public void setTreeCache(TreeCacheMBean cache)
+   {
+      this.cache = cache;
+      if (cache.getMultiplexerService() == null)
+         throw new IllegalArgumentException("Cache not configured for a multiplexer");
+      try
+      {
+         multiplexerObjectName = new ObjectName(cache.getMultiplexerService());
+      }
+      catch (MalformedObjectNameException e)
+      {
+         throw new IllegalArgumentException("Cache's MultiplexerService is invalid", e);
+      }
+      this.stackName = cache.getMultiplexerStack();
+   }
 
    // ServiceMBeanSupport overrides ---------------------------------------------------
 
@@ -309,18 +275,10 @@
    protected void createService()
       throws Exception
    {
-      if (multiplexer != null)
-      {
-         log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
+      log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
                " using stack " + getMultiplexerStack());
-         channel = (JChannel)multiplexer.createMultiplexerChannel(getMultiplexerStack(), getPartitionName());
-      }
-      else
-      {
-         log.debug("Creating JGroups JChannel for partition " + getPartitionName());
-         channel = new org.jgroups.JChannel(jgProps);
-      }      
-
+      channel = (JChannel) getMultiplexer().createMultiplexerChannel(getMultiplexerStack(), getPartitionName());
+      
       if(use_debugger && debugger == null)
       {
          debugger=new Debugger(channel);
@@ -329,10 +287,8 @@
       channel.setOpt(Channel.AUTO_RECONNECT, new Boolean(true));
       channel.setOpt(Channel.AUTO_GETSTATE, new Boolean(true));
 
-      log.debug("Creating ClusterPartition");
-      partition = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
-      partition.setStateTransferTimeout(this.state_transfer_timeout);
-      partition.setMethodCallTimeout(this.method_call_timeout);
+      log.debug("Creating HAPartition");
+      partition = createPartition();
       
       // JBAS-2769 Init partition in create
       log.debug("Initializing ClusterPartition: " + partition);
@@ -367,8 +323,6 @@
          staticNodeName.put("additional_data", this.nodeName.getBytes());
          this.channel.down(new Event(Event.CONFIG, staticNodeName));
       }
-      // this.channel.getProtocolStack().flushEvents(); // temporary fix for JG bug (808170) TODO: REMOVE ONCE JGROUPS IS FIXED
-
       channel.connect(partitionName);
       
       partition.startPartition();
@@ -392,6 +346,21 @@
        partition.destroyPartition();
       log.debug("Destroyed ClusterPartition: " + partitionName);
    }
+   
+   // --------------------------------------------------- Protected Methods
+   
+   /**
+    * Extension point meant for test cases; instantiates the HAPartitionImpl.
+    * Test cases can instantiate their own subclass of HAPartitionImpl.
+    */
+   protected HAPartitionImpl createPartition() throws Exception
+   {
+      HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
+      result.setStateTransferTimeout(this.state_transfer_timeout);
+      result.setMethodCallTimeout(this.method_call_timeout);
+      result.setDistributedState(dsManager);
+      return result;
+   }
 
    protected String generateUniqueNodeName () throws Exception
    {
@@ -456,6 +425,16 @@
       return hostIP + ":" + uid;
    }
 
+   protected JChannel getChannel()
+   {
+      return channel;
+   }
+
+   protected Debugger getDebugger()
+   {
+      return debugger;
+   }
+
    public String showHistory ()
    {
       StringBuffer buff = new StringBuffer();

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java	2006-08-18 23:02:09 UTC (rev 56086)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartitionMBean.java	2006-08-18 23:03:05 UTC (rev 56087)
@@ -26,10 +26,11 @@
 
 import javax.management.ObjectName;
 
+import org.jboss.cache.TreeCacheMBean;
+import org.jboss.ha.framework.interfaces.DistributedState;
 import org.jboss.ha.framework.interfaces.HAPartition;
 import org.jboss.mx.util.ObjectNameFactory;
 import org.jgroups.jmx.JChannelFactoryMBean;
-import org.w3c.dom.Element;
 
 /** 
  *   Management Bean for Cluster HAPartitions.  It will start a JGroups
@@ -55,26 +56,6 @@
    void setPartitionName(String newName);
 
    /**
-    * Get JGroups property string a la JDBC
-    * see <a href="http://www.jgroups.com/">JGroups web site for more information</a>
-    */
-   String getPartitionProperties(); // i.e. JGroups properties
-   void setPartitionProperties(String newProps);
-
-   /** A write-only attribute that allows for an xml specification of the 
-    *PartitionProperties string. For example, a string like:
-    UDP(mcast_addr=228.1.2.3):PING(timeout=2000):MERGE2(min_interval=5000;max_interval=10000):FD"
-    * would be specified in xml as:
-    <JGProps>
-    <UDP mcast_addr="228.1.2.3" />
-    <PING timeout="2000" />
-    <MERGE2 min_interval="5000" max_interval="10000" />
-    <FD />
-    </JGProps>
-    */
-   void setPartitionConfig(Element config);
-
-   /**
     * Uniquely identifies this node. MUST be unique accros the whole cluster!
     * Cannot be changed once the partition has been started (otherwise an exception is thrown)
     */
@@ -149,18 +130,43 @@
    HAPartition getHAPartition ();   
    
    /**
+    * Access to the DistributedState implementation
+    */
+   DistributedState getDistributedState();
+   
+   /**
+    * Sets the DistributedState used by this partition.
+    */
+   void setDistributedState(DistributedState state);
+   
+   /**
+    * Gets the TreeCache used by this partition for state management.
+    */
+   TreeCacheMBean getTreeCache();
+   
+   /**
+    * Sets the TreeCache used by this partition for state management.
+    * 
+    * <strong>NOTE:</strong> The cache must be configured to use a JGroups
+    * multiplexer channel.
+    * @param cache the cache
+    * 
+    * @throws IllegalArgumentException if {@TreeCacheMBean#getMultiplexer
+    * @throws NullPointerException if cache is <code>null</code>
+    */
+   void setTreeCache(TreeCacheMBean cache);
+   
+   /**
     * Injects JGroups multiplexer channel into ClusterPartition.
     * This attribute is optional; if not provided, a default JGroups JChannel will be used
     */
    JChannelFactoryMBean getMultiplexer();
-   void setMultiplexer(JChannelFactoryMBean factory);
    
    /**
     * Used with multiplexer injection, specifies stack to be used (e.g., fc-fast-minimalthreads) 
     * This attribute is optional; if not provided, a default multiplexer stack will be used.
     */
    String getMultiplexerStack();
-   void setMultiplexerStack(String stackName);
 
    /** Return the list of member nodes that built from the current view
     * @return A Vector Strings representing the host:port values of the nodes




More information about the jboss-cvs-commits mailing list