[jboss-cvs] JBossAS SVN: r58567 - trunk/cluster/src/main/org/jboss/ha/framework/server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Nov 18 06:28:22 EST 2006
Author: bstansberry at jboss.com
Date: 2006-11-18 06:28:21 -0500 (Sat, 18 Nov 2006)
New Revision: 58567
Removed:
trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
ClusterPartition now directly implements HAPartition
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-18 11:27:50 UTC (rev 58566)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-11-18 11:28:21 UTC (rev 58567)
@@ -1,1446 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.ha.framework.server;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Vector;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.Name;
-import javax.naming.NameNotFoundException;
-import javax.naming.Reference;
-import javax.naming.StringRefAddr;
-import javax.management.MBeanServer;
-
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
-import org.jgroups.MergeView;
-import org.jgroups.View;
-import org.jgroups.Message;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-
-import org.jboss.invocation.MarshalledValueInputStream;
-import org.jboss.invocation.MarshalledValueOutputStream;
-import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
-import org.jboss.ha.framework.interfaces.DistributedState;
-import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.ClusterNode;
-
-import org.jboss.naming.NonSerializableFactory;
-import org.jboss.logging.Logger;
-
-/**
- * This class is an abstraction class for a JGroups RPCDispatcher and JChannel.
- * It is a default implementation of HAPartition for the
- * <a href="http://www.jgroups.com/">JGroups</A> framework
- *
- * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
- * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
- * @author Scott.Stark at jboss.org
- * @author brian.stansberry at jboss.com
- * @version $Revision$
- */
-public class HAPartitionImpl
- extends RpcDispatcher
- implements ExtendedMessageListener, MembershipListener,
- HAPartition, AsynchEventHandler.AsynchEventProcessor
-{
- // Header bytes: getStateInternal() writes one of these as the first byte of
- // a state-transfer stream and setStateInternal() reads it back.
- /** Header byte written when no subscriber supplied any state. */
- private static final byte NULL_VALUE = 0;
- /** Header byte written before the first serialized (key, state) pair. */
- private static final byte SERIALIZABLE_VALUE = 1;
- // TODO add Streamable support
- // private static final byte STREAMABLE_VALUE = 2;
-
- /**
-  * Returned when an RPC call arrives for a service that isn't registered.
-  * The callMethodOn... methods drop responses of this type instead of
-  * handing them back to the caller.
-  */
- private static class NoHandlerForRPC implements Serializable
- {
-    /** The serialVersionUID */
-    private static final long serialVersionUID = -1263095408483622838L;
- }
-
- /**
-  * End-of-stream marker for state transfer: getStateInternal() writes one
-  * instance after the last (key, state) pair and setStateInternal() stops
-  * reading when it encounters it.
-  */
- private static class StateStreamEnd implements Serializable
- {
- /** The serialVersionUID */
- private static final long serialVersionUID = -3705345735451504946L;
- }
-
- // Constants -----------------------------------------------------
-
- // final MethodLookup method_lookup_clos = new MethodLookupClos();
-
- // Attributes ----------------------------------------------------
-
- /** Objects registered through registerRPCHandler(), keyed by object name */
- protected HashMap rpcHandlers = new HashMap();
- /** HAPartitionStateTransfer subscribers, keyed by the name they subscribed under */
- protected HashMap stateHandlers = new HashMap();
- /** Do we send any membership change notifications synchronously? */
- protected boolean allowSyncListeners = false;
- /** The HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList synchListeners = new ArrayList();
- /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
- protected ArrayList asynchListeners = new ArrayList();
- /** The handler used to send membership change notifications asynchronously */
- protected AsynchEventHandler asynchHandler;
- /** The current cluster partition members */
- protected Vector members = null;
- /** The current members as the untranslated addresses taken from the JGroups view */
- protected Vector jgmembers = null;
-
- // NOTE(review): created in the constructor; presumably the store behind
- // logHistory(), which is not visible in this part of the file - confirm.
- public Vector history = null;
-
- /** The partition members other than this node */
- protected Vector otherMembers = null;
- /** The other members as untranslated JGroups addresses; used as RPC destinations when excluding self */
- protected Vector jgotherMembers = null;
- /** The JChannel name */
- protected String partitionName;
- /** the local JG IP Address */
- protected org.jgroups.stack.IpAddress localJGAddress = null;
- /** The cluster transport protocol address string */
- protected String nodeName;
- /** me as a ClusterNode */
- protected ClusterNode me = null;
- /** The timeout for cluster RPC calls */
- protected long timeout = 60000;
- /** The JGroups partition channel */
- protected JChannel channel;
- /** The cluster replicant manager */
- protected DistributedReplicantManagerImpl replicantManager;
- /** The cluster state manager */
- protected DistributedState dsManager;
- /** The cluster instance log category */
- protected Logger log;
- /** Logger in the ".lifecycle." category; used for suspect and view messages when this node is the coordinator */
- protected Logger clusterLifeCycleLog;
- /** The current cluster view id */
- protected long currentViewId = -1;
- /** The JMX MBeanServer to use for registrations */
- protected MBeanServer server;
- /** Number of ms to wait for state */
- protected long state_transfer_timeout=60000;
-
- /**
- * True if state was initialized during start-up.
- */
- protected boolean isStateSet = false;
-
- /**
- * An exception occurring upon fetch state; recorded by
- * recordSetStateFailure() and rethrown by fetchState().
- */
- protected Exception setStateException;
- /** Monitor that fetchState() waits on and notifyStateTransferCompleted() notifies */
- private final Object stateLock = new Object();
-
- // Static --------------------------------------------------------
-
- /**
-  * Deserializes the object held in a byte buffer.
-  *
-  * @param buffer the serialized form; may be <code>null</code>
-  * @return the object read from <code>buffer</code>, or <code>null</code>
-  *         when <code>buffer</code> itself is <code>null</code>
-  * @throws Exception if the buffer cannot be read as an object stream
-  */
- public static Object objectFromByteBuffer (byte[] buffer) throws Exception
- {
-    if (buffer != null)
-    {
-       MarshalledValueInputStream in =
-          new MarshalledValueInputStream(new ByteArrayInputStream(buffer));
-       return in.readObject();
-    }
-    return null;
- }
-
- /**
-  * Serializes an object into a freshly allocated byte array.
-  * The object has to implement Serializable or Externalizable.
-  *
-  * @param obj the object to serialize
-  * @return the serialized form of <code>obj</code>
-  * @throws Exception if the object cannot be written
-  */
- public static byte[] objectToByteBuffer (Object obj) throws Exception
- {
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    MarshalledValueOutputStream out = new MarshalledValueOutputStream(bytes);
-    out.writeObject(obj);
-    // push whatever the object stream still buffers into 'bytes'
-    out.flush();
-    return bytes.toByteArray();
- }
-
- /**
-  * @return the number of milliseconds fetchState() passes to the channel
-  *         when requesting the initial state
-  */
- public long getStateTransferTimeout()
- {
-    return this.state_transfer_timeout;
- }
-
- /**
-  * @param state_transfer_timeout number of milliseconds to wait for the
-  *        initial state
-  */
- public void setStateTransferTimeout(long state_transfer_timeout)
- {
-    this.state_transfer_timeout = state_transfer_timeout;
- }
-
- /**
-  * @return the timeout, in milliseconds, used for cluster RPC calls that do
-  *         not specify their own
-  */
- public long getMethodCallTimeout()
- {
-    return this.timeout;
- }
-
- /**
-  * @param timeout the default timeout, in milliseconds, for cluster RPC calls
-  */
- public void setMethodCallTimeout(long timeout)
- {
-    this.timeout = timeout;
- }
-
- // Constructors --------------------------------------------------
-
- /**
-  * Creates the partition exactly like the three-argument constructor and
-  * additionally records the MBeanServer, which init() hands to the
-  * replicant manager it creates.
-  *
-  * @param partitionName name of the partition
-  * @param channel the JGroups channel backing the partition
-  * @param deadlock_detection passed through to RpcDispatcher
-  * @param server the JMX MBeanServer to use for registrations
-  * @throws Exception if the delegated constructor fails
-  */
- public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
- {
-    this(partitionName, channel, deadlock_detection);
-    this.server = server;
- }
-
- /**
-  * Creates the partition on top of the given channel, sets up the two
-  * loggers (both named after HAPartition and the partition name), the
-  * history list and the marshaller.
-  *
-  * @param partitionName name of the partition
-  * @param channel the JGroups channel backing the partition
-  * @param deadlock_detection passed through to RpcDispatcher
-  * @throws Exception if the RpcDispatcher cannot be created
-  */
- public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception
- {
-    // init RpcDispatcher with a fake target object
-    super(channel, null, null, new Object(), deadlock_detection);
-
-    this.partitionName = partitionName;
-    this.channel = channel;
-
-    String category = HAPartition.class.getName();
-    this.log = Logger.getLogger(category + "." + partitionName);
-    this.clusterLifeCycleLog = Logger.getLogger(category + ".lifecycle." + partitionName);
-
-    this.history = new Vector();
-    this.setMarshaller(new MarshallerImpl());
-    logHistory ("Partition object created");
- }
-
- // Public --------------------------------------------------------
-
- /**
-  * Registers this object as the dispatcher's membership and message
-  * listener, creates and initializes the replicant manager, and creates
-  * (but does not start) the asynchronous view-change handler.
-  * The distributed state service is not created by this method.
-  *
-  * @throws Exception if the replicant manager cannot be created or initialized
-  */
- public void init() throws Exception
- {
-    log.info("Initializing partition");
-    logHistory ("Initializing partition");
-
-    // Listen to membership and message events from the JGroups stack
-    log.debug("setMembershipListener");
-    setMembershipListener(this);
-    log.debug("setMessageListener");
-    setMessageListener(this);
-
-    // The DRM is tied to this partition and to the MBeanServer we were given
-    log.debug("create replicant manager");
-    DistributedReplicantManagerImpl drm = new DistributedReplicantManagerImpl(this, this.server);
-    this.replicantManager = drm;
-    log.debug("init replicant manager");
-    drm.init();
-
-    // View changes are handed to listeners through this handler
-    this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
-
-    log.debug("done initializing partition");
- }
-
- /**
-  * Brings the partition into service: records the local address and the
-  * current membership from the channel's view, fetches the initial state,
-  * starts the replicant manager and the asynchronous view-change handler,
-  * and binds this partition in JNDI under "/HAPartition/" + partitionName.
-  *
-  * @throws Exception if any of these steps fails
-  */
- public void startPartition() throws Exception
- {
-    // get current JG group properties
-    //
-    logHistory ("Starting partition");
-    log.debug("get nodeName");
-    this.localJGAddress = (IpAddress)channel.getLocalAddress();
-    this.me = new ClusterNode(this.localJGAddress);
-    this.nodeName = this.me.getName();
-
-    log.debug("Get current members");
-    View view = channel.getView();
-    this.jgmembers = (Vector)view.getMembers().clone();
-    this.members = translateAddresses(this.jgmembers); // TRANSLATE
-    log.info("Number of cluster members: " + members.size());
-    // List every member at debug level. The condition used to be
-    // "m > members.size()", which is false on entry, so nothing was logged.
-    for(int m = 0; m < members.size(); m ++)
-    {
-       Object node = members.get(m);
-       log.debug(node);
-    }
-    // Keep a list of other members only for "exclude-self" RPC calls
-    //
-    this.jgotherMembers = (Vector)view.getMembers().clone();
-    this.jgotherMembers.remove (channel.getLocalAddress());
-    this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
-    log.info ("Other members: " + this.otherMembers.size ());
-
-    verifyNodeIsUnique(view.getMembers());
-
-    // Update the initial view id
-    this.currentViewId = view.getVid().getId();
-
-    // We must now synchronize new state transfer subscriber
-    //
-    fetchState();
-
-    // We are now able to start our DRM and DS
-    this.replicantManager.start();
-
-    // Start the asynch listener handler thread
-    asynchHandler.start();
-
-    // Bind ourself in the public JNDI space
-    Context ctx = new InitialContext();
-    this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
-    log.debug("Bound in JNDI under /HAPartition/" + partitionName);
- }
-
-
- /**
-  * Requests the initial state from the channel and blocks until the
-  * transfer has been processed.
-  * <p>
-  * If channel.getState() returns true, this waits on stateLock until one of
-  * the setState(...) callbacks has set isStateSet, or rethrows the exception
-  * recorded in setStateException. If it returns false, this waits until
-  * members is non-empty and then accepts the missing state only when this
-  * node is the coordinator.
-  *
-  * @throws Exception the exception recorded while applying the state
-  * @throws IllegalStateException if no state was provided and this node is
-  *         not the coordinator
-  */
- protected void fetchState() throws Exception
- {
- log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
- long start, stop;
- isStateSet = false;
- start = System.currentTimeMillis();
- boolean rc = channel.getState(null, this.state_transfer_timeout);
- if (rc)
- {
- synchronized (stateLock)
- {
- while (!isStateSet)
- {
- // a failure recorded by recordSetStateFailure() ends the wait
- if (setStateException != null)
- throw setStateException;
-
- try
- {
- // NOTE(review): no timeout here; the wait ends only when
- // notifyStateTransferCompleted() notifies stateLock.
- stateLock.wait();
- }
- catch (InterruptedException iex)
- {
- // NOTE(review): the interrupt is swallowed and the thread's
- // interrupt status is not restored - confirm this is intended.
- }
- }
- }
- stop = System.currentTimeMillis();
- log.info("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
- }
- else
- {
- // No one provided us with state.
- // We need to find out if we are the coordinator, so we must
- // block until viewAccepted() is called at least once
-
- // NOTE(review): nothing visible in this part of the file calls
- // members.notifyAll(), and startPartition() assigns members before
- // calling this method - confirm whether this wait is ever entered.
- synchronized (members)
- {
- while (members.size() == 0)
- {
- log.debug("waiting on viewAccepted()");
- try
- {
- members.wait();
- }
- catch (InterruptedException iex)
- {
- }
- }
- }
-
- if (isCurrentNodeCoordinator())
- {
- log.info("State could not be retrieved (we are the first member in group)");
- }
- else
- {
- throw new IllegalStateException("Initial state transfer failed: " +
- "Channel.getState() returned false");
- }
- }
- }
-
- /**
-  * Takes the partition out of service: stops the asynchronous handler and
-  * the replicant manager, disconnects the channel and removes the JNDI
-  * binding. Failures of the first three steps are logged and do not
-  * prevent the following steps; a failure while unbinding propagates.
-  *
-  * @throws Exception if the JNDI binding cannot be removed
-  */
- public void closePartition() throws Exception
- {
-    final String boundName = "/HAPartition/" + partitionName;
-
-    logHistory ("Closing partition");
-    log.info("Closing partition " + partitionName);
-
-    try
-    {
-       asynchHandler.stop();
-    }
-    catch (Exception stopFailure)
-    {
-       log.warn("Failed to stop asynchHandler", stopFailure);
-    }
-
-    // Stop the DRM service
-    // TODO remove when DRM is independent
-    try
-    {
-       this.replicantManager.stop();
-    }
-    catch (Exception drmFailure)
-    {
-       log.error("operation failed", drmFailure);
-    }
-
-    // NR 200505 : [JBCLUSTER-38] the channel is only disconnected here;
-    // closing it is left to destroyPartition()
-    try
-    {
-       channel.disconnect();
-    }
-    catch (Exception disconnectFailure)
-    {
-       log.error("operation failed", disconnectFailure);
-    }
-
-    InitialContext jndi = new InitialContext();
-    try
-    {
-       jndi.unbind(boundName);
-    }
-    finally
-    {
-       jndi.close();
-    }
-    NonSerializableFactory.unbind (boundName);
-
-    log.info("Partition " + partitionName + " closed.");
- }
-
- /**
-  * Destroys the replicant manager and closes the channel. A failure of
-  * either step is logged and not rethrown.
-  *
-  * @throws Exception declared by the signature; both steps catch their own
-  *         exceptions
-  */
- public void destroyPartition() throws Exception
- {
-    try
-    {
-       this.replicantManager.destroy();
-    }
-    catch (Exception drmFailure)
-    {
-       log.error("Destroying DRM failed", drmFailure);
-    }
-
-    try
-    {
-       channel.close();
-    }
-    catch (Exception closeFailure)
-    {
-       log.error("Closing channel failed", closeFailure);
-    }
-
-    log.info("Partition " + partitionName + " destroyed.");
- }
-
- // org.jgroups.MessageListener implementation ----------------------------------------------
-
- /**
-  * Collects the state of all subscribers into a byte array.
-  *
-  * @return the serialized state, or <code>null</code> if collecting it
-  *         failed; per the original author, a null result makes the
-  *         requester's channel.getState() call return false
-  */
- public byte[] getState()
- {
-    logHistory ("getState called on partition");
-    log.debug("getState called.");
-
-    byte[] serialized = null;
-    try
-    {
-       ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
-       getStateInternal(buffer);
-       serialized = buffer.toByteArray();
-    }
-    catch (Exception ex)
-    {
-       log.error("getState failed", ex);
-    }
-    return serialized;
- }
-
- /**
-  * Writes the state of all subscribers to the given stream. A failure is
-  * logged and not rethrown.
-  *
-  * @param stream the stream to write the state to
-  */
- public void getState(OutputStream stream)
- {
-    logHistory ("getState called on partition");
-    log.debug("getState called.");
-
-    try
-    {
-       getStateInternal(stream);
-    }
-    catch (Exception failure)
-    {
-       log.error("getState failed", failure);
-    }
- }
-
- /**
-  * Writes the current state of every state-transfer subscriber to
-  * <code>stream</code>.
-  * <p>
-  * Format: one header byte, then, if at least one subscriber returned a
-  * non-null state, a sequence of (key, state) object pairs terminated by a
-  * StateStreamEnd marker. When no subscriber has state, only the NULL_VALUE
-  * header byte is written.
-  *
-  * @param stream the stream to write to
-  * @throws IOException if writing fails
-  */
- private void getStateInternal(OutputStream stream) throws IOException
- {
-    MarshalledValueOutputStream mvos = null; // don't create until we know we need it
-
-    for (Iterator entries = stateHandlers.entrySet().iterator(); entries.hasNext(); )
-    {
-       Map.Entry entry = (Map.Entry)entries.next();
-       HAPartition.HAPartitionStateTransfer subscriber =
-          (HAPartition.HAPartitionStateTransfer) entry.getValue();
-       log.debug("getState for " + entry.getKey());
-       Object state = subscriber.getCurrentState();
-       if (state != null)
-       {
-          if (mvos == null)
-          {
-             // This is our first write, so need to write the header first
-             stream.write(SERIALIZABLE_VALUE);
-
-             mvos = new MarshalledValueOutputStream(stream);
-          }
-
-          mvos.writeObject(entry.getKey());
-          mvos.writeObject(state);
-       }
-    }
-
-    if (mvos == null)
-    {
-       // We never wrote any state, so write the NULL header
-       stream.write(NULL_VALUE);
-    }
-    else
-    {
-       mvos.writeObject(new StateStreamEnd());
-       // The object stream may still hold buffered data, and callers read
-       // 'stream' directly afterwards, so flush it as objectToByteBuffer() does.
-       mvos.flush();
-    }
- }
-
- /**
-  * Applies transferred state delivered as a byte array. A null array is
-  * accepted and counts as a successful transfer. Any Throwable raised
-  * while applying the state is recorded instead of thrown, and waiters on
-  * the state lock are always notified.
-  *
-  * @param obj the serialized state, or <code>null</code>
-  */
- public void setState(byte[] obj)
- {
-    logHistory ("setState called on partition");
-    try
-    {
-       if (obj != null)
-       {
-          ByteArrayInputStream in = new ByteArrayInputStream(obj);
-          setStateInternal(in);
-          in.close();
-       }
-       else
-       {
-          log.debug("transferred state is null (may be first member in cluster)");
-       }
-
-       isStateSet = true;
-    }
-    catch (Throwable failure)
-    {
-       recordSetStateFailure(failure);
-    }
-    finally
-    {
-       notifyStateTransferCompleted();
-    }
- }
-
- /**
-  * Applies transferred state delivered as a stream. A null stream is
-  * accepted and counts as a successful transfer. Any Throwable raised
-  * while applying the state is recorded instead of thrown, and waiters on
-  * the state lock are always notified.
-  *
-  * @param stream the stream to read the state from, or <code>null</code>
-  */
- public void setState(InputStream stream)
- {
-    logHistory ("setState called on partition");
-    try
-    {
-       if (stream != null)
-       {
-          setStateInternal(stream);
-       }
-       else
-       {
-          log.debug("transferred state is null (may be first member in cluster)");
-       }
-
-       isStateSet = true;
-    }
-    catch (Throwable failure)
-    {
-       recordSetStateFailure(failure);
-    }
-    finally
-    {
-       notifyStateTransferCompleted();
-    }
- }
-
- /**
-  * Reads a state stream in the format produced by getStateInternal() and
-  * hands each piece of state to the subscriber registered under its key.
-  * <p>
-  * A NULL_VALUE header ends processing immediately; any other header value
-  * is treated as serialized state. Keys without a subscriber are skipped.
-  * An exception from a subscriber is only logged, unless the key is the
-  * replicant manager's service name, in which case it is rethrown as a
-  * RuntimeException.
-  *
-  * @param stream the stream to read from
-  * @throws IOException if reading fails
-  * @throws ClassNotFoundException if a transferred class cannot be loaded
-  */
- private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
- {
-    byte type = (byte) stream.read();
-    if (type == NULL_VALUE)
-    {
-       log.debug("state is null");
-       return;
-    }
-
-    Runtime runtime = Runtime.getRuntime();
-    long usedBefore = runtime.totalMemory() - runtime.freeMemory();
-
-    MarshalledValueInputStream in = new MarshalledValueInputStream(stream);
-
-    // The stream alternates key and state objects until the end marker shows up
-    for (Object token = in.readObject(); !(token instanceof StateStreamEnd); token = in.readObject())
-    {
-       String key = (String) token;
-       log.debug("setState for " + key);
-       Object someState = in.readObject();
-
-       HAPartition.HAPartitionStateTransfer subscriber =
-          (HAPartition.HAPartitionStateTransfer) stateHandlers.get(key);
-       if (subscriber == null)
-       {
-          log.debug("There is no stateHandler for: " + key);
-          continue;
-       }
-
-       try
-       {
-          subscriber.setCurrentState((Serializable) someState);
-       }
-       catch (Exception e)
-       {
-          // Don't let issues with one subscriber affect others
-          // unless it is DRM, which is really an internal function
-          // of the HAPartition
-          // FIXME remove this once DRM is JBC-based
-          if (!DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
-          {
-             log.error("Caught exception setting state to " + subscriber, e);
-             continue;
-          }
-          if (e instanceof RuntimeException)
-             throw (RuntimeException) e;
-          throw new RuntimeException(e);
-       }
-    }
-
-    long usedAfter = runtime.totalMemory() - runtime.freeMemory();
-    log.debug("received state; expanded memory by " +
-       (usedAfter - usedBefore) + " bytes (used memory before: " + usedBefore +
-       ", used memory after: " + usedAfter + ")");
- }
-
- /**
-  * Logs a failed state transfer and stores its cause in setStateException,
-  * wrapping it in an Exception when it is not one already.
-  *
-  * @param t the failure raised while applying the state
-  */
- private void recordSetStateFailure(Throwable t)
- {
-    log.error("failed setting state", t);
-    setStateException = (t instanceof Exception) ? (Exception) t : new Exception(t);
- }
-
- /**
-  * Wakes up every thread waiting on the state lock, i.e. the thread that
-  * is blocked in fetchState().
-  */
- private void notifyStateTransferCompleted()
- {
-    final Object lock = stateLock;
-    synchronized (lock)
-    {
-       lock.notifyAll();
-    }
- }
-
- /**
-  * Partial (per state id) state transfer to a stream is not supported.
-  *
-  * @throws UnsupportedOperationException always
-  */
- public void getState(String state_id, OutputStream ostream)
- {
-    String reason = "Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594";
-    throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Partial (per state id) state transfer as a byte array is not supported.
-  *
-  * @throws UnsupportedOperationException always
-  */
- public byte[] getState(String state_id)
- {
-    String reason = "Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594";
-    throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Applying partial (per state id) state from a byte array is not supported.
-  *
-  * @throws UnsupportedOperationException always
-  */
- public void setState(String state_id, byte[] state)
- {
-    String reason = "Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594";
-    throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Applying partial (per state id) state from a stream is not supported.
-  *
-  * @throws UnsupportedOperationException always
-  */
- public void setState(String state_id, InputStream istream)
- {
-    String reason = "Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594";
-    throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Message callback with an empty body: a message delivered here is ignored.
-  *
-  * @param msg the received message (unused)
-  */
- public void receive(org.jgroups.Message msg)
- { /* complete */}
-
- // org.jgroups.MembershipListener implementation ----------------------------------------------
-
- /**
-  * Records that a member is suspected. The event goes to the history and
-  * is logged at info level: to the lifecycle logger when this node is the
-  * coordinator, to the regular logger otherwise.
-  *
-  * @param suspected_mbr the suspected member; may be <code>null</code>
-  */
- public void suspect(org.jgroups.Address suspected_mbr)
- {
-    logHistory ("Node suspected: " + String.valueOf(suspected_mbr));
-
-    Logger target = isCurrentNodeCoordinator () ? clusterLifeCycleLog : log;
-    target.info("Suspected member: " + suspected_mbr);
- }
-
- /** Membership callback with an empty body: nothing is done when it is invoked. */
- public void block() {}
-
- /**
-  * Notification of a cluster view change. This is done from the JG protocol
-  * handler thread and we must be careful to not unduly block this thread.
-  * Because of this there are two types of listeners, synchronous and
-  * asynchronous. The synchronous listeners are messaged with the view change
-  * event using the calling thread while the asynchronous listeners are
-  * messaged using a separate thread.
-  * <p>
-  * The first view received while no membership is known only initializes
-  * the member lists; no listener is notified for it. Any exception raised
-  * while processing the view is logged and not rethrown.
-  *
-  * @param newView the view that has been accepted
-  */
- public void viewAccepted(View newView)
- {
-    try
-    {
-       // we update the view id
-       //
-       this.currentViewId = newView.getVid().getId();
-
-       // Keep a list of other members only for "exclude-self" RPC calls
-       //
-       this.jgotherMembers = (Vector)newView.getMembers().clone();
-       this.jgotherMembers.remove (channel.getLocalAddress());
-       this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
-       Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
-       logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
-          " (old view: " + this.members + " )");
-
-       // Save the previous view and install a copy of the new view
-       Vector oldMembers = this.members;
-
-       Vector newjgMembers = (Vector)newView.getMembers().clone();
-       Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
-       this.members = newMembers;
-       this.jgmembers = newjgMembers;
-
-       if (oldMembers == null)
-       {
-          // Initial viewAccepted: nothing to compare against, nobody to notify
-          log.debug("ViewAccepted: initial members set");
-          return;
-       }
-
-       // oldMembers is known to be non-null from here on
-       int difference = newMembers.size () - oldMembers.size ();
-
-       if (isCurrentNodeCoordinator ())
-          clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
-             this.currentViewId + ", delta: " + difference + ") : " + this.members);
-       else
-          log.info("New cluster view for partition " + this.partitionName + ": " +
-             this.currentViewId + " (" + this.members + " delta: " + difference + ")");
-
-       // Build a ViewChangeEvent for the asynch listeners
-       ViewChangeEvent event = new ViewChangeEvent();
-       event.viewId = currentViewId;
-       event.allMembers = translatedNewView;
-       event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
-       event.newMembers = getNewMembers(oldMembers, event.allMembers);
-       event.originatingGroups = null;
-       // if the new view occurs because of a merge, we first inform listeners of the merge
-       if(newView instanceof MergeView)
-       {
-          MergeView mergeView = (MergeView) newView;
-          event.originatingGroups = mergeView.getSubgroups();
-       }
-
-       // Report the previous size; this.members already holds the new view
-       // at this point, so reading it here would print the new size twice.
-       log.debug("membership changed from " + oldMembers.size() + " to "
-          + event.allMembers.size());
-       // Put the view change to the asynch queue
-       this.asynchHandler.queueEvent(event);
-
-       // Broadcast the new view to the synchronous view change listeners
-       if (this.allowSyncListeners)
-       {
-          this.notifyListeners(synchListeners, event.viewId, event.allMembers,
-             event.deadMembers, event.newMembers, event.originatingGroups);
-       }
-    }
-    catch (Exception ex)
-    {
-       log.error("ViewAccepted failed", ex);
-    }
- }
-
- // HAPartition implementation ----------------------------------------------
-
- /**
-  * @return the name of this node, as set by startPartition(); not set
-  *         before that call
-  */
- public String getNodeName()
- {
-    return this.nodeName;
- }
-
- /**
-  * @return the partition name given to the constructor
-  */
- public String getPartitionName()
- {
-    return this.partitionName;
- }
-
- /**
-  * @return the replicant manager created by init(); not set before that call
-  */
- public DistributedReplicantManager getDistributedReplicantManager()
- {
-    return this.replicantManager;
- }
-
- /**
-  * @return the distributed state service held in dsManager.
-  *         NOTE(review): may be <code>null</code> - no code visible in this
-  *         part of the class assigns that field; confirm where it is set.
-  */
- public DistributedState getDistributedStateService()
- {
-    DistributedState service = this.dsManager;
-    return service;
- }
-
- /**
-  * @return the id of the most recently recorded view, or -1 if none has
-  *         been recorded yet
-  */
- public long getCurrentViewId()
- {
-    long viewId = this.currentViewId;
-    return viewId;
- }
-
- /**
-  * @return a new Vector holding the name of every current member, in
-  *         member order
-  */
- public Vector getCurrentView()
- {
-    Vector names = new Vector (this.members.size());
-    int index = 0;
-    while (index < members.size())
-    {
-       ClusterNode node = (ClusterNode) members.elementAt(index);
-       names.add(node.getName());
-       index++;
-    }
-    return names;
- }
-
- /**
-  * @return the current members as a newly allocated array
-  */
- public ClusterNode[] getClusterNodes ()
- {
-    ClusterNode[] snapshot = new ClusterNode[this.members.size()];
-    // the array is filled in place; the value returned by toArray is not used
-    this.members.toArray(snapshot);
-    return snapshot;
- }
-
- /**
-  * @return this node as a ClusterNode, as set by startPartition();
-  *         <code>null</code> before that call
-  */
- public ClusterNode getClusterNode ()
- {
-    return this.me;
- }
-
- /**
-  * Tells whether this node is the first member of the current view.
-  *
-  * @return <code>true</code> if the first member equals this node;
-  *         <code>false</code> otherwise, including when the membership or
-  *         this node's identity is not known yet
-  */
- public boolean isCurrentNodeCoordinator ()
- {
-    boolean viewKnown = this.members != null && this.members.size() != 0 && this.me != null;
-    if (!viewKnown)
-       return false;
-
-    Object first = this.members.elementAt (0);
-    return first.equals (this.me);
- }
-
- // ***************************
- // ***************************
- // RPC multicast communication
- // ***************************
- // ***************************
- //
- /**
-  * Registers an RPC handler under a name, replacing any handler already
-  * stored under that name.
-  *
-  * @param objName the name the handler is stored under
-  * @param subscriber the handler object
-  */
- public void registerRPCHandler(String objName, Object subscriber)
- {
-    this.rpcHandlers.put(objName, subscriber);
- }
-
- /**
-  * Removes whatever RPC handler is stored under the given name.
-  *
-  * @param objName the name to remove
-  * @param subscriber not consulted; removal is by name only
-  */
- public void unregisterRPCHandler(String objName, Object subscriber)
- {
-    this.rpcHandlers.remove(objName);
- }
-
-
- /**
-  * Synchronous cluster call without explicit parameter types; delegates to
-  * the overload taking types, passing <code>null</code> for them.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param excludeSelf whether this node is left out of the call
-  * @return the responses collected from the cluster
-  * @throws Exception if the call fails
-  * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
-  */
- public ArrayList callMethodOnCluster(String objName, String methodName,
-    Object[] args, boolean excludeSelf) throws Exception
- {
-    final Class[] noTypes = null;
-    return callMethodOnCluster(objName, methodName, args, noTypes, excludeSelf);
- }
-
- /**
-  * Synchronous cluster call using the partition's default RPC timeout.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param types the parameter types, or <code>null</code>
-  * @param excludeSelf whether this node is left out of the call
-  * @return the responses collected from the cluster
-  * @throws Exception if the call fails
-  */
- public ArrayList callMethodOnCluster(String objName, String methodName,
-    Object[] args, Class[] types, boolean excludeSelf) throws Exception
- {
-    long defaultTimeout = this.timeout;
-    return callMethodOnCluster(objName, methodName, args, types, excludeSelf, defaultTimeout);
- }
-
-
- /**
-  * Invokes <code>objName.methodName</code> on the cluster and waits for all
-  * responses (GroupRequest.GET_ALL) for at most <code>methodTimeout</code>.
-  * <p>
-  * The returned list holds the value of every response that was received,
-  * except NoHandlerForRPC values, which are dropped.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param types the parameter types, or <code>null</code>
-  * @param excludeSelf if <code>true</code>, the call is addressed to the
-  *        other members only
-  * @param methodTimeout timeout passed to the dispatcher
-  * @return the collected response values; never <code>null</code>
-  * @throws Exception if the call fails
-  */
- public ArrayList callMethodOnCluster(String objName, String methodName,
-    Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
- {
-    ArrayList results = new ArrayList();
-    final boolean trace = log.isTraceEnabled();
-
-    final String target = objName + "." + methodName;
-    MethodCall call = (types == null)
-       ? new MethodCall(target, args)
-       : new MethodCall(target, args, types);
-
-    RspList responses;
-    if (!excludeSelf)
-    {
-       if (trace)
-       {
-          log.trace("callMethodOnCluster(false), objName="+objName
-             +", methodName="+methodName+", members="+members);
-       }
-       // no explicit destination list; presumably addresses every member,
-       // as the trace message above suggests
-       responses = this.callRemoteMethods(null, call, GroupRequest.GET_ALL, methodTimeout);
-    }
-    else
-    {
-       if (trace)
-       {
-          log.trace("callMethodOnCluster(true), objName="+objName
-             +", methodName="+methodName+", members="+jgotherMembers);
-       }
-       responses = this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_ALL, methodTimeout);
-    }
-
-    if (responses == null)
-       return results;
-
-    for (int i = 0; i < responses.size(); i++)
-    {
-       Object item = responses.elementAt(i);
-       if (item instanceof Rsp)
-       {
-          Rsp response = (Rsp) item;
-          // Only include received responses
-          if (!response.wasReceived())
-          {
-             if (trace)
-                log.trace("Ignoring non-received response: "+response);
-             continue;
-          }
-          item = response.getValue();
-          if (!(item instanceof NoHandlerForRPC))
-             results.add(item);
-       }
-       else if (item instanceof NoHandlerForRPC)
-       {
-          if (trace)
-             log.trace("Ignoring NoHandlerForRPC");
-       }
-       else
-       {
-          results.add(item);
-       }
-    }
-
-    return results;
- }
-
- /**
-  * Calls a method on the cluster coordinator only, using the partition's
-  * default RPC timeout. The coordinator is taken to be the first member of
-  * the current view.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param types the parameter types, or <code>null</code>
-  * @param excludeSelf if <code>true</code> and this node is the
-  *        coordinator, no node is called
-  * @return the collected response values
-  * @throws Exception if the call fails
-  */
- public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
-    Object[] args, Class[] types,boolean excludeSelf) throws Exception
- {
-    long defaultTimeout = this.timeout;
-    return callMethodOnCoordinatorNode(objName, methodName, args, types, excludeSelf, defaultTimeout);
- }
-
- /**
-  * Calls a method on the cluster coordinator only and waits for its
-  * response (GroupRequest.GET_ALL) for at most <code>methodTimeout</code>.
-  * The coordinator is taken to be the first member of the current view.
-  * <p>
-  * When this node is the coordinator and <code>excludeSelf</code> is
-  * <code>true</code>, the destination list passed to the dispatcher is empty.
-  * Response values of type NoHandlerForRPC and responses that were not
-  * received are dropped.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param types the parameter types, or <code>null</code>
-  * @param excludeSelf whether to skip the call when this node is the coordinator
-  * @param methodTimeout timeout passed to the dispatcher
-  * @return the collected response values; never <code>null</code>
-  * @throws Exception if the call fails
-  */
- public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
-    Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
- {
-    ArrayList results = new ArrayList();
-    final boolean trace = log.isTraceEnabled();
-
-    final String target = objName + "." + methodName;
-    MethodCall call = (types == null)
-       ? new MethodCall(target, args)
-       : new MethodCall(target, args, types);
-
-    if (trace)
-    {
-       log.trace("callMethodOnCoordinatorNode(false), objName="+objName
-          +", methodName="+methodName);
-    }
-
-    // the first cluster view member is the coordinator; it is left out only
-    // when that member is this node and the caller asked to exclude self
-    Vector coordinatorOnly = new Vector();
-    if (!(isCurrentNodeCoordinator () && excludeSelf))
-       coordinatorOnly.addElement(this.jgmembers.elementAt (0));
-
-    RspList responses = this.callRemoteMethods(coordinatorOnly, call, GroupRequest.GET_ALL, methodTimeout);
-    if (responses == null)
-       return results;
-
-    for (int i = 0; i < responses.size(); i++)
-    {
-       Object item = responses.elementAt(i);
-       if (item instanceof Rsp)
-       {
-          Rsp response = (Rsp) item;
-          // Only include received responses
-          if (!response.wasReceived())
-          {
-             if (trace)
-                log.trace("Ignoring non-received response: "+response);
-             continue;
-          }
-          item = response.getValue();
-          if (!(item instanceof NoHandlerForRPC))
-             results.add(item);
-       }
-       else if (item instanceof NoHandlerForRPC)
-       {
-          if (trace)
-             log.trace("Ignoring NoHandlerForRPC");
-       }
-       else
-       {
-          results.add(item);
-       }
-    }
-
-    return results;
- }
-
-
- /**
-  * Asynchronous cluster call without explicit parameter types; delegates
-  * to the overload taking types, passing <code>null</code> for them.
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param excludeSelf whether this node is left out of the call
-  * @throws Exception if the call fails
-  * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
-  */
- public void callAsynchMethodOnCluster(String objName, String methodName,
-    Object[] args, boolean excludeSelf) throws Exception
- {
-    final Class[] noTypes = null;
-    callAsynchMethodOnCluster(objName, methodName, args, noTypes, excludeSelf);
- }
-
- /**
-  * Invokes <code>objName.methodName</code> on the cluster without
-  * collecting responses (GroupRequest.GET_NONE).
-  *
-  * @param objName name of the target RPC handler
-  * @param methodName name of the method to invoke
-  * @param args the call arguments
-  * @param types the parameter types, or <code>null</code>
-  * @param excludeSelf if <code>true</code>, the call is addressed to the
-  *        other members only
-  * @throws Exception if the call fails
-  */
- public void callAsynchMethodOnCluster(String objName, String methodName,
-    Object[] args, Class[] types, boolean excludeSelf) throws Exception
- {
-    final boolean trace = log.isTraceEnabled();
-
-    final String target = objName + "." + methodName;
-    MethodCall call = (types == null)
-       ? new MethodCall(target, args)
-       : new MethodCall(target, args, types);
-
-    if (!excludeSelf)
-    {
-       if (trace)
-       {
-          log.trace("callAsynchMethodOnCluster(false), objName="+objName
-             +", methodName="+methodName+", members="+members);
-       }
-       this.callRemoteMethods(null, call, GroupRequest.GET_NONE, timeout);
-       return;
-    }
-
-    if (trace)
-    {
-       log.trace("callAsynchMethodOnCluster(true), objName="+objName
-          +", methodName="+methodName+", members="+jgotherMembers);
-    }
-    this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_NONE, timeout);
- }
-
- // *************************
- // *************************
- // State transfer management
- // *************************
- // *************************
- //
- /**
-  * Registers a state-transfer subscriber under a name, replacing any
-  * subscriber already stored under that name.
-  *
-  * @param objectName the key the subscriber's state travels under
-  * @param subscriber the subscriber supplying and receiving that state
-  */
- public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
- {
-    this.stateHandlers.put(objectName, subscriber);
- }
-
- /**
-  * Removes whatever state-transfer subscriber is stored under the given name.
-  *
-  * @param objectName the key to remove
-  * @param subscriber not consulted; removal is by name only
-  */
- public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
- {
-    this.stateHandlers.remove(objectName);
- }
-
- // *************************
- // *************************
- // Group Membership listeners
- // *************************
- // *************************
- //
- /**
-  * Adds a membership listener. The listener goes to the asynchronous list
-  * when synchronous notifications are disabled or when it implements one
-  * of the Asynch... listener interfaces; otherwise it goes to the
-  * synchronous list. The chosen list is locked while it is modified.
-  *
-  * @param listener the listener to add
-  */
- public void registerMembershipListener(HAMembershipListener listener)
- {
-    boolean isAsynch = !this.allowSyncListeners
-       || (listener instanceof AsynchHAMembershipListener)
-       || (listener instanceof AsynchHAMembershipExtendedListener);
-
-    ArrayList target = isAsynch ? this.asynchListeners : this.synchListeners;
-    synchronized (target)
-    {
-       target.add(listener);
-    }
- }
-
- /**
-  * Removes a membership listener from the list that
-  * registerMembershipListener() would choose for it under the current
-  * allowSyncListeners setting. The chosen list is locked while it is
-  * modified.
-  *
-  * @param listener the listener to remove
-  */
- public void unregisterMembershipListener(HAMembershipListener listener)
- {
-    boolean isAsynch = !this.allowSyncListeners
-       || (listener instanceof AsynchHAMembershipListener)
-       || (listener instanceof AsynchHAMembershipExtendedListener);
-
-    ArrayList target = isAsynch ? this.asynchListeners : this.synchListeners;
-    synchronized (target)
-    {
-       target.remove(listener);
-    }
- }
-
- /**
-  * Returns the current value of the allowSyncListeners flag.
-  * @return the flag consulted when choosing the list a membership listener belongs to
-  */
- public boolean getAllowSynchronousMembershipNotifications()
- {
-    return this.allowSyncListeners;
- }
-
- /**
-  * Sets the allowSyncListeners flag.
-  * @param allowSync new value of the flag; listeners that are already
-  *        registered are not moved between lists by this call
-  */
- public void setAllowSynchronousMembershipNotifications(boolean allowSync)
- {
-    allowSyncListeners = allowSync;
- }
-
- // org.jgroups.RpcDispatcher overrides ---------------------------------------------------
-
- /**
-  * Unmarshals the MethodCall carried by the message, routes it to the rpc
-  * handler named in the call and returns the outcome.
-  * <p>
-  * The method name inside the MethodCall has the form
-  * "handlerName.methodName"; the part before the last '.' selects the entry
-  * in rpcHandlers and the remainder is the method invoked on it.
-  * </p>
-  * This overrides RpcDispatcher.handle so that we can dispatch to many different objects.
-  * @param req the JGroups message whose buffer holds the serialized MethodCall
-  * @return the value returned by the invocation, the Throwable it threw,
-  *         a NoHandlerForRPC when no handler is registered under the name, or
-  *         null when the message is empty or cannot be read as a MethodCall
-  */
- public Object handle(Message req)
- {
-    final boolean traceOn = log.isTraceEnabled();
-    if (traceOn)
-    {
-       log.trace("Partition " + partitionName + " received msg");
-    }
-
-    if (req == null || req.getBuffer() == null)
-    {
-       log.warn("message or message buffer is null !");
-       return null;
-    }
-
-    Object payload;
-    try
-    {
-       payload = objectFromByteBuffer(req.getBuffer());
-    }
-    catch (Exception e)
-    {
-       log.warn("failed unserializing message buffer (msg=" + req + ")", e);
-       return null;
-    }
-
-    // instanceof is false for null, so this also covers an empty payload
-    if (!(payload instanceof MethodCall))
-    {
-       log.warn("message does not contain a MethodCall object !");
-       return null;
-    }
-
-    // split "handlerName.methodName" at the last dot
-    final MethodCall rpc = (MethodCall) payload;
-    final String qualifiedName = rpc.getName();
-    if (traceOn)
-    {
-       log.trace("pre methodName: " + qualifiedName);
-    }
-
-    final int lastDot = qualifiedName.lastIndexOf('.');
-    final String handlerName = qualifiedName.substring(0, lastDot);
-    final String bareMethodName = qualifiedName.substring(lastDot + 1);
-    if (traceOn)
-    {
-       log.trace("handlerName: " + handlerName + " methodName: " + bareMethodName);
-       log.trace("Handle: " + qualifiedName);
-    }
-
-    // the call is renamed before the handler lookup, as the target only knows the bare name
-    rpc.setName(bareMethodName);
-    final Object target = rpcHandlers.get(handlerName);
-    if (target == null)
-    {
-       if (traceOn)
-       {
-          log.debug("No rpc handler registered under: "+handlerName);
-       }
-       return new NoHandlerForRPC();
-    }
-
-    /* The exception semantics of a group rpc call are weak: the caller gets
-       back either the normal return value or the exception that was thrown,
-       so a failure is logged at trace level and handed back as the result.
-    */
-    try
-    {
-       final Object result = rpc.invoke(target);
-       if (traceOn)
-       {
-          log.trace("rpc call return value: "+result);
-       }
-       return result;
-    }
-    catch (Throwable t)
-    {
-       if (traceOn)
-       {
-          log.trace("rpc call threw exception", t);
-       }
-       return t;
-    }
- }
-
- // AsynchEventHandler.AsynchEventProcessor -----------------------
-
- /**
-  * Passes the fields of a view change event to notifyListeners for the
-  * asynchListeners list.
-  * @param event the event to process; it is cast to ViewChangeEvent
-  */
- public void processEvent(Object event)
- {
-    final ViewChangeEvent change = (ViewChangeEvent) event;
-    notifyListeners(asynchListeners,
-          change.viewId,
-          change.allMembers,
-          change.deadMembers,
-          change.newMembers,
-          change.originatingGroups);
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- /**
-  * Checks that no other member of the given view publishes the same
-  * 'additionalData' as the local JGroups address.
-  * @param javaGroupIpAddresses the members to check; each element is cast to IpAddress
-  * @throws Exception if the local address carries no additional data, or if
-  *         another member's additional data has the same content as the local one
-  */
- protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
- {
-    byte[] localUniqueName = this.localJGAddress.getAdditionalData();
-    if (localUniqueName == null)
-    {
-       log.error("No additional information has been found in the JGroups address; " +
-             "make sure you are running with a correct version of JGroups and that the protocols " +
-             "you are using support 'additionalData' behaviour.");
-       throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
-    }
-
-    for (int i = 0; i < javaGroupIpAddresses.size(); i++)
-    {
-       IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
-       if (!address.equals(this.localJGAddress))
-       {
-          // byte[].equals() is an identity test and would never match another
-          // node's array; compare the contents instead. A member without
-          // additional data (null) simply does not match.
-          if (java.util.Arrays.equals(localUniqueName, address.getAdditionalData()))
-             throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
-       }
-    }
- }
-
- /**
-  * Binds an object in the JNDI tree through NonSerializableFactory, creating
-  * any intermediate subcontexts of the name that do not exist yet.
-  * @param jndiName Name under which the object must be bound
-  * @param who Object to bind in JNDI
-  * @param classType Class whose name is recorded in the bound Reference
-  * @param ctx Naming context under which we bind the object
-  * @throws Exception Thrown if a naming exception occurs during binding
-  */
- protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
- {
-    // The object is registered with the helper class first; what is placed
-    // in JNDI further down is a Reference that names that helper as factory.
-    NonSerializableFactory.bind(jndiName, who);
-
-    // Descend one name component at a time until only the last one is left.
-    Context current = ctx;
-    Name remaining = ctx.getNameParser("").parse(jndiName);
-    for (; remaining.size() > 1; remaining = remaining.getSuffix(1))
-    {
-       final String component = remaining.get(0);
-       try
-       {
-          current = (Context) current.lookup(component);
-       }
-       catch (NameNotFoundException e)
-       {
-          log.debug ("creating Subcontext " + component);
-          current = current.createSubcontext(component);
-       }
-    }
-
-    // The helper class NonSerializableFactory uses address type nns.
-    final Reference ref = new Reference(
-          classType.getName(),
-          new StringRefAddr("nns", jndiName),
-          NonSerializableFactory.class.getName(),
-          null);
-    current.rebind(remaining.get(0), ref);
- }
-
- /**
-  * Computes the members that are present in the old view but absent from the
-  * new one (old minus new). Neither input vector is modified.
-  * @param oldMembers Vector of old members; null is treated as empty
-  * @param newMembers Vector of new members; null is treated as empty
-  * @return a new Vector of the members that have died between the two views, can be empty.
-  */
- protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
- {
-    final boolean debugOn = log.isDebugEnabled();
-    final Vector previous = (oldMembers != null) ? oldMembers : new Vector();
-    final Vector current = (newMembers != null) ? newMembers : new Vector();
-
-    // subtract on a copy so the caller's vector stays untouched
-    final Vector departed = (Vector) previous.clone();
-    departed.removeAll(current);
-
-    if (debugOn && departed.size() > 0)
-    {
-       log.debug("dead members: " + departed);
-    }
-    return departed;
- }
-
- /**
-  * Computes the members that are present in the new view but absent from the
-  * old one (all minus old). Neither input vector is modified.
-  * @param oldMembers Vector of old members; null is treated as empty
-  * @param allMembers Vector of new members; null is treated as empty
-  * @return a new Vector of the members that have joined the partition between the two views
-  */
- protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
- {
-    final Vector previous = (oldMembers != null) ? oldMembers : new Vector();
-    final Vector current = (allMembers != null) ? allMembers : new Vector();
-
-    // subtract on a copy so the caller's vector stays untouched
-    final Vector joined = (Vector) current.clone();
-    joined.removeAll(previous);
-    return joined;
- }
-
- /**
-  * Calls every listener in the given list with the membership change.
-  * <p>
-  * When originatingGroups is non-null, listeners implementing
-  * HAMembershipExtendedListener receive membershipChangedDuringMerge; all
-  * other cases go through membershipChanged. A Throwable raised while
-  * handling one listener is logged and does not stop the remaining ones.
-  * </p>
-  * @param theListeners list of listeners; it is copied under its own lock before iteration
-  * @param viewID id of the view, used in the debug log messages only
-  * @param allMembers members passed to the listeners as the complete view
-  * @param deadMembers members passed to the listeners as dead
-  * @param newMembers members passed to the listeners as new
-  * @param originatingGroups passed to extended listeners; may be null
-  */
- protected void notifyListeners(ArrayList theListeners, long viewID,
-       Vector allMembers, Vector deadMembers, Vector newMembers,
-       Vector originatingGroups)
- {
-    log.debug("Begin notifyListeners, viewID: "+viewID);
-
-    // JBAS-3619 -- don't hold synch lock while notifying: iterate over a copy
-    final ArrayList snapshot;
-    synchronized(theListeners)
-    {
-       snapshot = (ArrayList) theListeners.clone();
-    }
-
-    final boolean mergeInfoPresent = (originatingGroups != null);
-    int index = 0;
-    while (index < snapshot.size())
-    {
-       // declared outside the try so the failure message can name the listener
-       HAMembershipListener current = null;
-       try
-       {
-          current = (HAMembershipListener) snapshot.get(index);
-          if (mergeInfoPresent && current instanceof HAMembershipExtendedListener)
-          {
-             ((HAMembershipExtendedListener) current).membershipChangedDuringMerge(
-                   deadMembers, newMembers, allMembers, originatingGroups);
-          }
-          else
-          {
-             current.membershipChanged(deadMembers, newMembers, allMembers);
-          }
-       }
-       catch (Throwable e)
-       {
-          // a problem in a listener should not prevent other members to receive the new view
-          log.warn("HAMembershipListener callback failure: "+current, e);
-       }
-       index++;
-    }
-
-    log.debug("End notifyListeners, viewID: "+viewID);
- }
-
- protected Vector translateAddresses (Vector jgAddresses)
- {
- if (jgAddresses == null)
- return null;
-
- Vector result = new Vector (jgAddresses.size());
- for (int i = 0; i < jgAddresses.size(); i++)
- {
- IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
- result.add(new ClusterNode (addr));
- }
-
- return result;
- }
-
- /**
-  * Stores the given DistributedState in the dsManager field.
-  * @param distState the distributed state service to keep
-  */
- protected void setDistributedState(DistributedState distState)
- {
-    dsManager = distState;
- }
-
- /**
-  * Appends a timestamped entry to the history collection.
-  * The entry has the form "&lt;formatted current date&gt; : &lt;message&gt;".
-  * @param message text of the entry
-  */
- public void logHistory (String message)
- {
-    try
-    {
-       final String timestamp = new SimpleDateFormat().format (new Date());
-       history.add(timestamp + " : " + message);
-    }
-    catch (Exception ignored)
-    {
-       // best effort only: a failure to record history is deliberately dropped
-    }
- }
-
- /**
-  * A simple data class containing the view change event needed to
-  * message the HAMembershipListeners. Its fields are handed to
-  * notifyListeners by processEvent.
-  */
- private static class ViewChangeEvent
- {
-    /** Passed to notifyListeners as the view id. */
-    long viewId;
-    /** Passed to notifyListeners as the complete member list. */
-    Vector allMembers;
-    /** Passed to notifyListeners as the dead members. */
-    Vector deadMembers;
-    /** Passed to notifyListeners as the new members. */
-    Vector newMembers;
-    /** Passed to notifyListeners as the originating groups. */
-    Vector originatingGroups;
- }
-
- /**
-  * RpcDispatcher.Marshaller that forwards both conversions to the static
-  * objectFromByteBuffer / objectToByteBuffer methods of HAPartitionImpl.
-  */
- private class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
- {
-    /** Converts an object to bytes via HAPartitionImpl.objectToByteBuffer. */
-    public byte[] objectToByteBuffer(Object obj) throws Exception
-    {
-       final byte[] marshalled = HAPartitionImpl.objectToByteBuffer(obj);
-       return marshalled;
-    }
-
-    /** Converts bytes back to an object via HAPartitionImpl.objectFromByteBuffer. */
-    public Object objectFromByteBuffer(byte[] buf) throws Exception
-    {
-       final Object unmarshalled = HAPartitionImpl.objectFromByteBuffer(buf);
-       return unmarshalled;
-    }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
More information about the jboss-cvs-commits
mailing list