[jboss-cvs] JBossAS SVN: r58567 - trunk/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Nov 18 06:28:22 EST 2006


Author: bstansberry at jboss.com
Date: 2006-11-18 06:28:21 -0500 (Sat, 18 Nov 2006)
New Revision: 58567

Removed:
   trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
ClusterPartition now directly implements HAPartition

Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-11-18 11:27:50 UTC (rev 58566)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-11-18 11:28:21 UTC (rev 58567)
@@ -1,1446 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.ha.framework.server;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Vector;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.Name;
-import javax.naming.NameNotFoundException;
-import javax.naming.Reference;
-import javax.naming.StringRefAddr;
-import javax.management.MBeanServer;
-
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
-import org.jgroups.MergeView;
-import org.jgroups.View;
-import org.jgroups.Message;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-
-import org.jboss.invocation.MarshalledValueInputStream;
-import org.jboss.invocation.MarshalledValueOutputStream;
-import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
-import org.jboss.ha.framework.interfaces.DistributedState;
-import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.ClusterNode;
-
-import org.jboss.naming.NonSerializableFactory;
-import org.jboss.logging.Logger;
-
-/**
- * This class is an abstraction class for a JGroups RPCDispatcher and JChannel.
- * It is a default implementation of HAPartition for the
- * <a href="http://www.jgroups.com/">JGroups</A> framework
- *
- * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
- * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
- * @author Scott.Stark at jboss.org
- * @author brian.stansberry at jboss.com
- * @version $Revision$
- */
-public class HAPartitionImpl
-   extends RpcDispatcher
-   implements ExtendedMessageListener, MembershipListener,
-      HAPartition, AsynchEventHandler.AsynchEventProcessor
-{
-   private static final byte NULL_VALUE   = 0;
-   private static final byte SERIALIZABLE_VALUE = 1;
-   // TODO add Streamable support
-   // private static final byte STREAMABLE_VALUE = 2;
-   
-   /**
-    * Returned when an RPC call arrives for a service that isn't registered.
-    */
-   private static class NoHandlerForRPC implements Serializable
-   {
-      static final long serialVersionUID = -1263095408483622838L;
-   }
-   
-   private static class StateStreamEnd implements Serializable
-   {
-      /** The serialVersionUID */
-      private static final long serialVersionUID = -3705345735451504946L;      
-   }
-
-   // Constants -----------------------------------------------------
-
-   // final MethodLookup method_lookup_clos = new MethodLookupClos();
-
-   // Attributes ----------------------------------------------------
-
-   protected HashMap rpcHandlers = new HashMap();
-   protected HashMap stateHandlers = new HashMap();
-   /** Do we send any membership change notifications synchronously? */
-   protected boolean allowSyncListeners = false;
-   /** The HAMembershipListener and HAMembershipExtendedListeners */
-   protected ArrayList synchListeners = new ArrayList();
-   /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
-   protected ArrayList asynchListeners = new ArrayList();
-   /** The handler used to send membership change notifications asynchronously */
-   protected AsynchEventHandler asynchHandler;
-   /** The current cluster partition members */
-   protected Vector members = null;
-   protected Vector jgmembers = null;
-
-   public Vector history = null;
-
-   /** The partition members other than this node */
-   protected Vector otherMembers = null;
-   protected Vector jgotherMembers = null;
-   /** The JChannel name */
-   protected String partitionName;
-   /** the local JG IP Address */
-   protected org.jgroups.stack.IpAddress localJGAddress = null;
-   /** The cluster transport protocol address string */
-   protected String nodeName;
-   /** me as a ClusterNode */
-   protected ClusterNode me = null;
-   /** The timeout for cluster RPC calls */
-   protected long timeout = 60000;
-   /** The JGroups partition channel */
-   protected JChannel channel;
-   /** The cluster replicant manager */
-   protected DistributedReplicantManagerImpl replicantManager;
-   /** The cluster state manager */
-   protected DistributedState dsManager;
-   /** The cluster instance log category */
-   protected Logger log;
-   protected Logger clusterLifeCycleLog;   
-   /** The current cluster view id */
-   protected long currentViewId = -1;
-   /** The JMX MBeanServer to use for registrations */
-   protected MBeanServer server;
-   /** Number of ms to wait for state */
-   protected long state_transfer_timeout=60000;
-
-   /**
-    * True if state was initialized during start-up.
-    */
-   protected boolean isStateSet = false;
-
-   /**
-    * An exception occuring upon fetch state.
-    */
-   protected Exception setStateException;
-   private final Object stateLock = new Object();
-
-   // Static --------------------------------------------------------
-   
-   /**
-    * Creates an object from a byte buffer
-    */
-   public static Object objectFromByteBuffer (byte[] buffer) throws Exception
-   {
-      if(buffer == null) 
-         return null;
-
-      ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-      MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
-      return mvis.readObject();
-   }
-   
-   /**
-    * Serializes an object into a byte buffer.
-    * The object has to implement interface Serializable or Externalizable
-    */
-   public static byte[] objectToByteBuffer (Object obj) throws Exception
-   {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
-      mvos.writeObject(obj);
-      mvos.flush();
-      return baos.toByteArray();
-   }
-
-   public long getStateTransferTimeout() {
-      return state_transfer_timeout;
-   }
-
-   public void setStateTransferTimeout(long state_transfer_timeout) {
-      this.state_transfer_timeout=state_transfer_timeout;
-   }
-
-   public long getMethodCallTimeout() {
-      return timeout;
-   }
-
-   public void setMethodCallTimeout(long timeout) {
-      this.timeout=timeout;
-   }
-
-    // Constructors --------------------------------------------------
-       
-   public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
-   {
-      this(partitionName, channel, deadlock_detection);
-      this.server = server;
-   }
-   
-   public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception
-   {
-      super(channel, null, null, new Object(), deadlock_detection); // init RpcDispatcher with a fake target object
-      this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
-      this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
-      this.channel = channel;
-      this.partitionName = partitionName;
-      this.history = new Vector();
-      this.setMarshaller(new MarshallerImpl());
-      logHistory ("Partition object created");
-   }
-   
-    // Public --------------------------------------------------------
-   
-   public void init() throws Exception
-   {
-      log.info("Initializing partition");
-      logHistory ("Initializing partition");
-
-      // Subscribe to events generated by the org.jgroups. protocol stack
-      log.debug("setMembershipListener");
-      setMembershipListener(this);
-      log.debug("setMessageListener");
-      setMessageListener(this);
-      
-      // Create the DRM and link it to this HAPartition
-      log.debug("create replicant manager");
-      this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
-      log.debug("init replicant manager");
-      this.replicantManager.init();
-
-      
-//      // Create the DS and link it to this HAPartition
-//      log.debug("create distributed state service");
-//      this.dsManager = new DistributedStateImpl(this, this.server);
-//      log.debug("init distributed state service");
-//      this.dsManager.init();
-
-      // Create the asynchronous handler for view changes
-      asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
-      
-      log.debug("done initializing partition");
-   }
-   
-   public void startPartition() throws Exception
-   {
-      // get current JG group properties
-      //
-      logHistory ("Starting partition");
-      log.debug("get nodeName");
-      this.localJGAddress = (IpAddress)channel.getLocalAddress();
-      this.me = new ClusterNode(this.localJGAddress);
-      this.nodeName = this.me.getName();
-
-      log.debug("Get current members");
-      View view = channel.getView();
-      this.jgmembers = (Vector)view.getMembers().clone();
-      this.members = translateAddresses(this.jgmembers); // TRANSLATE
-      log.info("Number of cluster members: " + members.size());
-      for(int m = 0; m > members.size(); m ++)
-      {
-         Object node = members.get(m);
-         log.debug(node);
-      }
-      // Keep a list of other members only for "exclude-self" RPC calls
-      //
-      this.jgotherMembers = (Vector)view.getMembers().clone();
-      this.jgotherMembers.remove (channel.getLocalAddress());
-      this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
-      log.info ("Other members: " + this.otherMembers.size ());
-
-      verifyNodeIsUnique(view.getMembers());
-
-      // Update the initial view id
-      this.currentViewId = view.getVid().getId();
-
-      // We must now synchronize new state transfer subscriber
-      //
-      fetchState();
-      
-      // We are now able to start our DRM and DS
-      this.replicantManager.start();
-
-      // Start the asynch listener handler thread
-      asynchHandler.start();
-      
-      // Bind ourself in the public JNDI space
-      Context ctx = new InitialContext();
-      this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
-      log.debug("Bound in JNDI under /HAPartition/" + partitionName);
-      
-   }
-
-
-   protected void fetchState() throws Exception
-   {
-      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
-      long start, stop;
-      isStateSet = false;
-      start = System.currentTimeMillis();
-      boolean rc = channel.getState(null, this.state_transfer_timeout);
-      if (rc)
-      {
-         synchronized (stateLock)
-         {
-            while (!isStateSet)
-            {
-               if (setStateException != null)
-                  throw setStateException;
-
-               try
-               {
-                  stateLock.wait();
-               }
-               catch (InterruptedException iex)
-               {
-               }
-            }
-         }
-         stop = System.currentTimeMillis();
-         log.info("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
-      }
-      else
-      {
-         // No one provided us with state.
-         // We need to find out if we are the coordinator, so we must
-         // block until viewAccepted() is called at least once
-
-         synchronized (members)
-         {
-            while (members.size() == 0)
-            {
-               log.debug("waiting on viewAccepted()");
-               try
-               {
-                  members.wait();
-               }
-               catch (InterruptedException iex)
-               {
-               }
-            }
-         }
-
-         if (isCurrentNodeCoordinator())
-         {
-            log.info("State could not be retrieved (we are the first member in group)");
-         }
-         else
-         {
-            throw new IllegalStateException("Initial state transfer failed: " +
-               "Channel.getState() returned false");
-         }
-      }
-   }
-
-   public void closePartition() throws Exception
-   {
-      logHistory ("Closing partition");
-      log.info("Closing partition " + partitionName);
-
-      try
-      {
-         asynchHandler.stop();
-      }
-      catch( Exception e)
-      {
-         log.warn("Failed to stop asynchHandler", e);
-      }
-
-      // Stop the DRM service
-      // TODO remove when DRM is independent
-      try
-      {
-         this.replicantManager.stop();
-      }
-      catch (Exception e)
-      {
-         log.error("operation failed", e);
-      }
-
-//    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
-//    add the destroyPartition() step
-      try
-      {
-         channel.disconnect();
-      }
-      catch (Exception e)
-      {
-         log.error("operation failed", e);
-      }
-
-     String boundName = "/HAPartition/" + partitionName;
-
-     InitialContext ctx = new InitialContext();
-     try
-     {
-        ctx.unbind(boundName);
-     }
-     finally
-     {
-        ctx.close();
-     }
-     NonSerializableFactory.unbind (boundName);
-
-      log.info("Partition " + partitionName + " closed.");
-   }
-   
-   public void destroyPartition()  throws Exception
-   {
-
-      try
-      {
-         this.replicantManager.destroy();
-      }
-      catch (Exception e)
-      {
-         log.error("Destroying DRM failed", e);
-      }      
-      
-      try
-      {
-         channel.close();
-      }
-      catch (Exception e)
-      {
-         log.error("Closing channel failed", e);
-      }
-
-      log.info("Partition " + partitionName + " destroyed.");
-   }
-   
-   // org.jgroups.MessageListener implementation ----------------------------------------------
-
-   public byte[] getState()
-   {
-      logHistory ("getState called on partition");
-      
-      log.debug("getState called.");
-      try
-      {
-         ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-         getStateInternal(baos);
-         return baos.toByteArray();
-      }
-      catch (Exception ex)
-      {
-         log.error("getState failed", ex);
-      }
-      return null; // This will cause the receiver to get a "false" on the channel.getState() call
-   }
-   
-   public void getState(OutputStream stream)
-   {
-      logHistory ("getState called on partition");
-      
-      log.debug("getState called.");
-      try
-      {
-         getStateInternal(stream);
-      }
-      catch (Exception ex)
-      {
-         log.error("getState failed", ex);
-      }
-      
-   }
-   
-   private void getStateInternal(OutputStream stream) throws IOException
-   {
-      MarshalledValueOutputStream mvos = null; // don't create until we know we need it
-      
-      for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
-      {
-         Map.Entry entry = (Map.Entry)keys.next();
-         HAPartition.HAPartitionStateTransfer subscriber = 
-            (HAPartition.HAPartitionStateTransfer) entry.getValue();
-         log.debug("getState for " + entry.getKey());
-         Object state = subscriber.getCurrentState();
-         if (state != null)
-         {
-            if (mvos == null)
-            {
-               // This is our first write, so need to write the header first
-               stream.write(SERIALIZABLE_VALUE);
-               
-               mvos = new MarshalledValueOutputStream(stream);
-            }
-            
-            mvos.writeObject(entry.getKey());
-            mvos.writeObject(state);
-         }
-      }
-      
-      if (mvos == null)
-      {
-         // We never wrote any state, so write the NULL header
-         stream.write(NULL_VALUE);
-      }
-      else
-      {
-         mvos.writeObject(new StateStreamEnd());
-      }
-      
-   }
-   
-   public void setState(byte[] obj)
-   {
-      logHistory ("setState called on partition");
-      try
-      {
-         if (obj == null)
-         {
-            log.debug("transferred state is null (may be first member in cluster)");
-         }
-         else
-         {
-            ByteArrayInputStream bais = new ByteArrayInputStream(obj);
-            setStateInternal(bais);
-            bais.close();
-         }
-         
-         isStateSet = true;
-      }
-      catch (Throwable t)
-      {
-         recordSetStateFailure(t);
-      }
-      finally
-      {
-         notifyStateTransferCompleted();
-      }
-   }
-   
-   public void setState(InputStream stream)
-   {
-      logHistory ("setState called on partition");
-      try
-      {
-         if (stream == null)
-         {
-            log.debug("transferred state is null (may be first member in cluster)");
-         }
-         else
-         {
-            setStateInternal(stream);
-         }
-         
-         isStateSet = true;
-      }
-      catch (Throwable t)
-      {
-         recordSetStateFailure(t);
-      }
-      finally
-      {
-         notifyStateTransferCompleted();
-      }
-   }
-   
-   private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
-   {
-      byte type = (byte) stream.read();
-         
-      if (type == NULL_VALUE)
-      {
-         log.debug("state is null");
-         return;
-      }
-      
-      long used_mem_before, used_mem_after;
-      Runtime rt=Runtime.getRuntime();
-      used_mem_before=rt.totalMemory() - rt.freeMemory();
-      
-      MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
-      
-      while (true)
-      {
-         Object obj = mvis.readObject(); 
-         if (obj instanceof StateStreamEnd)
-            break;
-         
-         String key = (String) obj;
-         log.debug("setState for " + key);
-         Object someState = mvis.readObject();
-         HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
-         if (subscriber != null)
-         {
-            try
-            {
-               subscriber.setCurrentState((Serializable)someState);
-            }
-            catch (Exception e)
-            {
-               // Don't let issues with one subscriber affect others
-               // unless it is DRM, which is really an internal function
-               // of the HAPartition
-               // FIXME remove this once DRM is JBC-based
-               if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
-               {
-                  if (e instanceof RuntimeException)
-                     throw (RuntimeException) e;
-                  else
-                     throw new RuntimeException(e);
-               }
-               else
-               {
-                  log.error("Caught exception setting state to " + subscriber, e);
-               }
-            }
-         }
-         else
-         {
-            log.debug("There is no stateHandler for: " + key);
-         }      
-      }
-
-      used_mem_after=rt.totalMemory() - rt.freeMemory();
-      log.debug("received state; expanded memory by " +
-            (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
-            ", used memory after: " + used_mem_after + ")");
-   }
-
-   private void recordSetStateFailure(Throwable t)
-   {
-      log.error("failed setting state", t);
-      if (t instanceof Exception)
-         setStateException = (Exception) t;
-      else
-         setStateException = new Exception(t);
-   }
-
-   private void notifyStateTransferCompleted()
-   {
-      synchronized (stateLock)
-      {
-         // Notify wait that state has been set.
-         stateLock.notifyAll();
-      }
-   }
-   
-   public void getState(String state_id, OutputStream ostream)
-   {
-      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
-   }
-
-   public byte[] getState(String state_id)
-   {
-      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
-   }
-
-   public void setState(String state_id, byte[] state)
-   {
-      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
-   }
-
-   public void setState(String state_id, InputStream istream)
-   {
-      throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
-   }
-
-   public void receive(org.jgroups.Message msg)
-   { /* complete */}
-   
-   // org.jgroups.MembershipListener implementation ----------------------------------------------
-   
-   public void suspect(org.jgroups.Address suspected_mbr)
-   {      
-      logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
-      if (isCurrentNodeCoordinator ())
-         clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
-      else
-         log.info("Suspected member: " + suspected_mbr);
-   }
-
-   public void block() {}
-   
-   /** Notification of a cluster view change. This is done from the JG protocol
-    * handlder thread and we must be careful to not unduly block this thread.
-    * Because of this there are two types of listeners, synchronous and
-    * asynchronous. The synchronous listeners are messaged with the view change
-    * event using the calling thread while the asynchronous listeners are
-    * messaged using a seperate thread.
-    *
-    * @param newView
-    */
-   public void viewAccepted(View newView)
-   {
-      try
-      {
-         // we update the view id
-         //
-         this.currentViewId = newView.getVid().getId();
-
-         // Keep a list of other members only for "exclude-self" RPC calls
-         //
-         this.jgotherMembers = (Vector)newView.getMembers().clone();
-         this.jgotherMembers.remove (channel.getLocalAddress());
-         this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
-         Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
-         logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
-                     " (old view: " + this.members + " )");
-
-
-         // Save the previous view and make a copy of the new view
-         Vector oldMembers = this.members;
-
-         Vector newjgMembers = (Vector)newView.getMembers().clone();
-         Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
-         if (this.members == null)
-         {
-            // Initial viewAccepted
-            this.members = newMembers;
-            this.jgmembers = newjgMembers;
-            log.debug("ViewAccepted: initial members set");
-            return;
-         }
-         this.members = newMembers;
-         this.jgmembers = newjgMembers;
-
-         int difference = 0;
-         if (oldMembers == null)
-            difference = newMembers.size () - 1;
-         else
-            difference = newMembers.size () - oldMembers.size ();
-         
-         if (isCurrentNodeCoordinator ())
-            clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
-                                      this.currentViewId + ", delta: " + difference + ") : " + this.members);
-         else
-            log.info("New cluster view for partition " + this.partitionName + ": " +
-                     this.currentViewId + " (" + this.members + " delta: " + difference + ")");
-
-         // Build a ViewChangeEvent for the asynch listeners
-         ViewChangeEvent event = new ViewChangeEvent();
-         event.viewId = currentViewId;
-         event.allMembers = translatedNewView;
-         event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
-         event.newMembers = getNewMembers(oldMembers, event.allMembers);
-         event.originatingGroups = null;
-         // if the new view occurs because of a merge, we first inform listeners of the merge
-         if(newView instanceof MergeView)
-         {
-            MergeView mergeView = (MergeView) newView;
-            event.originatingGroups = mergeView.getSubgroups();
-         }
-
-         log.debug("membership changed from " + this.members.size() + " to "
-            + event.allMembers.size());
-         // Put the view change to the asynch queue
-         this.asynchHandler.queueEvent(event);
-
-         // Broadcast the new view to the synchronous view change listeners
-         if (this.allowSyncListeners)
-         {
-            this.notifyListeners(synchListeners, event.viewId, event.allMembers,
-                  event.deadMembers, event.newMembers, event.originatingGroups);
-         }
-      }
-      catch (Exception ex)
-      {
-         log.error("ViewAccepted failed", ex);
-      }
-   }
-
-   // HAPartition implementation ----------------------------------------------
-   
-   public String getNodeName()
-   {
-      return nodeName;
-   }
-   
-   public String getPartitionName()
-   {
-      return partitionName;
-   }
-   
-   public DistributedReplicantManager getDistributedReplicantManager()
-   {
-      return replicantManager;
-   }
-   
-   public DistributedState getDistributedStateService()
-   {
-      return this.dsManager;
-   }
-
-   public long getCurrentViewId()
-   {
-      return this.currentViewId;
-   }
-   
-   public Vector getCurrentView()
-   {
-      Vector result = new Vector (this.members.size());
-      for (int i = 0; i < members.size(); i++)
-      {
-         result.add( ((ClusterNode) members.elementAt(i)).getName() );
-      }
-      return result;
-   }
-
-   public ClusterNode[] getClusterNodes ()
-   {
-      ClusterNode[] nodes = new ClusterNode[this.members.size()];
-      this.members.toArray(nodes);
-      return nodes;
-   }
-
-   public ClusterNode getClusterNode ()
-   {
-      return me;
-   }
-
-   public boolean isCurrentNodeCoordinator ()
-   {
-      if(this.members == null || this.members.size() == 0 || this.me == null)
-         return false;
-     return this.members.elementAt (0).equals (this.me);
-   }
-
-   // ***************************
-   // ***************************
-   // RPC multicast communication
-   // ***************************
-   // ***************************
-   //
-   public void registerRPCHandler(String objName, Object subscriber)
-   {
-      rpcHandlers.put(objName, subscriber);
-   }
-   
-   public void unregisterRPCHandler(String objName, Object subscriber)
-   {
-      rpcHandlers.remove(objName);
-   }
-      
-
-   /**
-    *
-    * @param objName
-    * @param methodName
-    * @param args
-    * @param excludeSelf
-    * @return
-    * @throws Exception
-    * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
-    */
-   public ArrayList callMethodOnCluster(String objName, String methodName,
-      Object[] args, boolean excludeSelf) throws Exception
-   {
-      return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
-   }
-
-   /**
-    * This function is an abstraction of RpcDispatcher.
-    */
-   public ArrayList callMethodOnCluster(String objName, String methodName,
-      Object[] args, Class[] types, boolean excludeSelf) throws Exception
-   {
-      return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
-   }
-
-
-   public ArrayList callMethodOnCluster(String objName, String methodName,
-       Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
-   {
-      ArrayList rtn = new ArrayList();
-      MethodCall m=null;
-      RspList rsp = null;
-      boolean trace = log.isTraceEnabled();
-
-      if(types != null)
-         m=new MethodCall(objName + "." + methodName, args, types);
-      else
-         m=new MethodCall(objName + "." + methodName, args);
-
-      if (excludeSelf)
-      {
-         if( trace )
-         {
-            log.trace("callMethodOnCluster(true), objName="+objName
-               +", methodName="+methodName+", members="+jgotherMembers);
-         }
-         rsp = this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
-      }
-      else
-      {
-         if( trace )
-         {
-            log.trace("callMethodOnCluster(false), objName="+objName
-               +", methodName="+methodName+", members="+members);
-         }
-         rsp = this.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
-      }
-
-      if (rsp != null)
-      {
-         for (int i = 0; i < rsp.size(); i++)
-         {
-            Object item = rsp.elementAt(i);
-            if (item instanceof Rsp)
-            {
-               Rsp response = (Rsp) item;
-               // Only include received responses
-               boolean wasReceived = response.wasReceived();
-               if( wasReceived == true )
-               {
-                  item = response.getValue();
-                  if (!(item instanceof NoHandlerForRPC))
-                     rtn.add(item);
-               }
-               else if( trace )
-                  log.trace("Ignoring non-received response: "+response);
-            }
-            else
-            {
-               if (!(item instanceof NoHandlerForRPC))
-                  rtn.add(item);
-               else if( trace )
-                  log.trace("Ignoring NoHandlerForRPC");
-            }
-         }
-      }
-
-      return rtn;
-    }
-
-   /**
-    * Calls method on Cluster coordinator node only.  The cluster coordinator node is the first node to join the
-    * cluster.
-    * and is replaced
-    * @param objName
-    * @param methodName
-    * @param args
-    * @param types
-    * @param excludeSelf
-    * @return
-    * @throws Exception
-    */
-   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
-          Object[] args, Class[] types,boolean excludeSelf) throws Exception
-      {
-      return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf,this.timeout);
-      }
-
-   /**
-    * Calls method on Cluster coordinator node only.  The cluster coordinator node is the first node to join the
-    * cluster.
-    * and is replaced
-    * @param objName
-    * @param methodName
-    * @param args
-    * @param types
-    * @param excludeSelf
-    * @param methodTimeout
-    * @return
-    * @throws Exception
-    */
-   public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
-          Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
-      {
-         ArrayList rtn = new ArrayList();
-         MethodCall m=null;
-         RspList rsp = null;
-         boolean trace = log.isTraceEnabled();
-
-         if(types != null)
-            m=new MethodCall(objName + "." + methodName, args, types);
-         else
-            m=new MethodCall(objName + "." + methodName, args);
-
-         if( trace )
-         {
-            log.trace("callMethodOnCoordinatorNode(false), objName="+objName
-               +", methodName="+methodName);
-         }
-
-         // the first cluster view member is the coordinator
-         Vector coordinatorOnly = new Vector();
-         // If we are the coordinator, only call ourself if 'excludeSelf' is false
-         if (false == isCurrentNodeCoordinator () ||
-             false == excludeSelf)
-            coordinatorOnly.addElement(this.jgmembers.elementAt (0));
-
-         rsp = this.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
-
-         if (rsp != null)
-         {
-            for (int i = 0; i < rsp.size(); i++)
-            {
-               Object item = rsp.elementAt(i);
-               if (item instanceof Rsp)
-               {
-                  Rsp response = (Rsp) item;
-                  // Only include received responses
-                  boolean wasReceived = response.wasReceived();
-                  if( wasReceived == true )
-                  {
-                     item = response.getValue();
-                     if (!(item instanceof NoHandlerForRPC))
-                        rtn.add(item);
-                  }
-                  else if( trace )
-                     log.trace("Ignoring non-received response: "+response);
-               }
-               else
-               {
-                  if (!(item instanceof NoHandlerForRPC))
-                     rtn.add(item);
-                  else if( trace )
-                     log.trace("Ignoring NoHandlerForRPC");
-               }
-            }
-         }
-
-         return rtn;
-       }
-
-
-   /**
-    *
-    * @param objName
-    * @param methodName
-    * @param args
-    * @param excludeSelf
-    * @throws Exception
-    * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
-    */
-   public void callAsynchMethodOnCluster(String objName, String methodName,
-      Object[] args, boolean excludeSelf) throws Exception {
-      callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
-   }
-
-   /**
-    * This function is an abstraction of RpcDispatcher for asynchronous messages
-    */
-   public void callAsynchMethodOnCluster(String objName, String methodName,
-      Object[] args, Class[] types, boolean excludeSelf) throws Exception
-   {
-      MethodCall m = null;
-      boolean trace = log.isTraceEnabled();
-
-      if(types != null)
-         m=new MethodCall(objName + "." + methodName, args, types);
-      else
-         m=new MethodCall(objName + "." + methodName, args);
-
-      if (excludeSelf)
-      {
-         if( trace )
-         {
-            log.trace("callAsynchMethodOnCluster(true), objName="+objName
-               +", methodName="+methodName+", members="+jgotherMembers);
-         }
-         this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, timeout);
-      }
-      else
-      {
-         if( trace )
-         {
-            log.trace("callAsynchMethodOnCluster(false), objName="+objName
-               +", methodName="+methodName+", members="+members);
-         }
-         this.callRemoteMethods(null, m, GroupRequest.GET_NONE, timeout);
-      }
-   }
-   
-   // *************************
-   // *************************
-   // State transfer management
-   // *************************
-   // *************************
-   //      
-   public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
-   {
-      stateHandlers.put(objectName, subscriber);
-   }
-   
-   public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
-   {
-      stateHandlers.remove(objectName);
-   }
-   
-   // *************************
-   // *************************
-   // Group Membership listeners
-   // *************************
-   // *************************
-   //   
-   public void registerMembershipListener(HAMembershipListener listener)
-   {
-      boolean isAsynch = (this.allowSyncListeners == false) 
-            || (listener instanceof AsynchHAMembershipListener)
-            || (listener instanceof AsynchHAMembershipExtendedListener);
-      if( isAsynch ) {
-         synchronized(this.asynchListeners) {
-            this.asynchListeners.add(listener);
-         }
-      }
-      else  { 
-         synchronized(this.synchListeners) {
-            this.synchListeners.add(listener);
-         }
-      }
-   }
-   
-   public void unregisterMembershipListener(HAMembershipListener listener)
-   {
-      boolean isAsynch = (this.allowSyncListeners == false) 
-            || (listener instanceof AsynchHAMembershipListener)
-            || (listener instanceof AsynchHAMembershipExtendedListener);
-      if( isAsynch ) {
-         synchronized(this.asynchListeners) {
-            this.asynchListeners.remove(listener);
-         }
-      }
-      else  { 
-         synchronized(this.synchListeners) {
-            this.synchListeners.remove(listener);
-         }
-      }
-   }
-   
-   public boolean getAllowSynchronousMembershipNotifications()
-   {
-      return allowSyncListeners;
-   }
-
-   public void setAllowSynchronousMembershipNotifications(boolean allowSync)
-   {      
-      this.allowSyncListeners = allowSync;
-   }
-   
-   // org.jgroups.RpcDispatcher overrides ---------------------------------------------------
-   
-   /**
-    * Message contains MethodCall. Execute it against *this* object and return result.
-    * Use MethodCall.Invoke() to do this. Return result.
-    *
-    * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
-    * @param req The org.jgroups. representation of the method invocation
-    * @return The serializable return value from the invocation
-    */
-   public Object handle(Message req)
-   {
-      Object body = null;
-      Object retval = null;
-      MethodCall  method_call = null;
-      boolean trace = log.isTraceEnabled();
-      
-      if( trace )
-         log.trace("Partition " + partitionName + " received msg");
-      if(req == null || req.getBuffer() == null)
-      {
-         log.warn("message or message buffer is null !");
-         return null;
-      }
-      
-      try
-      {
-         body = objectFromByteBuffer(req.getBuffer());
-      }
-      catch(Exception e)
-      {
-         log.warn("failed unserializing message buffer (msg=" + req + ")", e);
-         return null;
-      }
-      
-      if(body == null || !(body instanceof MethodCall))
-      {
-         log.warn("message does not contain a MethodCall object !");
-         return null;
-      }
-      
-      // get method call informations
-      //
-      method_call = (MethodCall)body;
-      String methodName = method_call.getName();      
-      
-      if( trace )
-         log.trace("pre methodName: " + methodName);
-      
-      int idx = methodName.lastIndexOf('.');
-      String handlerName = methodName.substring(0, idx);
-      String newMethodName = methodName.substring(idx + 1);
-      
-      if( trace ) 
-      {
-         log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
-         log.trace("Handle: " + methodName);
-      }
-      
-      // prepare method call
-      method_call.setName(newMethodName);
-      Object handler = rpcHandlers.get(handlerName);
-      if (handler == null)
-      {
-         if( trace )
-            log.debug("No rpc handler registered under: "+handlerName);
-         return new NoHandlerForRPC();
-      }
-
-      /* Invoke it and just return any exception with trace level logging of
-      the exception. The exception semantics of a group rpc call are weak as
-      the return value may be a normal return value or the exception thrown.
-      */
-      try
-      {
-         retval = method_call.invoke(handler);
-         if( trace )
-            log.trace("rpc call return value: "+retval);
-      }
-      catch (Throwable t)
-      {
-         if( trace )
-            log.trace("rpc call threw exception", t);
-         retval = t;
-      }
-      
-      return retval;
-   }
-   
-   // AsynchEventHandler.AsynchEventProcessor -----------------------
-
-   public void processEvent(Object event)
-   {
-      ViewChangeEvent vce = (ViewChangeEvent) event;
-      notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
-            vce.deadMembers, vce.newMembers, vce.originatingGroups);
-      
-   }
-   
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-
-   protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
-   {
-      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
-      if (localUniqueName == null)
-      {
-         log.error("No additional information has been found in the JGroups address; " +
-                  "make sure you are running with a correct version of JGroups and that the protocols " +
-                  "you are using support 'additionalData' behaviour.");
-         throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
-      }
-
-      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
-      {
-         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
-         if (!address.equals(this.localJGAddress))
-         {
-            if (localUniqueName.equals(address.getAdditionalData()))
-               throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
-         }
-      }
-   }
-
-   /**
-    * Helper method that binds the partition in the JNDI tree.
-    * @param jndiName Name under which the object must be bound
-    * @param who Object to bind in JNDI
-    * @param classType Class type under which should appear the bound object
-    * @param ctx Naming context under which we bind the object
-    * @throws Exception Thrown if a naming exception occurs during binding
-    */   
-   protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
-   {
-      // Ah ! This service isn't serializable, so we use a helper class
-      //
-      NonSerializableFactory.bind(jndiName, who);
-      Name n = ctx.getNameParser("").parse(jndiName);
-      while (n.size () > 1)
-      {
-         String ctxName = n.get (0);
-         try
-         {
-            ctx = (Context)ctx.lookup (ctxName);
-         }
-         catch (NameNotFoundException e)
-         {
-            log.debug ("creating Subcontext " + ctxName);
-            ctx = ctx.createSubcontext (ctxName);
-         }
-         n = n.getSuffix (1);
-      }
-
-      // The helper class NonSerializableFactory uses address type nns, we go on to
-      // use the helper class to bind the service object in JNDI
-      //
-      StringRefAddr addr = new StringRefAddr("nns", jndiName);
-      Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
-      ctx.rebind (n.get (0), ref);
-   }
-   
-   /**
-    * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
-    * Dead members are old - new members.
-    * @param oldMembers Vector of old members
-    * @param newMembers Vector of new members
-    * @return Vector of members that have died between the two views, can be empty.
-    */   
-   protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
-   {
-      boolean debug = log.isDebugEnabled();
-      if(oldMembers == null) oldMembers=new Vector();
-      if(newMembers == null) newMembers=new Vector();
-      Vector dead=(Vector)oldMembers.clone();
-      dead.removeAll(newMembers);
-      if(dead.size() > 0 && debug)
-         log.debug("dead members: " + dead);
-      return dead;
-   }
-   
-   /**
-    * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
-    * @param oldMembers Vector of old members
-    * @param allMembers Vector of new members
-    * @return Vector of members that have joined the partition between the two views
-    */   
-   protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
-   {
-      if(oldMembers == null) oldMembers=new Vector();
-      if(allMembers == null) allMembers=new Vector();
-      Vector newMembers=(Vector)allMembers.clone();
-      newMembers.removeAll(oldMembers);
-      return newMembers;
-   }
-
-   protected void notifyListeners(ArrayList theListeners, long viewID,
-      Vector allMembers, Vector deadMembers, Vector newMembers,
-      Vector originatingGroups)
-   {
-      log.debug("Begin notifyListeners, viewID: "+viewID);
-      synchronized(theListeners)
-      {
-         // JBAS-3619 -- don't hold synch lock while notifying
-         theListeners = (ArrayList) theListeners.clone();
-      }
-      
-      for (int i = 0; i < theListeners.size(); i++)
-      {
-         HAMembershipListener aListener = null;
-         try
-         {
-            aListener = (HAMembershipListener) theListeners.get(i);
-            if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
-            {
-               HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
-               exListener.membershipChangedDuringMerge (deadMembers, newMembers,
-                  allMembers, originatingGroups);
-            }
-            else
-            {
-               aListener.membershipChanged(deadMembers, newMembers, allMembers);
-            }
-         }
-         catch (Throwable e)
-         {
-            // a problem in a listener should not prevent other members to receive the new view
-            log.warn("HAMembershipListener callback failure: "+aListener, e);
-         }
-      }
-      
-      log.debug("End notifyListeners, viewID: "+viewID);
-   }
-
-   protected Vector translateAddresses (Vector jgAddresses)
-   {
-      if (jgAddresses == null)
-         return null;
-
-      Vector result = new Vector (jgAddresses.size());
-      for (int i = 0; i < jgAddresses.size(); i++)
-      {
-         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
-         result.add(new ClusterNode (addr));
-      }
-
-      return result;
-   }
-   
-   protected void setDistributedState(DistributedState distState)
-   {
-      this.dsManager = distState;
-   }
-
-   public void logHistory (String message)
-   {
-      try
-      {
-         history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
-      }
-      catch (Exception ignored){}
-   }
-
-   /** A simply data class containing the view change event needed to
-    * message the HAMembershipListeners
-    */
-   private static class ViewChangeEvent
-   {
-      long viewId;
-      Vector deadMembers;
-      Vector newMembers;
-      Vector allMembers;
-      Vector originatingGroups;
-   }
-   
-   private class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
-   {
-
-      public Object objectFromByteBuffer(byte[] buf) throws Exception
-      {
-         return HAPartitionImpl.objectFromByteBuffer(buf);
-      }
-
-      public byte[] objectToByteBuffer(Object obj) throws Exception
-      {
-         return HAPartitionImpl.objectToByteBuffer(obj);
-      }
-      
-   }
-
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-
-}




More information about the jboss-cvs-commits mailing list