[jboss-cvs] JBossAS SVN: r58577 - trunk/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Nov 18 06:37:03 EST 2006


Author: bstansberry at jboss.com
Date: 2006-11-18 06:37:00 -0500 (Sat, 18 Nov 2006)
New Revision: 58577

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Log:
Combine ClusterPartition and HAPartitionImpl

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-11-18 11:36:36 UTC (rev 58576)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-11-18 11:37:00 UTC (rev 58577)
@@ -21,428 +21,1266 @@
   */
 package org.jboss.ha.framework.server;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.rmi.dgc.VMID;
 import java.rmi.server.UID;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Vector;
 
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NameNotFoundException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
 
-import org.jboss.cache.TreeCacheMBean;
+import org.jboss.cache.Cache;
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
 import org.jboss.ha.framework.interfaces.DistributedState;
 import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.mx.util.MBeanProxyExt;
-import org.jboss.naming.NamingServiceMBean;
-import org.jboss.system.ServiceMBean;
+import org.jboss.invocation.MarshalledValueInputStream;
+import org.jboss.invocation.MarshalledValueOutputStream;
+import org.jboss.logging.Logger;
+import org.jboss.naming.NonSerializableFactory;
 import org.jboss.system.ServiceMBeanSupport;
 import org.jboss.system.server.ServerConfigUtil;
 import org.jgroups.Channel;
 import org.jgroups.Event;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.JChannel;
+import org.jgroups.MembershipListener;
+import org.jgroups.MergeView;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
 import org.jgroups.Version;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.debug.Debugger;
-import org.jgroups.JChannel;
 import org.jgroups.jmx.JChannelFactoryMBean;
 import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
 
 /**
- *   Management Bean for Cluster HAPartitions.  It will start a JGroups
- *   channel and initialize the ReplicantManager and DistributedStateService.
+ * {@link HAPartition} implementation based on a 
+ * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code> 
+ * and a multiplexed <code>JChannel</code>.
  *
- *   @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
- *   @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
- *   @version $Revision$
+ * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
+ * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
+ * @author Scott.Stark at jboss.org
+ * @author brian.stansberry at jboss.com
+ * @version $Revision$
  */
 public class ClusterPartition
    extends ServiceMBeanSupport
-   implements ClusterPartitionMBean
+   implements MembershipListener, HAPartition, 
+              AsynchEventHandler.AsynchEventProcessor,
+              ClusterPartitionMBean
 {
+   private static final byte NULL_VALUE   = 0;
+   private static final byte SERIALIZABLE_VALUE = 1;
+   // TODO add Streamable support
+   // private static final byte STREAMABLE_VALUE = 2;
+   
+   /**
+    * Returned when an RPC call arrives for a service that isn't registered.
+    */
+   private static class NoHandlerForRPC implements Serializable
+   {
+      static final long serialVersionUID = -1263095408483622838L;
+   }
+   /** End-of-stream marker: written last by getStateInternal(); setStateInternal() stops reading when it sees one. */
+   private static class StateStreamEnd implements Serializable
+   {
+      /** The serialVersionUID */
+      private static final long serialVersionUID = -3705345735451504946L;      
+   }
+
    // Constants -----------------------------------------------------
 
+   // final MethodLookup method_lookup_clos = new MethodLookupClos();
+
    // Attributes ----------------------------------------------------
 
-   private   TreeCacheMBean cache;
-   private   ObjectName multiplexerObjectName;
-   private   JChannelFactoryMBean multiplexer;
-   private   String stackName;
-   private   String partitionName = ServerConfigUtil.getDefaultPartitionName();
-   private   DistributedState dsManager;
-   private   HAPartitionImpl partition;
-   private   boolean deadlock_detection = false;
-   private   boolean allow_sync_events = false;
-   private   JChannel channel;
-   private   Debugger debugger=null;
-   private   boolean use_debugger=false;
+   protected ClusterPartitionConfig config;
+   protected HashMap rpcHandlers = new HashMap();
+   protected HashMap stateHandlers = new HashMap();
+   /** Do we send any membership change notifications synchronously? */
+   protected boolean allowSyncListeners = false;
+   /** The HAMembershipListener and HAMembershipExtendedListeners */
+   protected ArrayList synchListeners = new ArrayList();
+   /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
+   protected ArrayList asynchListeners = new ArrayList();
+   /** The handler used to send membership change notifications asynchronously */
+   protected AsynchEventHandler asynchHandler;
+   /** The current cluster partition members */
+   protected Vector members = null;
+   protected Vector jgmembers = null;
 
-   private   String nodeName = null;
-   private   InetAddress nodeAddress = null;
+   public Vector history = null;
 
-   /** Number of milliseconds to wait until state has been transferred. Increase this value for large states
-    * 0 = wait forever
+   /** The partition members other than this node */
+   protected Vector otherMembers = null;
+   protected Vector jgotherMembers = null;
+   /** the local JG IP Address */
+   protected org.jgroups.stack.IpAddress localJGAddress = null;
+   /** The cluster transport protocol address string */
+   protected String nodeName;
+   /** me as a ClusterNode */
+   protected ClusterNode me = null;
+   /** The JGroups partition channel */
+   protected JChannel channel;
+   /** The cluster replicant manager */
+   protected DistributedReplicantManager replicantManager;
+   /** The cluster instance log category */
+   protected Logger log;
+   protected Logger clusterLifeCycleLog;   
+   /** The current cluster view id */
+   protected long currentViewId = -1;
+   
+   private RpcDispatcher dispatcher = null;
+
+   /**
+    * True if serviceState was initialized during start-up.
     */
-   private   long state_transfer_timeout=60000;
+   protected boolean isStateSet = false;
 
+   /**
+    * An exception occurring while fetching serviceState.
+    */
+   protected Exception setStateException;
+   private final Object stateLock = new Object();
+   private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
+   private Debugger debugger;
+   private boolean selfCreatedDRM;
 
-   private   long method_call_timeout=60000;
-
    // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // ClusterPartitionMBean implementation ----------------------------------------------
-
-   public String getPartitionName()
+   
+   /**
+    * Deserializes an object from a byte buffer via MarshalledValueInputStream; a null buffer yields null.
+    */
+   public static Object objectFromByteBuffer (byte[] buffer) throws Exception
    {
-      return partitionName;
-   }
+      if(buffer == null) 
+         return null;
 
-   public void setPartitionName(String newName)
-   {
-      partitionName = newName;
+      ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+      MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
+      return mvis.readObject();
    }
-
+   
    /**
-    * Uniquely identifies this node. MUST be unique accros the whole cluster!
-    * Cannot be changed once the partition has been started
+    * Serializes an object into a byte buffer using MarshalledValueOutputStream.
+    * The object has to implement interface Serializable or Externalizable.
     */
-   public String getNodeName()
+   public static byte[] objectToByteBuffer (Object obj) throws Exception
    {
-      return this.nodeName;
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
+      mvos.writeObject(obj);
+      mvos.flush(); // push buffered data into baos before taking the snapshot
+      return baos.toByteArray();
    }
 
-   public void setNodeName(String node) throws Exception
+   private static JChannel createMuxChannel(ClusterPartitionConfig config) // builds the partition's channel from the configured multiplexer factory
    {
-      if (this.getState() == ServiceMBean.CREATED ||
-          this.getState() == ServiceMBean.STARTED ||
-          this.getState() == ServiceMBean.STARTING)
+      JChannelFactoryMBean factory = config.getMultiplexer();
+      if (factory == null)
+         throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
+      String stack = config.getMultiplexerStack();
+      if (stack == null)
+         throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
+      try
       {
-         throw new Exception ("Node name cannot be changed once the partition has been started");
+         return (JChannel) factory.createMultiplexerChannel(stack, config.getPartitionName()); // 2nd arg is the channel id: use the partition name, not the stack name again
       }
-      else
+      catch (RuntimeException e) // unchecked failures propagate unwrapped
       {
-         this.nodeName = node;
+         throw e;
       }
+      catch (Exception e) // checked failures are wrapped so callers need no throws clause
+      {
+         throw new RuntimeException("Failure creating multiplexed Channel", e);
+      }
    }
 
-   public InetAddress getNodeAddress()
+    // Constructors --------------------------------------------------
+   /** Stores the (non-null) config, sets up loggers for the partition name and starts the history list. */
+   public ClusterPartition(ClusterPartitionConfig config)
    {
-      return nodeAddress;
+      if (config == null)
+         throw new IllegalArgumentException("config cannot be null");
+      
+      this.config = config;
+      setupLoggers(config.getPartitionName()); // presumably initializes log/clusterLifeCycleLog (helper not shown here) -- TODO confirm
+      this.history = new Vector();
+      logHistory ("Partition object created");      
    }
 
-   public void setNodeAddress(InetAddress address)
+   // ------------------------------------------------------------ ServiceMBean
+   
+   // ----------------------------------------------------------------- Service
+   /** Creates the multiplexed channel, the RpcHandler dispatcher, a DRM if none was supplied, and the asynch view-change handler. */
+   protected void createService() throws Exception
    {
-      this.nodeAddress = address;
+      if (config == null)
+         throw new IllegalArgumentException("config cannot be null");
+      log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
+            " using stack " + getMultiplexerStack());
+      
+      channel = createMuxChannel(config);
+      
+      channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+      channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+      
+      log.info("Initializing partition " + getPartitionName());
+      logHistory ("Initializing partition " + getPartitionName());
+      
+      dispatcher = new RpcHandler(channel, null, null, new Object(), config.getDeadlockDetection());
+      
+      // Subscribe to events generated by the org.jgroups. protocol stack
+      log.debug("setMembershipListener");
+      dispatcher.setMembershipListener(this);
+      log.debug("setMessageListener");
+      dispatcher.setMessageListener(messageListener);
+      dispatcher.setMarshaller(new MarshallerImpl());
+      
+      // FIXME remove once @JMX issues are sorted
+      if (replicantManager == null)
+      {
+         // Create the DRM and link it to this HAPartition
+         log.debug("create replicant manager");
+         DistributedReplicantManagerImpl drm = new DistributedReplicantManagerImpl(this);
+         if (server != null)
+            drm.registerWithJmx(server);
+         log.debug("create replicant manager"); // NOTE(review): same message as a few lines above; this one precedes drm.create()
+         drm.create();
+         setDistributedReplicantManager(drm);
+         this.selfCreatedDRM = true; // start/stop/destroy of this DRM are then driven by this partition's lifecycle methods
+      }
+      
+//      // Create the DS and link it to this HAPartition
+//      log.debug("create distributed serviceState service");
+//      this.dsManager = new DistributedStateImpl(this, this.server);
+//      log.debug("init distributed serviceState service");
+//      this.dsManager.init();
+
+      // Create the asynchronous handler for view changes
+      asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
+      
+      log.debug("done initializing partition");
    }
+   /** Connects the channel, records the initial view, fetches state, starts the DRM/asynch handler and binds this partition in JNDI. */
+   protected void startService() throws Exception
+   {
+      logHistory ("Starting partition");
+      
+      // Store our uniqueId in the channel
+      configureUniqueId();
+      
+      channel.connect(getPartitionName());
+      
+      try
+      {
+         // get current JG group properties
+         
+         log.debug("get nodeName");
+         this.localJGAddress = (IpAddress)channel.getLocalAddress();
+         this.me = new ClusterNode(this.localJGAddress);
+         this.nodeName = this.me.getName();
 
-   public String getJGroupsVersion()
-   {
-      return Version.description + "( " + Version.cvs + ")";
+         // FIXME -- just block waiting for viewAccepted!
+         log.debug("Get current members");
+         View view = channel.getView();
+         this.jgmembers = (Vector)view.getMembers().clone();
+         this.members = translateAddresses(this.jgmembers); // TRANSLATE
+         log.info("Number of cluster members: " + members.size());
+         for(int m = 0; m < members.size(); m ++) // was "m > size()", which never logged any member
+         {
+            Object node = members.get(m);
+            log.debug(node);
+         }
+         // Keep a list of other members only for "exclude-self" RPC calls
+         
+         this.jgotherMembers = (Vector)view.getMembers().clone();
+         this.jgotherMembers.remove (channel.getLocalAddress());
+         this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
+         log.info ("Other members: " + this.otherMembers.size ());
+
+         verifyNodeIsUnique(view.getMembers());
+
+         // Update the initial view id
+         this.currentViewId = view.getVid().getId();
+
+         // We must now synchronize new serviceState transfer subscriber
+         //
+         fetchState();
+         
+         if (selfCreatedDRM)
+         {
+            // We are now able to start our DRM and DS
+            ((DistributedReplicantManagerImpl) this.replicantManager).start();
+         }
+         
+         // Start the asynch listener handler thread
+         asynchHandler.start();
+         
+         // Bind ourself in the public JNDI space
+         Context ctx = new InitialContext();
+         this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
+         log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+      }
+      catch (Throwable t) // any failure after connect: disconnect the channel, then rethrow
+      {
+         log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
+         channel.disconnect();
+         throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
+      }
+      
    }
 
-   public long getStateTransferTimeout()
+   protected void stopService() throws Exception
    {
-      return state_transfer_timeout;
+      logHistory ("Stopping partition");
+      log.info("Stopping partition " + getPartitionName());
+      // Shutdown order: channel debugger, asynch handler, self-created DRM, channel disconnect, then JNDI unbind.
+      stopChannelDebugger();
+      
+      try
+      {
+         asynchHandler.stop();
+      }
+      catch( Exception e) // best effort: a failing handler must not prevent the remaining shutdown steps
+      {
+         log.warn("Failed to stop asynchHandler", e);
+      }
+
+      
+      if (selfCreatedDRM)
+      {
+         // Stop the DRM service
+         // TODO remove when DRM is independent
+         try
+         {
+            ((DistributedReplicantManagerImpl) this.replicantManager).stop();
+         }
+         catch (Exception e)
+         {
+            log.error("operation failed", e);
+         }
+      }      
+
+//    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
+//    add the destroyPartition() step
+      try
+      {
+         channel.disconnect();
+      }
+      catch (Exception e)
+      {
+         log.error("operation failed", e);
+      }
+     // NOTE(review): if ctx.unbind() throws, NonSerializableFactory.unbind() and the final log line are skipped.
+     String boundName = "/HAPartition/" + getPartitionName();
+
+     InitialContext ctx = new InitialContext();
+     try
+     {
+        ctx.unbind(boundName);
+     }
+     finally
+     {
+        ctx.close();
+     }
+
+     NonSerializableFactory.unbind (boundName);
+
+     log.info("Partition " + getPartitionName() + " stopped.");
    }
+   /** Destroys the self-created DRM (if any) and closes the channel; failures are logged, not rethrown. */
+   protected void destroyService()  throws Exception
+   {
+      log.debug("Destroying HAPartition: " + getPartitionName());
+      
+      if (selfCreatedDRM)
+      {
+         try
+         {
+            if (server != null)
+               ((DistributedReplicantManagerImpl) replicantManager).unregisterWithJmx(server);
+            ((DistributedReplicantManagerImpl) replicantManager).destroy();
+         }
+         catch (Exception e)
+         {
+            log.error("Destroying DRM failed", e);
+         }      
+      }
+      try
+      {
+         channel.close();
+      }
+      catch (Exception e)
+      {
+         log.error("Closing channel failed", e);
+      }
 
-   public void setStateTransferTimeout(long timeout)
+      log.info("Partition " + getPartitionName() + " destroyed.");
+   }
+   
+   // ---------------------------------------------------------- State Transfer 
+
+   /** Requests state from the channel and blocks until it is applied; with no provider, only the coordinator may proceed. */
+   protected void fetchState() throws Exception
    {
-      this.state_transfer_timeout=timeout;
+      log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() + 
+            " milliseconds):");
+      long start, stop;
+      isStateSet = false;
+      start = System.currentTimeMillis();
+      boolean rc = channel.getState(null, getStateTransferTimeout());
+      if (rc)
+      {
+         synchronized (stateLock)
+         {
+            while (!isStateSet) // NOTE(review): isStateSet is set outside this chunk; the wait below has no timeout
+            {
+               if (setStateException != null) // recorded by recordSetStateFailure()
+                  throw setStateException;
+
+               try
+               {
+                  stateLock.wait(); // woken by notifyStateTransferCompleted()
+               }
+               catch (InterruptedException iex) // NOTE(review): interrupt is swallowed and the wait is retried
+               {
+               }
+            }
+         }
+         stop = System.currentTimeMillis();
+         log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+      }
+      else
+      {
+         // No one provided us with serviceState.
+         // We need to find out if we are the coordinator, so we must
+         // block until viewAccepted() is called at least once
+
+         synchronized (members) // NOTE(review): viewAccepted() replaces the members Vector; no notify on this monitor is visible in this chunk
+         {
+            while (members.size() == 0)
+            {
+               log.debug("waiting on viewAccepted()");
+               try
+               {
+                  members.wait();
+               }
+               catch (InterruptedException iex) // NOTE(review): interrupt is swallowed and the wait is retried
+               {
+               }
+            }
+         }
+
+         if (isCurrentNodeCoordinator())
+         {
+            log.info("State could not be retrieved (we are the first member in group)");
+         }
+         else
+         {
+            throw new IllegalStateException("Initial serviceState transfer failed: " +
+               "Channel.getState() returned false");
+         }
+      }
    }
 
-   public long getMethodCallTimeout() {
-      return method_call_timeout;
+   private void getStateInternal(OutputStream stream) throws IOException
+   {
+      MarshalledValueOutputStream mvos = null; // don't create until we know we need it
+      // Wire format: one header byte, then (key, state) object pairs for non-null states, then a StateStreamEnd marker.
+      for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+      {
+         Map.Entry entry = (Map.Entry)keys.next();
+         HAPartition.HAPartitionStateTransfer subscriber = 
+            (HAPartition.HAPartitionStateTransfer) entry.getValue();
+         log.debug("getState for " + entry.getKey());
+         Object state = subscriber.getCurrentState();
+         if (state != null) // handlers with no state are simply omitted from the stream
+         {
+            if (mvos == null)
+            {
+               // This is our first write, so need to write the header first
+               stream.write(SERIALIZABLE_VALUE);
+               
+               mvos = new MarshalledValueOutputStream(stream);
+            }
+            
+            mvos.writeObject(entry.getKey());
+            mvos.writeObject(state);
+         }
+      }
+      
+      if (mvos == null)
+      {
+         // We never wrote any serviceState, so write the NULL header
+         stream.write(NULL_VALUE);
+      }
+      else
+      {
+         mvos.writeObject(new StateStreamEnd()); // terminator that setStateInternal() looks for
+      }
+      
    }
+   /** Reads the stream written by getStateInternal() and hands each (key, state) pair to the matching state handler. */
+   private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
+   {
+      byte type = (byte) stream.read();
+      // NOTE(review): read() returns -1 at end of stream; that value is not NULL_VALUE, so it falls through to the object reads.
+      if (type == NULL_VALUE)
+      {
+         log.debug("serviceState is null");
+         return;
+      }
+      
+      long used_mem_before, used_mem_after;
+      Runtime rt=Runtime.getRuntime();
+      used_mem_before=rt.totalMemory() - rt.freeMemory();
+      
+      MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
+      
+      while (true)
+      {
+         Object obj = mvis.readObject(); 
+         if (obj instanceof StateStreamEnd) // terminator written by getStateInternal()
+            break;
+         
+         String key = (String) obj;
+         log.debug("setState for " + key);
+         Object someState = mvis.readObject(); // always consumed, even when no handler is registered for key
+         HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+         if (subscriber != null)
+         {
+            try
+            {
+               subscriber.setCurrentState((Serializable)someState);
+            }
+            catch (Exception e)
+            {
+               // Don't let issues with one subscriber affect others
+               // unless it is DRM, which is really an internal function
+               // of the HAPartition
+               // FIXME remove this once DRM is JBC-based
+               if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+               {
+                  if (e instanceof RuntimeException)
+                     throw (RuntimeException) e;
+                  else
+                     throw new RuntimeException(e);
+               }
+               else
+               {
+                  log.error("Caught exception setting serviceState to " + subscriber, e);
+               }
+            }
+         }
+         else
+         {
+            log.debug("There is no stateHandler for: " + key);
+         }      
+      }
 
-   public void setMethodCallTimeout(long timeout) {
-      this.method_call_timeout=timeout;
+      used_mem_after=rt.totalMemory() - rt.freeMemory(); // rough heap delta, used for the debug message only
+      log.debug("received serviceState; expanded memory by " +
+            (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
+            ", used memory after: " + used_mem_after + ")");
    }
 
-//   public boolean getChannelDebugger()
-//   {
-//      return this.use_debugger;
-//   }
-//
-//   public void setChannelDebugger(boolean flag)
-//   {
-//      this.use_debugger=flag;
-//   }
-
-   public boolean getDeadlockDetection()
+   private void recordSetStateFailure(Throwable t) // logs t and stores it in setStateException, which fetchState() rethrows
    {
-      return deadlock_detection;
+      log.error("failed setting serviceState", t);
+      if (t instanceof Exception)
+         setStateException = (Exception) t;
+      else
+         setStateException = new Exception(t); // wrap Errors and other Throwables so the field type fits
    }
 
-   public void setDeadlockDetection(boolean doit)
+   private void notifyStateTransferCompleted()
    {
-      deadlock_detection = doit;
+      synchronized (stateLock)
+      {
+         // Wake any thread blocked on stateLock in fetchState(); isStateSet is not modified here.
+         stateLock.notifyAll();
+      }
    }
+   
+   // org.jgroups.MembershipListener implementation ----------------------------------------------
+   /** Records a suspected member in the history and logs it (to the life-cycle log when this node is coordinator). */
+   public void suspect(org.jgroups.Address suspected_mbr)
+   {      
+      logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
+      if (isCurrentNodeCoordinator ())
+         clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+      else
+         log.info("Suspected member: " + suspected_mbr);
+   }
 
-   public boolean getAllowSynchronousMembershipNotifications()
+   public void block() {} // no-op implementation
+   
+   /** Notification of a cluster view change. This is done from the JG protocol
+    * handler thread and we must be careful to not unduly block this thread.
+    * Because of this there are two types of listeners, synchronous and
+    * asynchronous. The synchronous listeners are messaged with the view change
+    * event using the calling thread while the asynchronous listeners are
+    * messaged using a separate thread.
+    *
+    * @param newView the newly installed JGroups view
+    */
+   public void viewAccepted(View newView)
    {
-      return allow_sync_events;
+      try
+      {
+         // we update the view id
+         //
+         this.currentViewId = newView.getVid().getId();
+
+         // Keep a list of other members only for "exclude-self" RPC calls
+         //
+         this.jgotherMembers = (Vector)newView.getMembers().clone();
+         this.jgotherMembers.remove (channel.getLocalAddress());
+         this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
+         Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
+         logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
+                     " (old view: " + this.members + " )");
+
+
+         // Save the previous view and make a copy of the new view
+         Vector oldMembers = this.members;
+
+         Vector newjgMembers = (Vector)newView.getMembers().clone();
+         Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
+         if (this.members == null)
+         {
+            // Initial viewAccepted
+            this.members = newMembers;
+            this.jgmembers = newjgMembers;
+            log.debug("ViewAccepted: initial members set");
+            return; // first view only records membership; no listeners are notified
+         }
+         this.members = newMembers;
+         this.jgmembers = newjgMembers;
+
+         int difference = 0;
+         if (oldMembers == null) // NOTE(review): unreachable, the initial-view case returned above
+            difference = newMembers.size () - 1;
+         else
+            difference = newMembers.size () - oldMembers.size ();
+         
+         if (isCurrentNodeCoordinator ())
+            clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+                                      this.currentViewId + ", delta: " + difference + ") : " + this.members);
+         else
+            log.info("New cluster view for partition " + getPartitionName() + ": " +
+                     this.currentViewId + " (" + this.members + " delta: " + difference + ")");
+
+         // Build a ViewChangeEvent for the asynch listeners
+         ViewChangeEvent event = new ViewChangeEvent();
+         event.viewId = currentViewId;
+         event.allMembers = translatedNewView;
+         event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
+         event.newMembers = getNewMembers(oldMembers, event.allMembers);
+         event.originatingGroups = null;
+         // if the new view occurs because of a merge, we first inform listeners of the merge
+         if(newView instanceof MergeView)
+         {
+            MergeView mergeView = (MergeView) newView;
+            event.originatingGroups = mergeView.getSubgroups();
+         }
+
+         log.debug("membership changed from " + oldMembers.size() + " to " // this.members already holds the new view, so report the old size
+            + event.allMembers.size());
+         // Put the view change to the asynch queue
+         this.asynchHandler.queueEvent(event);
+
+         // Broadcast the new view to the synchronous view change listeners
+         if (this.allowSyncListeners)
+         {
+            this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+                  event.deadMembers, event.newMembers, event.originatingGroups);
+         }
+      }
+      catch (Exception ex) // never let a listener/translation failure escape into the JGroups thread
+      {
+         log.error("ViewAccepted failed", ex);
+      }
    }
 
-   public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+   // HAPartition implementation ----------------------------------------------
+   
+   public String getNodeName()
    {
-      this.allow_sync_events = allowSync;
+      return nodeName;
    }
+   
+   public String getPartitionName()
+   {
+      return (config == null ? null : config.getPartitionName());
+   }
+   
+   public DistributedReplicantManager getDistributedReplicantManager()
+   {
+      return replicantManager;
+   }
+   /** Returns the DistributedState held by the config. NOTE(review): unlike getPartitionName(), config is not null-checked here. */
+   public DistributedState getDistributedStateService()
+   {
+      return config.getDistributedState();
+   }
 
-   protected ObjectName getObjectName(MBeanServer server, ObjectName name)
-      throws MalformedObjectNameException
+   public long getCurrentViewId()
    {
-      return name == null ? OBJECT_NAME : name;
+      return this.currentViewId;
    }
+   /** Returns a new Vector holding the name (ClusterNode.getName()) of every current member, in view order. */
+   public Vector getCurrentView()
+   {
+      Vector result = new Vector (this.members.size()); // NOTE(review): members is null until startService()/viewAccepted() assigns it
+      for (int i = 0; i < members.size(); i++)
+      {
+         result.add( ((ClusterNode) members.elementAt(i)).getName() );
+      }
+      return result;
+   }
 
-   public HAPartition getHAPartition ()
+   public ClusterNode[] getClusterNodes ()
    {
-      return this.partition;
+      ClusterNode[] nodes = new ClusterNode[this.members.size()];
+      this.members.toArray(nodes);
+      return nodes;
    }
 
-   /** Return the list of member nodes that built from the current view
-    * @return A Vector Strings representing the host:port values of the nodes
-    */
-   public Vector getCurrentView()
+   public ClusterNode getClusterNode ()
    {
-      return partition.getCurrentView();
+      return me;
    }
 
-   public JChannelFactoryMBean getMultiplexer()
+   public boolean isCurrentNodeCoordinator ()
    {
-      if (multiplexer == null && cache != null && server != null)
-      {
-         multiplexer = (JChannelFactoryMBean) MBeanProxyExt.create(JChannelFactoryMBean.class,
-                                                                   multiplexerObjectName);
-      }
-      return multiplexer;
+      if(this.members == null || this.members.size() == 0 || this.me == null) // no view or no local identity yet
+         return false;
+     return this.members.elementAt (0).equals (this.me); // the first member of the view is treated as coordinator
    }
 
-   public String getMultiplexerStack()
+   // ***************************
+   // ***************************
+   // RPC multicast communication
+   // ***************************
+   // ***************************
+   //
+   public void registerRPCHandler(String objName, Object subscriber)
    {
-      return stackName;
+      rpcHandlers.put(objName, subscriber); // replaces any handler previously stored under objName
    }
    
-   public DistributedState getDistributedState()
+   /**
+    * Removes the entry stored in rpcHandlers under the given name.
+    *
+    * @param objName key whose entry is removed
+    * @param subscriber not used by this method: the entry is removed by key
+    *        only, whatever handler is currently stored under it
+    */
+   public void unregisterRPCHandler(String objName, Object subscriber)
    {
-      return dsManager;
+      rpcHandlers.remove(objName);
    }
+      
 
-   public void setDistributedState(DistributedState state)
+   /**
+    * Invokes a method on the cluster without giving argument types.
+    * Delegates to the five-argument overload, passing null as the types.
+    *
+    * @param objName forwarded unchanged to the five-argument overload
+    * @param methodName forwarded unchanged to the five-argument overload
+    * @param args forwarded unchanged to the five-argument overload
+    * @param excludeSelf forwarded unchanged to the five-argument overload
+    * @return the list returned by the five-argument overload
+    * @throws Exception if the delegated call throws
+    * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
+    */
+   public ArrayList callMethodOnCluster(String objName, String methodName,
+      Object[] args, boolean excludeSelf) throws Exception
    {
-      this.dsManager = state;
+      return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
    }
-   
-   public TreeCacheMBean getTreeCache()
+
+   /**
+    * This function is an abstraction of RpcDispatcher.
+    * Delegates to the six-argument overload, using getMethodCallTimeout()
+    * as the timeout.
+    *
+    * @param objName forwarded unchanged to the six-argument overload
+    * @param methodName forwarded unchanged to the six-argument overload
+    * @param args forwarded unchanged to the six-argument overload
+    * @param types forwarded unchanged to the six-argument overload
+    * @param excludeSelf forwarded unchanged to the six-argument overload
+    * @return the list returned by the six-argument overload
+    * @throws Exception if the delegated call throws
+    */
+   public ArrayList callMethodOnCluster(String objName, String methodName,
+      Object[] args, Class[] types, boolean excludeSelf) throws Exception
    {
-      return cache;
+      return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout());
    }
-   
-   public void setTreeCache(TreeCacheMBean cache)
+
+
+   /**
+    * Invokes a method through the RpcDispatcher and collects the values
+    * carried by the responses.
+    * <p>
+    * The call is named <code>objName + "." + methodName</code>. When
+    * excludeSelf is true it is addressed to jgotherMembers, otherwise a
+    * null destination list is handed to the dispatcher.
+    * NOTE(review): a null destination list presumably means "every member"
+    * in JGroups -- confirm against RpcDispatcher.callRemoteMethods.
+    *
+    * @param objName first part of the call name
+    * @param methodName second part of the call name
+    * @param args arguments of the call
+    * @param types argument types; when null the MethodCall is built without them
+    * @param excludeSelf whether jgotherMembers is used as destination list
+    * @param methodTimeout timeout handed to the dispatcher
+    * @return the values of the received responses, NoHandlerForRPC values
+    *         left out; empty when the dispatcher returns null
+    * @throws Exception declared by the signature
+    */
+   public ArrayList callMethodOnCluster(String objName, String methodName,
+       Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
    {
-      this.cache = cache;
-      if (cache.getMultiplexerService() == null)
-         throw new IllegalArgumentException("Cache not configured for a multiplexer");
-      try
+      ArrayList rtn = new ArrayList();
+      MethodCall m=null;
+      RspList rsp = null;
+      boolean trace = log.isTraceEnabled();
+
+      if(types != null)
+         m=new MethodCall(objName + "." + methodName, args, types);
+      else
+         m=new MethodCall(objName + "." + methodName, args);
+
+      if (excludeSelf)
       {
-         multiplexerObjectName = new ObjectName(cache.getMultiplexerService());
+         if( trace )
+         {
+            log.trace("callMethodOnCluster(true), objName="+objName
+               +", methodName="+methodName+", members="+jgotherMembers);
+         }
+         rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
       }
-      catch (MalformedObjectNameException e)
+      else
       {
-         throw new IllegalArgumentException("Cache's MultiplexerService is invalid", e);
+         if( trace )
+         {
+            log.trace("callMethodOnCluster(false), objName="+objName
+               +", methodName="+methodName+", members="+members);
+         }
+         rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
       }
-      this.stackName = cache.getMultiplexerStack();
-   }
 
-   // ServiceMBeanSupport overrides ---------------------------------------------------
+      // Unwrap the responses: Rsp entries contribute their value only when
+      // they were received; any other entry is taken as the value itself.
+      if (rsp != null)
+      {
+         for (int i = 0; i < rsp.size(); i++)
+         {
+            Object item = rsp.elementAt(i);
+            if (item instanceof Rsp)
+            {
+               Rsp response = (Rsp) item;
+               // Only include received responses
+               boolean wasReceived = response.wasReceived();
+               if( wasReceived == true )
+               {
+                  item = response.getValue();
+                  if (!(item instanceof NoHandlerForRPC))
+                     rtn.add(item);
+               }
+               else if( trace )
+                  log.trace("Ignoring non-received response: "+response);
+            }
+            else
+            {
+               if (!(item instanceof NoHandlerForRPC))
+                  rtn.add(item);
+               else if( trace )
+                  log.trace("Ignoring NoHandlerForRPC");
+            }
+         }
+      }
 
-   public String getName()
+      return rtn;
+    }
+
+   /**
+    * Calls a method on the cluster coordinator node only, using
+    * getMethodCallTimeout() as the timeout.
+    * Delegates to the six-argument overload.
+    *
+    * @param objName forwarded unchanged to the six-argument overload
+    * @param methodName forwarded unchanged to the six-argument overload
+    * @param args forwarded unchanged to the six-argument overload
+    * @param types forwarded unchanged to the six-argument overload
+    * @param excludeSelf forwarded unchanged to the six-argument overload
+    * @return the list returned by the six-argument overload
+    * @throws Exception if the delegated call throws
+    */
+   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
+          Object[] args, Class[] types,boolean excludeSelf) throws Exception
    {
-      return partitionName;
+      return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout());
    }
 
+   /**
+    * Calls a method on the cluster coordinator node only. The coordinator
+    * is taken to be the first entry of jgmembers.
+    * <p>
+    * The destination list stays empty when this node is the coordinator
+    * and excludeSelf is true; it is still handed to the dispatcher.
+    * @param objName first part of the call name (objName + "." + methodName)
+    * @param methodName second part of the call name
+    * @param args arguments of the call
+    * @param types argument types; when null the MethodCall is built without them
+    * @param excludeSelf when true, the coordinator is not addressed if it is this node
+    * @param methodTimeout timeout handed to the dispatcher
+    * @return the values of the received responses, NoHandlerForRPC values left out
+    * @throws Exception declared by the signature
+    */
+   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
+          Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
+   {
+      ArrayList rtn = new ArrayList();
+      MethodCall m=null;
+      RspList rsp = null;
+      boolean trace = log.isTraceEnabled();
 
-   protected void createService()
-      throws Exception
-   {
-      log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
-               " using stack " + getMultiplexerStack());
-      channel = (JChannel) getMultiplexer().createMultiplexerChannel(getMultiplexerStack(), getPartitionName());
-      
-      if(use_debugger && debugger == null)
+      if(types != null)
+         m=new MethodCall(objName + "." + methodName, args, types);
+      else
+         m=new MethodCall(objName + "." + methodName, args);
+
+      // NOTE(review): the trace text always says "(false)", whatever the
+      // value of excludeSelf.
+      if( trace )
       {
-         debugger=new Debugger(channel);
-         debugger.start();
+         log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+            +", methodName="+methodName);
       }
-      channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
-      channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
 
-      log.debug("Creating HAPartition");
-      partition = createPartition();
-      
-      // JBAS-2769 Init partition in create
-      log.debug("Initializing ClusterPartition: " + partition);
-      partition.init();
-      log.debug("ClusterPartition initialized");
-   }
+      // the first cluster view member is the coordinator
+      Vector coordinatorOnly = new Vector();
+      // If we are the coordinator, only call ourself if 'excludeSelf' is false
+      if (false == isCurrentNodeCoordinator () ||
+          false == excludeSelf)
+         coordinatorOnly.addElement(this.jgmembers.elementAt (0));
 
-   protected void startService() 
-      throws Exception
-   {        
-      // We push the independent name in the protocol stack before connecting to the cluster
-      boolean pushNodeName = true;
-      if (this.nodeName == null || "".equals(this.nodeName)) {
-         IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
-         if (ourAddr != null)
+      rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
+
+      // Unwrap the responses: Rsp entries contribute their value only when
+      // they were received; any other entry is taken as the value itself.
+      if (rsp != null)
+      {
+         for (int i = 0; i < rsp.size(); i++)
          {
-            byte[] additional_data = ourAddr.getAdditionalData();
-            if (additional_data != null)
+            Object item = rsp.elementAt(i);
+            if (item instanceof Rsp)
             {
-               nodeName = new String(additional_data);
-               pushNodeName = false;
+               Rsp response = (Rsp) item;
+               // Only include received responses
+               boolean wasReceived = response.wasReceived();
+               if( wasReceived == true )
+               {
+                  item = response.getValue();
+                  if (!(item instanceof NoHandlerForRPC))
+                     rtn.add(item);
+               }
+               else if( trace )
+                  log.trace("Ignoring non-received response: "+response);
             }
+            else
+            {
+               if (!(item instanceof NoHandlerForRPC))
+                  rtn.add(item);
+               else if( trace )
+                  log.trace("Ignoring NoHandlerForRPC");
+            }
          }
       }
-      if (this.nodeName == null || "".equals(this.nodeName)) {
-         this.nodeName = generateUniqueNodeName();
+
+      return rtn;
+   }
+
+
+   /**
+    * Sends an asynchronous call to the cluster without giving argument
+    * types; the work is done by the five-argument overload, which receives
+    * null as the types.
+    *
+    * @param objName forwarded unchanged to the five-argument overload
+    * @param methodName forwarded unchanged to the five-argument overload
+    * @param args forwarded unchanged to the five-argument overload
+    * @param excludeSelf forwarded unchanged to the five-argument overload
+    * @throws Exception if the delegated call throws
+    * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
+    */
+   public void callAsynchMethodOnCluster(String objName, String methodName,
+      Object[] args, boolean excludeSelf) 
+      throws Exception
+   {
+      final Class[] noTypes = null;
+      callAsynchMethodOnCluster(objName, methodName, args, noTypes, excludeSelf);
+   }
+
+   /**
+    * This function is an abstraction of RpcDispatcher for asynchronous messages.
+    * <p>
+    * The call is named <code>objName + "." + methodName</code> and is sent
+    * with GroupRequest.GET_NONE; the value returned by the dispatcher is
+    * not used. When excludeSelf is true the destination list is
+    * jgotherMembers, otherwise null is handed to the dispatcher.
+    *
+    * @param objName first part of the call name
+    * @param methodName second part of the call name
+    * @param args arguments of the call
+    * @param types argument types; when null the MethodCall is built without them
+    * @param excludeSelf whether jgotherMembers is used as destination list
+    * @throws Exception declared by the signature
+    */
+   public void callAsynchMethodOnCluster(String objName, String methodName,
+      Object[] args, Class[] types, boolean excludeSelf) throws Exception
+   {
+      MethodCall m = null;
+      boolean trace = log.isTraceEnabled();
+
+      if(types != null)
+         m=new MethodCall(objName + "." + methodName, args, types);
+      else
+         m=new MethodCall(objName + "." + methodName, args);
+
+      if (excludeSelf)
+      {
+         if( trace )
+         {
+            log.trace("callAsynchMethodOnCluster(true), objName="+objName
+               +", methodName="+methodName+", members="+jgotherMembers);
+         }
+         dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
       }
-      
-      if (pushNodeName)
+      else
       {
-         java.util.HashMap staticNodeName = new java.util.HashMap();
-         staticNodeName.put("additional_data", this.nodeName.getBytes());
-         this.channel.down(new Event(Event.CONFIG, staticNodeName));
+         if( trace )
+         {
+            log.trace("callAsynchMethodOnCluster(false), objName="+objName
+               +", methodName="+methodName+", members="+members);
+         }
+         dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
       }
-      channel.connect(partitionName);
-      
-      try
-      {
-         partition.startPartition();
+   }
    
-         log.debug("Started ClusterPartition: " + partitionName);
+   // *************************
+   // *************************
+   // State transfer management
+   // *************************
+   // *************************
+   //      
+   /**
+    * Stores a state transfer subscriber in stateHandlers.
+    *
+    * @param objectName key under which the subscriber is stored
+    * @param subscriber object stored for that key
+    */
+   public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+   {
+      this.stateHandlers.put(objectName, subscriber);
+   }
+   
+   /**
+    * Removes the entry stored in stateHandlers under the given key.
+    *
+    * @param objectName key whose entry is removed
+    * @param subscriber not used by this method; removal is by key only
+    */
+   public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+   {
+      this.stateHandlers.remove(objectName);
+   }
+   
+   // *************************
+   // *************************
+   // Group Membership listeners
+   // *************************
+   // *************************
+   //   
+   /**
+    * Adds a membership listener to one of the two listener lists.
+    * <p>
+    * The listener goes to asynchListeners when allowSyncListeners is false
+    * or when it implements AsynchHAMembershipListener or
+    * AsynchHAMembershipExtendedListener; otherwise it goes to
+    * synchListeners. The choice is made with the value allowSyncListeners
+    * has at the time of this call.
+    *
+    * @param listener the listener to add
+    */
+   public void registerMembershipListener(HAMembershipListener listener)
+   {
+      boolean isAsynch = (this.allowSyncListeners == false) 
+            || (listener instanceof AsynchHAMembershipListener)
+            || (listener instanceof AsynchHAMembershipExtendedListener);
+      if( isAsynch ) {
+         synchronized(this.asynchListeners) {
+            this.asynchListeners.add(listener);
+         }
       }
-      catch (Exception e)
-      {
-         log.debug("Caught exception after channel connected; closing channel -- " + e.getLocalizedMessage());
-         channel.disconnect();
-         throw e;
+      else  { 
+         synchronized(this.synchListeners) {
+            this.synchListeners.add(listener);
+         }
       }
    }
    
-   protected void stopService() throws Exception
+   /**
+    * Removes a membership listener from the listener lists.
+    * <p>
+    * registerMembershipListener picks the list from the value
+    * allowSyncListeners has when the listener is added, and that flag can
+    * be changed later through setAllowSynchronousMembershipNotifications.
+    * Recomputing the choice here could therefore look in the wrong list and
+    * leave the listener registered, so removal is attempted on both lists;
+    * removing from a list that does not hold the listener has no effect.
+    *
+    * @param listener the listener to remove
+    */
+   public void unregisterMembershipListener(HAMembershipListener listener)
    {
-      stopChannelDebugger();
-      log.debug("Stopping ClusterPartition: " + partitionName);
-      partition.closePartition();
-      log.debug("Stopped ClusterPartition: " + partitionName);
+      synchronized(this.asynchListeners) {
+         this.asynchListeners.remove(listener);
+      }
+      synchronized(this.synchListeners) {
+         this.synchListeners.remove(listener);
+      }
    }
+   
+   /**
+    * Returns the allowSyncListeners flag.
+    *
+    * @return the current value of allowSyncListeners
+    */
+   public boolean getAllowSynchronousMembershipNotifications()
+   {
+      return this.allowSyncListeners;
+   }
 
-   // NR 200505 : [JBCLUSTER-38] close partition just disconnect from channel
-   // destroy close it.
-   protected void destroyService() throws Exception
+   /**
+    * Sets the allowSyncListeners flag. The flag is read by
+    * registerMembershipListener when it picks a listener list.
+    *
+    * @param allowSync new value of allowSyncListeners
+    */
+   public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+   {
+      allowSyncListeners = allowSync;
+   }
+   
+   // AsynchEventHandler.AsynchEventProcessor -----------------------
+
+   /**
+    * Handles a queued view change by notifying the listeners held in
+    * asynchListeners with the data carried by the event.
+    *
+    * @param event the event to process; it is cast to ViewChangeEvent
+    *        without a type check
+    */
+   public void processEvent(Object event)
    {
-      log.debug("Destroying ClusterPartition: " + partitionName);
-       partition.destroyPartition();
-      log.debug("Destroyed ClusterPartition: " + partitionName);
+      ViewChangeEvent vce = (ViewChangeEvent) event;
+      notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+            vce.deadMembers, vce.newMembers, vce.originatingGroups);
+      
    }
    
-   // --------------------------------------------------- Protected Methods
    
-   /**
-    * Extension point meant for test cases; instantiates the HAPartitionImpl.
-    * Test cases can instantiate their own subclass of HAPartitionImpl.
-    */
-   protected HAPartitionImpl createPartition() throws Exception
+   // Public ------------------------------------------------------------------
+   
+   /**
+    * Stores the DistributedReplicantManager in the replicantManager field.
+    * Setting the same instance again is accepted.
+    *
+    * @param drm the manager to store
+    * @throws IllegalStateException if a different, non-null manager is
+    *         already stored
+    */
+   public void setDistributedReplicantManager(DistributedReplicantManager drm)
    {
-      HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
-      result.setStateTransferTimeout(this.state_transfer_timeout);
-      result.setMethodCallTimeout(this.method_call_timeout);
-      result.setDistributedState(dsManager);
-      return result;
+      if (this.replicantManager != null  && !(replicantManager == drm))
+         throw new IllegalStateException("DistributedReplicantManager already set");
+      
+      this.replicantManager = drm;
    }
+   
+   // Protected -----------------------------------------------------
 
-   protected String generateUniqueNodeName () throws Exception
+   /**
+    * Checks that no other address in the given list publishes the same
+    * additional data (node name) as the local JGroups address.
+    *
+    * @param javaGroupIpAddresses addresses to check; every element is cast
+    *        to IpAddress
+    * @throws Exception if the local address carries no additional data, or
+    *         if another address carries additional data with the same
+    *         content as the local one
+    */
+   protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
    {
-      // we first try to find a simple meaningful name:
-      // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
-      // 2nd) "local-IP:JMV_GUID" otherwise
-      // 3rd) return a fully GUID-based representation
-      //
-
-      // Before anything we determine the local host IP (and NOT name as this could be
-      // resolved differently by other nodes...)
-
-      // But use the specified node address for multi-homing
-
-      String hostIP = null;
-      InetAddress address = ServerConfigUtil.fixRemoteAddress(nodeAddress);
-      if (address == null)
+      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
+      if (localUniqueName == null)
       {
-         log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
-         log.debug ("using a full GUID strategy");
-         return new VMID().toString();
+         log.error("No additional information has been found in the JGroups address; " +
+                  "make sure you are running with a correct version of JGroups and that the protocols " +
+                  "you are using support 'additionalData' behaviour.");
+         throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
       }
-      else
+
+      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
       {
-         hostIP = address.getHostAddress();
+         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
+         if (!address.equals(this.localJGAddress))
+         {
+            // byte[].equals() only tests identity, so two arrays with the
+            // same name bytes would never match; compare the contents.
+            if (java.util.Arrays.equals(localUniqueName, address.getAdditionalData()))
+               throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
+         }
       }
+   }
 
-      // 1st: is JNDI up and running?
+   /**
+    * Helper method that binds an object in the JNDI tree. The object is
+    * registered with NonSerializableFactory, and a Reference to it is
+    * bound under the last component of the name; intermediate contexts
+    * that cannot be looked up are created on the way.
+    * @param jndiName Name under which the object must be bound
+    * @param who Object to bind in JNDI
+    * @param classType Class type under which should appear the bound object
+    * @param ctx Naming context under which we bind the object
+    * @throws Exception Thrown if a naming exception occurs during binding
+    */   
+   protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
+   {
+      // Ah ! This service isn't serializable, so we use a helper class
       //
-      try
+      NonSerializableFactory.bind(jndiName, who);
+      Name n = ctx.getNameParser("").parse(jndiName);
+      // Walk down the name one component at a time until only the last
+      // component is left; ctx ends up being its parent context.
+      while (n.size () > 1)
       {
-         AttributeList al = this.server.getAttributes(NamingServiceMBean.OBJECT_NAME,
-                                      new String[] {"State", "Port"});
-
-         int status = ((Integer)((Attribute)al.get(0)).getValue()).intValue();
-         if (status == ServiceMBean.STARTED)
+         String ctxName = n.get (0);
+         try
          {
-            // we can proceed with the JNDI trick!
-            int port = ((Integer)((Attribute)al.get(1)).getValue()).intValue();
-            return hostIP + ":" + port;
+            ctx = (Context)ctx.lookup (ctxName);
          }
-         else
+         catch (NameNotFoundException e)
          {
-            log.debug("JNDI has been found but the service wasn't started so we cannot " +
-                      "be entirely sure we are the only one that wants to use this PORT " +
-                      "as a GUID on this host.");
+            log.debug ("creating Subcontext " + ctxName);
+            ctx = ctx.createSubcontext (ctxName);
          }
+         n = n.getSuffix (1);
+      }
 
-      }
-      catch (InstanceNotFoundException e)
+      // The helper class NonSerializableFactory uses address type nns, we go on to
+      // use the helper class to bind the service object in JNDI
+      //
+      StringRefAddr addr = new StringRefAddr("nns", jndiName);
+      Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
+      ctx.rebind (n.get (0), ref);
+   }
+   
+   /**
+    * Computes the members that are present in the old view but absent from
+    * the new one (old minus new). A null argument is treated as an empty
+    * vector. The result is logged at debug level.
+    * @param oldMembers Vector of old members, may be null
+    * @param newMembers Vector of new members, may be null
+    * @return a clone of the old members with every new member removed; can be empty
+    */   
+   protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
+   {
+      Vector previous = (oldMembers == null) ? new Vector() : oldMembers;
+      Vector current = (newMembers == null) ? new Vector() : newMembers;
+
+      Vector departed = (Vector) previous.clone();
+      departed.removeAll(current);
+
+      log.debug("dead members: " + departed);
+      return departed;
+   }
+   
+   /**
+    * Computes the members that are present in the new view but absent from
+    * the old one (all minus old). A null argument is treated as an empty
+    * vector.
+    * @param oldMembers Vector of old members, may be null
+    * @param allMembers Vector of new members, may be null
+    * @return a clone of all members with every old member removed; can be empty
+    */   
+   protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
+   {
+      Vector previous = (oldMembers == null) ? new Vector() : oldMembers;
+      Vector current = (allMembers == null) ? new Vector() : allMembers;
+
+      Vector joined = (Vector) current.clone();
+      joined.removeAll(previous);
+      return joined;
+   }
+
+   /**
+    * Notifies every listener of the given list about a membership change.
+    * <p>
+    * The list is cloned while holding its monitor and the callbacks are
+    * made on the clone, outside the synchronized block. A listener that
+    * implements HAMembershipExtendedListener receives
+    * membershipChangedDuringMerge when originatingGroups is not null;
+    * every other case goes to membershipChanged. Anything thrown by a
+    * listener is logged as a warning and the loop goes on.
+    *
+    * @param theListeners list of listeners to notify; also used as the lock
+    * @param viewID id of the view, used in the debug messages only
+    * @param allMembers members handed to the listeners as the full view
+    * @param deadMembers members handed to the listeners as dead
+    * @param newMembers members handed to the listeners as new
+    * @param originatingGroups groups handed to extended listeners; may be null
+    */
+   protected void notifyListeners(ArrayList theListeners, long viewID,
+      Vector allMembers, Vector deadMembers, Vector newMembers,
+      Vector originatingGroups)
+   {
+      log.debug("Begin notifyListeners, viewID: "+viewID);
+      synchronized(theListeners)
       {
-         log.debug ("JNDI not running here, cannot use this strategy to find a node GUID for the cluster");
+         // JBAS-3619 -- don't hold synch lock while notifying
+         theListeners = (ArrayList) theListeners.clone();
       }
-      catch (ReflectionException e)
+      
+      for (int i = 0; i < theListeners.size(); i++)
       {
-         log.debug ("JNDI querying has returned an exception, cannot use this strategy to find a node GUID for the cluster");
+         HAMembershipListener aListener = null;
+         try
+         {
+            aListener = (HAMembershipListener) theListeners.get(i);
+            if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
+            {
+               HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
+               exListener.membershipChangedDuringMerge (deadMembers, newMembers,
+                  allMembers, originatingGroups);
+            }
+            else
+            {
+               aListener.membershipChanged(deadMembers, newMembers, allMembers);
+            }
+         }
+         catch (Throwable e)
+         {
+            // a problem in a listener should not prevent other members to receive the new view
+            log.warn("HAMembershipListener callback failure: "+aListener, e);
+         }
       }
-
-      // 2nd: host-GUID strategy
-      //
-      String uid = new UID().toString();
-      return hostIP + ":" + uid;
+      
+      log.debug("End notifyListeners, viewID: "+viewID);
    }
 
-   protected JChannel getChannel()
+   /**
+    * Wraps every JGroups address of the given list in a ClusterNode.
+    *
+    * @param jgAddresses addresses to convert; every element is cast to
+    *        IpAddress; may be null
+    * @return null when jgAddresses is null, otherwise a new Vector with one
+    *         ClusterNode per address, in the same order
+    */
+   protected Vector translateAddresses (Vector jgAddresses)
    {
-      return channel;
+      if (jgAddresses == null)
+         return null;
+
+      Vector result = new Vector (jgAddresses.size());
+      for (int i = 0; i < jgAddresses.size(); i++)
+      {
+         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
+         result.add(new ClusterNode (addr));
+      }
+
+      return result;
    }
 
-   protected Debugger getDebugger()
+   /**
+    * Appends a time-stamped entry to the history collection. The entry is
+    * the current date formatted by a default SimpleDateFormat, followed by
+    * " : " and the message. Any exception raised while doing so is
+    * deliberately ignored.
+    *
+    * @param message text of the history entry
+    */
+   public void logHistory (String message)
    {
-      return debugger;
+      try
+      {
+         history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
+      }
+      catch (Exception ignored){}
    }
 
+   // --------------------------------------------------- ClusterPartitionMBean
+   
    public String showHistory ()
    {
       StringBuffer buff = new StringBuffer();
-      Vector data = new Vector (this.partition.history);
+      Vector data = new Vector (this.history);
       for (java.util.Iterator row = data.iterator(); row.hasNext();)
       {
          String info = (String) row.next();
@@ -455,7 +1293,7 @@
    {
       StringBuffer buff = new StringBuffer();
       buff.append("<events>\n");
-      Vector data = new Vector (this.partition.history);
+      Vector data = new Vector (this.history);
       for (java.util.Iterator row = data.iterator(); row.hasNext();)
       {
          buff.append("   <event>\n      ");
@@ -489,4 +1327,402 @@
          debugger=null;
       }
    }
+   
+   /**
+    * Returns the clustered cache held by the partition configuration.
+    *
+    * @return the result of config.getClusteredCache()
+    */
+   public Cache getClusteredCache()
+   {
+      final Cache clusteredCache = config.getClusteredCache();
+      return clusteredCache;
+   }
+
+   /**
+    * Returns the deadlock detection setting of the partition configuration.
+    *
+    * @return the result of config.getDeadlockDetection()
+    */
+   public boolean getDeadlockDetection()
+   {
+      final boolean deadlockDetection = config.getDeadlockDetection();
+      return deadlockDetection;
+   }
+
+   /**
+    * Returns this object as the HAPartition.
+    *
+    * @return this instance
+    */
+   public HAPartition getHAPartition()
+   {
+      final HAPartition self = this;
+      return self;
+   }
+
+   /**
+    * Builds a version string from the JGroups Version constants.
+    *
+    * @return Version.description followed by Version.cvs in parentheses
+    */
+   public String getJGroupsVersion()
+   {
+      StringBuffer text = new StringBuffer();
+      text.append(Version.description);
+      text.append("( ").append(Version.cvs).append(")");
+      return text.toString();
+   }
+
+   /**
+    * Returns the multiplexer held by the partition configuration.
+    *
+    * @return the result of config.getMultiplexer()
+    */
+   public JChannelFactoryMBean getMultiplexer()
+   {
+      final JChannelFactoryMBean multiplexer = config.getMultiplexer();
+      return multiplexer;
+   }
+
+   /**
+    * Returns the multiplexer stack name held by the partition configuration.
+    *
+    * @return the result of config.getMultiplexerStack()
+    */
+   public String getMultiplexerStack()
+   {
+      final String stack = config.getMultiplexerStack();
+      return stack;
+   }
+
+   /**
+    * Returns the node address held by the partition configuration.
+    *
+    * @return the result of config.getNodeAddress()
+    */
+   public InetAddress getNodeAddress()
+   {
+      final InetAddress nodeAddress = config.getNodeAddress();
+      return nodeAddress;
+   }
+
+   /**
+    * Returns the state transfer timeout held by the partition configuration.
+    *
+    * @return the result of config.getStateTransferTimeout()
+    */
+   public long getStateTransferTimeout()
+   {
+      final long timeout = config.getStateTransferTimeout();
+      return timeout;
+   }
+
+   /**
+    * Returns the method call timeout held by the partition configuration.
+    *
+    * @return the result of config.getMethodCallTimeout()
+    */
+   public long getMethodCallTimeout()
+   {
+      final long timeout = config.getMethodCallTimeout();
+      return timeout;
+   }
+
+   /**
+    * Stores the method call timeout in the partition configuration.
+    *
+    * @param timeout value handed to config.setMethodCallTimeout()
+    */
+   public void setMethodCallTimeout(long timeout)
+   {
+      this.config.setMethodCallTimeout(timeout);
+   }
+
+   /**
+    * Stores the state transfer timeout in the partition configuration.
+    *
+    * @param timeout value handed to config.setStateTransferTimeout()
+    */
+   public void setStateTransferTimeout(long timeout)
+   {
+      this.config.setStateTransferTimeout(timeout);
+   }
+
+   /**
+    * Returns the unique node id held by the partition configuration.
+    *
+    * @return the result of config.getNodeUniqueId()
+    */
+   public String getNodeUniqueId()
+   {
+      final String uniqueId = config.getNodeUniqueId();
+      return uniqueId;
+   }
+
+   // Protected --------------------------------------------------------------
+
+   /**
+    * Settles the unique id of this node and stores it in the configuration.
+    * <p>
+    * The id configured in config is used when it is not empty. Otherwise
+    * the additional data of the channel's local address is used, when
+    * present. If the id is still empty after that, generateUniqueId() is
+    * called. Unless the id was taken from the address, it is pushed down
+    * the channel in a CONFIG event under the "additional_data" key.
+    *
+    * @throws Exception declared by the signature; generateUniqueId() is
+    *         declared to throw it
+    */
+   protected void configureUniqueId() throws Exception
+   {
+      String nodeId = config.getNodeUniqueId();
+      // The id is pushed into the protocol stack unless it came from there
+      boolean mustPush = true;
+
+      if (nodeId == null || nodeId.length() == 0)
+      {
+         IpAddress local = (IpAddress) channel.getLocalAddress();
+         byte[] fromAddress = (local == null) ? null : local.getAdditionalData();
+         if (fromAddress != null)
+         {
+            nodeId = new String(fromAddress);
+            config.setNodeUniqueId(nodeId);
+            mustPush = false;
+         }
+      }
+
+      if (nodeId == null || nodeId.length() == 0)
+      {
+         nodeId = generateUniqueId();
+         config.setNodeUniqueId(nodeId);
+      }
+
+      if (mustPush)
+      {
+         java.util.HashMap payload = new java.util.HashMap();
+         payload.put("additional_data", nodeId.getBytes());
+         Event configEvent = new Event(Event.CONFIG, payload);
+         this.channel.down(configEvent);
+      }
+
+      config.setNodeUniqueId(nodeId);
+   }
+   
+   /**
+    * Generates an id for this node, preferring a short readable form:
+    * <ol>
+    * <li>"host-IP:naming-port" when config.getNamingServicePort() is positive</li>
+    * <li>"host-IP:UID" otherwise</li>
+    * <li>a VMID string when no address can be obtained</li>
+    * </ol>
+    * The host IP (not the host name) of the configured node address is used.
+    *
+    * @return the generated id
+    * @throws Exception declared by the signature
+    */
+   protected String generateUniqueId() throws Exception
+   {
+      InetAddress nodeAddress = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress());
+      if (nodeAddress == null)
+      {
+         // No usable address: fall back to a fully GUID-based id
+         log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
+         log.debug ("using a full GUID strategy");
+         VMID vmid = new VMID();
+         return vmid.toString();
+      }
+
+      final String hostIP = nodeAddress.getHostAddress();
+
+      // Preferred form: the naming service port identifies the node on its host
+      final int namingPort = config.getNamingServicePort();
+      if (namingPort > 0)
+      {
+         return hostIP + ":" + namingPort;
+      }
+
+      // Otherwise: host plus a UID
+      return hostIP + ":" + new UID().toString();
+   }
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Listener through which state-transfer callbacks reach the enclosing
+    * partition. Whole-state requests are served by getStateInternal and
+    * setStateInternal of the enclosing class; the variants taking a
+    * state_id are not implemented and throw UnsupportedOperationException.
+    */
+   private class MessageListenerAdapter
+         implements ExtendedMessageListener
+   {
+
+      /**
+       * Serializes the state into a byte array.
+       *
+       * @return the bytes written by getStateInternal, or null on failure
+       */
+      public byte[] getState()
+      {
+         logHistory ("getState called on partition");
+         log.debug("getState called.");
+
+         byte[] serialized = null;
+         try
+         {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
+            getStateInternal(buffer);
+            serialized = buffer.toByteArray();
+         }
+         catch (Exception e)
+         {
+            log.error("getState failed", e);
+         }
+         // A null result will cause the receiver to get a "false" on the channel.getState() call
+         return serialized;
+      }
+
+      /**
+       * Writes the state to the given stream; a failure is logged and not
+       * rethrown.
+       */
+      public void getState(OutputStream stream)
+      {
+         logHistory ("getState called on partition");
+         log.debug("getState called.");
+
+         try
+         {
+            getStateInternal(stream);
+         }
+         catch (Exception e)
+         {
+            log.error("getState failed", e);
+         }
+      }
+
+      /** Not implemented. */
+      public byte[] getState(String state_id)
+      {
+         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      }
+
+      /** Not implemented. */
+      public void getState(String state_id, OutputStream ostream)
+      {
+         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      }
+
+      /**
+       * Applies a state received as a byte array. A null array is only
+       * logged. Failures are handed to recordSetStateFailure, and
+       * notifyStateTransferCompleted is called in every case.
+       */
+      public void setState(byte[] obj)
+      {
+         logHistory ("setState called on partition");
+         try
+         {
+            if (obj != null)
+            {
+               ByteArrayInputStream input = new ByteArrayInputStream(obj);
+               setStateInternal(input);
+               input.close();
+            }
+            else
+            {
+               log.debug("transferred serviceState is null (may be first member in cluster)");
+            }
+
+            isStateSet = true;
+         }
+         catch (Throwable failure)
+         {
+            recordSetStateFailure(failure);
+         }
+         finally
+         {
+            notifyStateTransferCompleted();
+         }
+      }
+
+      /**
+       * Applies a state received as a stream. A null stream is only
+       * logged. Failures are handed to recordSetStateFailure, and
+       * notifyStateTransferCompleted is called in every case.
+       */
+      public void setState(InputStream stream)
+      {
+         logHistory ("setState called on partition");
+         try
+         {
+            if (stream != null)
+            {
+               setStateInternal(stream);
+            }
+            else
+            {
+               log.debug("transferred serviceState is null (may be first member in cluster)");
+            }
+
+            isStateSet = true;
+         }
+         catch (Throwable failure)
+         {
+            recordSetStateFailure(failure);
+         }
+         finally
+         {
+            notifyStateTransferCompleted();
+         }
+      }
+
+      /** Not implemented. */
+      public void setState(String state_id, byte[] state)
+      {
+         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      }
+
+      /** Not implemented. */
+      public void setState(String state_id, InputStream istream)
+      {
+         throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      }
+
+      /** Does nothing with received messages. */
+      public void receive(org.jgroups.Message msg)
+      { /* complete */}
+
+   }
+
+   /**
+    * Plain holder for the data of one view change, as handed to
+    * notifyListeners when the HAMembershipListeners are notified.
+    */
+   private static class ViewChangeEvent
+   {
+      /** Id of the view. */
+      long viewId;
+      /** Members passed to the listeners as the full view. */
+      Vector allMembers;
+      /** Members passed to the listeners as dead. */
+      Vector deadMembers;
+      /** Members passed to the listeners as new. */
+      Vector newMembers;
+      /** Groups passed to extended listeners. */
+      Vector originatingGroups;
+   }
+   
+   /**
+    * Marshaller for the RpcDispatcher that hands both conversions to the
+    * static helpers of ClusterPartition.
+    */
+   private static class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
+   {
+
+      /** Converts an object to bytes with ClusterPartition.objectToByteBuffer. */
+      public byte[] objectToByteBuffer(Object obj) throws Exception
+      {
+         final byte[] bytes = ClusterPartition.objectToByteBuffer(obj);
+         return bytes;
+      }
+
+      /** Converts bytes to an object with ClusterPartition.objectFromByteBuffer. */
+      public Object objectFromByteBuffer(byte[] buf) throws Exception
+      {
+         final Object value = ClusterPartition.objectFromByteBuffer(buf);
+         return value;
+      }
+   }
+   
+   /**
+    * Overrides RpcDispatcher.handle so that we can dispatch to many
+    * different objects.
+    */
+   private class RpcHandler extends RpcDispatcher
+   {
+      private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
+            boolean deadlock_detection)
+      {
+         super(channel, l, l2, server_obj, deadlock_detection);
+      }
+
+      /**
+       * Analyze the MethodCall contained in <code>req</code> to find the
+       * registered service object to invoke against, and then execute it
+       * against *that* object and return result.
+       * <p>
+       * The incoming method name is expected to have the form
+       * <code>handlerName.methodName</code>; the part before the last dot is
+       * the key looked up in <code>rpcHandlers</code>, the part after it is
+       * the method actually invoked on that handler.
+       *
+       * @param req The org.jgroups.Message representation of the method invocation
+       * @return <code>null</code> if the message, its buffer or its payload is
+       *         unusable; a <code>NoHandlerForRPC</code> if the method name
+       *         carries no handler prefix or no handler is registered under
+       *         that prefix; otherwise the value returned by the invocation,
+       *         or the Throwable it threw
+       */
+      public Object handle(Message req)
+      {
+         boolean trace = log.isTraceEnabled();
+
+         if( trace )
+            log.trace("Partition " + getPartitionName() + " received msg");
+
+         // Read the buffer once; it is needed for both the check and the unmarshalling
+         byte[] buffer = (req == null) ? null : req.getBuffer();
+         if (buffer == null)
+         {
+            log.warn("message or message buffer is null !");
+            return null;
+         }
+
+         Object body;
+         try
+         {
+            body = objectFromByteBuffer(buffer);
+         }
+         catch(Exception e)
+         {
+            log.warn("failed unserializing message buffer (msg=" + req + ")", e);
+            return null;
+         }
+
+         // instanceof is false for null, so this also rejects an empty payload
+         if (!(body instanceof MethodCall))
+         {
+            log.warn("message does not contain a MethodCall object !");
+            return null;
+         }
+
+         // get method call informations
+         //
+         MethodCall method_call = (MethodCall) body;
+         String methodName = method_call.getName();
+
+         if( trace )
+            log.trace("pre methodName: " + methodName);
+
+         // Without a '.' (or without any name) there is no handler key to look
+         // up; substring(0, -1) would throw, so treat it as an unroutable call.
+         int idx = (methodName == null) ? -1 : methodName.lastIndexOf('.');
+         if (idx < 0)
+         {
+            log.warn("rpc method name '" + methodName + "' has no handler prefix; cannot dispatch");
+            return new NoHandlerForRPC();
+         }
+
+         String handlerName = methodName.substring(0, idx);
+         String newMethodName = methodName.substring(idx + 1);
+
+         if( trace )
+         {
+            log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
+            log.trace("Handle: " + methodName);
+         }
+
+         // prepare method call
+         method_call.setName(newMethodName);
+         Object handler = rpcHandlers.get(handlerName);
+         if (handler == null)
+         {
+            if( trace )
+               log.trace("No rpc handler registered under: "+handlerName);
+            return new NoHandlerForRPC();
+         }
+
+         /* Invoke it and just return any exception with trace level logging of
+         the exception. The exception semantics of a group rpc call are weak as
+         the return value may be a normal return value or the exception thrown.
+         */
+         Object retval;
+         try
+         {
+            retval = method_call.invoke(handler);
+            if( trace )
+               log.trace("rpc call return value: "+retval);
+         }
+         catch (Throwable t)
+         {
+            if( trace )
+               log.trace("rpc call threw exception", t);
+            retval = t;
+         }
+
+         return retval;
+      }
+
+   }
+   
+   /**
+    * Creates the general and the lifecycle logger for this partition.
+    * <p>
+    * Both categories are rooted at the HAPartition class name; when a
+    * partition name is given it is appended as a trailing category segment
+    * to each of them.
+    *
+    * @param partitionName name to append to the logger categories, or
+    *                      <code>null</code> to use the unqualified categories
+    */
+   private void setupLoggers(String partitionName)
+   {
+      String generalCategory = HAPartition.class.getName();
+      String lifecycleCategory = generalCategory + ".lifecycle";
+
+      if (partitionName != null)
+      {
+         generalCategory = generalCategory + "." + partitionName;
+         lifecycleCategory = lifecycleCategory + "." + partitionName;
+      }
+
+      this.log = Logger.getLogger(generalCategory);
+      this.clusterLifeCycleLog = Logger.getLogger(lifecycleCategory);
+   }
+   
 }




More information about the jboss-cvs-commits mailing list