[jboss-cvs] JBossAS SVN: r58577 - trunk/cluster/src/main/org/jboss/ha/framework/server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Nov 18 06:37:03 EST 2006
Author: bstansberry at jboss.com
Date: 2006-11-18 06:37:00 -0500 (Sat, 18 Nov 2006)
New Revision: 58577
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Log:
Combine ClusterPartition and HAPartitionImpl
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2006-11-18 11:36:36 UTC (rev 58576)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2006-11-18 11:37:00 UTC (rev 58577)
@@ -21,428 +21,1266 @@
*/
package org.jboss.ha.framework.server;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.net.InetAddress;
import java.rmi.dgc.VMID;
import java.rmi.server.UID;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Vector;
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NameNotFoundException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
-import org.jboss.cache.TreeCacheMBean;
+import org.jboss.cache.Cache;
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.mx.util.MBeanProxyExt;
-import org.jboss.naming.NamingServiceMBean;
-import org.jboss.system.ServiceMBean;
+import org.jboss.invocation.MarshalledValueInputStream;
+import org.jboss.invocation.MarshalledValueOutputStream;
+import org.jboss.logging.Logger;
+import org.jboss.naming.NonSerializableFactory;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.system.server.ServerConfigUtil;
import org.jgroups.Channel;
import org.jgroups.Event;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.JChannel;
+import org.jgroups.MembershipListener;
+import org.jgroups.MergeView;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
import org.jgroups.Version;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.debug.Debugger;
-import org.jgroups.JChannel;
import org.jgroups.jmx.JChannelFactoryMBean;
import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
/**
- * Management Bean for Cluster HAPartitions. It will start a JGroups
- * channel and initialize the ReplicantManager and DistributedStateService.
+ * {@link HAPartition} implementation based on a
+ * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
+ * and a multiplexed <code>JChannel</code>.
*
- * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
- * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
- * @version $Revision$
+ * @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
+ * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
+ * @author Scott.Stark at jboss.org
+ * @author brian.stansberry at jboss.com
+ * @version $Revision$
*/
public class ClusterPartition
extends ServiceMBeanSupport
- implements ClusterPartitionMBean
+ implements MembershipListener, HAPartition,
+ AsynchEventHandler.AsynchEventProcessor,
+ ClusterPartitionMBean
{
+ private static final byte NULL_VALUE = 0;
+ private static final byte SERIALIZABLE_VALUE = 1;
+ // TODO add Streamable support
+ // private static final byte STREAMABLE_VALUE = 2;
+
+ /**
+ * Returned when an RPC call arrives for a service that isn't registered.
+ */
+ private static class NoHandlerForRPC implements Serializable
+ {
+ static final long serialVersionUID = -1263095408483622838L;
+ }
+
+ private static class StateStreamEnd implements Serializable
+ {
+ /** The serialVersionUID */
+ private static final long serialVersionUID = -3705345735451504946L;
+ }
+
// Constants -----------------------------------------------------
+ // final MethodLookup method_lookup_clos = new MethodLookupClos();
+
// Attributes ----------------------------------------------------
- private TreeCacheMBean cache;
- private ObjectName multiplexerObjectName;
- private JChannelFactoryMBean multiplexer;
- private String stackName;
- private String partitionName = ServerConfigUtil.getDefaultPartitionName();
- private DistributedState dsManager;
- private HAPartitionImpl partition;
- private boolean deadlock_detection = false;
- private boolean allow_sync_events = false;
- private JChannel channel;
- private Debugger debugger=null;
- private boolean use_debugger=false;
+ protected ClusterPartitionConfig config;
+ protected HashMap rpcHandlers = new HashMap();
+ protected HashMap stateHandlers = new HashMap();
+ /** Do we send any membership change notifications synchronously? */
+ protected boolean allowSyncListeners = false;
+ /** The HAMembershipListener and HAMembershipExtendedListeners */
+ protected ArrayList synchListeners = new ArrayList();
+ /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
+ protected ArrayList asynchListeners = new ArrayList();
+ /** The handler used to send membership change notifications asynchronously */
+ protected AsynchEventHandler asynchHandler;
+ /** The current cluster partition members */
+ protected Vector members = null;
+ protected Vector jgmembers = null;
- private String nodeName = null;
- private InetAddress nodeAddress = null;
+ public Vector history = null;
- /** Number of milliseconds to wait until state has been transferred. Increase this value for large states
- * 0 = wait forever
+ /** The partition members other than this node */
+ protected Vector otherMembers = null;
+ protected Vector jgotherMembers = null;
+ /** the local JG IP Address */
+ protected org.jgroups.stack.IpAddress localJGAddress = null;
+ /** The cluster transport protocol address string */
+ protected String nodeName;
+ /** me as a ClusterNode */
+ protected ClusterNode me = null;
+ /** The JGroups partition channel */
+ protected JChannel channel;
+ /** The cluster replicant manager */
+ protected DistributedReplicantManager replicantManager;
+ /** The cluster instance log category */
+ protected Logger log;
+ protected Logger clusterLifeCycleLog;
+ /** The current cluster view id */
+ protected long currentViewId = -1;
+
+ private RpcDispatcher dispatcher = null;
+
+ /**
+ * True if serviceState was initialized during start-up.
*/
- private long state_transfer_timeout=60000;
+ protected boolean isStateSet = false;
+ /**
+ * An exception occuring upon fetch serviceState.
+ */
+ protected Exception setStateException;
+ private final Object stateLock = new Object();
+ private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
+ private Debugger debugger;
+ private boolean selfCreatedDRM;
- private long method_call_timeout=60000;
-
// Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // ClusterPartitionMBean implementation ----------------------------------------------
-
- public String getPartitionName()
+
+ /**
+ * Creates an object from a byte buffer
+ */
+ public static Object objectFromByteBuffer (byte[] buffer) throws Exception
{
- return partitionName;
- }
+ if(buffer == null)
+ return null;
- public void setPartitionName(String newName)
- {
- partitionName = newName;
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+ MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
+ return mvis.readObject();
}
-
+
/**
- * Uniquely identifies this node. MUST be unique accros the whole cluster!
- * Cannot be changed once the partition has been started
+ * Serializes an object into a byte buffer.
+ * The object has to implement interface Serializable or Externalizable
*/
- public String getNodeName()
+ public static byte[] objectToByteBuffer (Object obj) throws Exception
{
- return this.nodeName;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
+ mvos.writeObject(obj);
+ mvos.flush();
+ return baos.toByteArray();
}
- public void setNodeName(String node) throws Exception
+ private static JChannel createMuxChannel(ClusterPartitionConfig config) // builds the partition's multiplexed channel from the configured factory
{
- if (this.getState() == ServiceMBean.CREATED ||
- this.getState() == ServiceMBean.STARTED ||
- this.getState() == ServiceMBean.STARTING)
+ JChannelFactoryMBean factory = config.getMultiplexer();
+ if (factory == null)
+ throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
+ String stack = config.getMultiplexerStack();
+ if (stack == null)
+ throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
+ try
{
- throw new Exception ("Node name cannot be changed once the partition has been started");
+ return (JChannel) factory.createMultiplexerChannel(stack, config.getPartitionName()); // second argument is the partition name, not the stack name again
}
- else
+ catch (RuntimeException e)
{
- this.nodeName = node;
+ throw e; // unchecked failures propagate unwrapped
}
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failure creatig multiplexed Channel", e);
+ }
}
- public InetAddress getNodeAddress()
+ // Constructors --------------------------------------------------
+
+ public ClusterPartition(ClusterPartitionConfig config)
{
- return nodeAddress;
+ if (config == null)
+ throw new IllegalArgumentException("config cannot be null");
+
+ this.config = config;
+ setupLoggers(config.getPartitionName());
+ this.history = new Vector();
+ logHistory ("Partition object created");
}
- public void setNodeAddress(InetAddress address)
+ // ------------------------------------------------------------ ServiceMBean
+
+ // ----------------------------------------------------------------- Service
+
+ protected void createService() throws Exception
{
- this.nodeAddress = address;
+ if (config == null)
+ throw new IllegalArgumentException("config cannot be null");
+ log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
+ " using stack " + getMultiplexerStack());
+
+ channel = createMuxChannel(config);
+
+ channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+ channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+
+ log.info("Initializing partition " + getPartitionName());
+ logHistory ("Initializing partition " + getPartitionName());
+
+ dispatcher = new RpcHandler(channel, null, null, new Object(), config.getDeadlockDetection());
+
+ // Subscribe to events generated by the org.jgroups. protocol stack
+ log.debug("setMembershipListener");
+ dispatcher.setMembershipListener(this);
+ log.debug("setMessageListener");
+ dispatcher.setMessageListener(messageListener);
+ dispatcher.setMarshaller(new MarshallerImpl());
+
+ // FIXME remove once @JMX issues are sorted
+ if (replicantManager == null)
+ {
+ // Create the DRM and link it to this HAPartition
+ log.debug("create replicant manager");
+ DistributedReplicantManagerImpl drm = new DistributedReplicantManagerImpl(this);
+ if (server != null)
+ drm.registerWithJmx(server);
+ log.debug("create replicant manager");
+ drm.create();
+ setDistributedReplicantManager(drm);
+ this.selfCreatedDRM = true;
+ }
+
+// // Create the DS and link it to this HAPartition
+// log.debug("create distributed serviceState service");
+// this.dsManager = new DistributedStateImpl(this, this.server);
+// log.debug("init distributed serviceState service");
+// this.dsManager.init();
+
+ // Create the asynchronous handler for view changes
+ asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
+
+ log.debug("done initializing partition");
}
+
+ protected void startService() throws Exception
+ {
+ logHistory ("Starting partition");
+
+ // Store our uniqueId in the channel
+ configureUniqueId();
+
+ channel.connect(getPartitionName());
+
+ try
+ {
+ // get current JG group properties
+
+ log.debug("get nodeName");
+ this.localJGAddress = (IpAddress)channel.getLocalAddress();
+ this.me = new ClusterNode(this.localJGAddress);
+ this.nodeName = this.me.getName();
- public String getJGroupsVersion()
- {
- return Version.description + "( " + Version.cvs + ")";
+ // FIXME -- just block waiting for viewAccepted!
+ log.debug("Get current members");
+ View view = channel.getView();
+ this.jgmembers = (Vector)view.getMembers().clone();
+ this.members = translateAddresses(this.jgmembers); // TRANSLATE
+ log.info("Number of cluster members: " + members.size());
+ for(int m = 0; m < members.size(); m ++) // was 'm > size()', which never entered the loop
+ {
+ Object node = members.get(m);
+ log.debug(node);
+ }
+ // Keep a list of other members only for "exclude-self" RPC calls
+
+ this.jgotherMembers = (Vector)view.getMembers().clone();
+ this.jgotherMembers.remove (channel.getLocalAddress());
+ this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
+ log.info ("Other members: " + this.otherMembers.size ());
+
+ verifyNodeIsUnique(view.getMembers());
+
+ // Update the initial view id
+ this.currentViewId = view.getVid().getId();
+
+ // We must now synchronize new serviceState transfer subscriber
+ //
+ fetchState();
+
+ if (selfCreatedDRM)
+ {
+ // We are now able to start our DRM and DS
+ ((DistributedReplicantManagerImpl) this.replicantManager).start();
+ }
+
+ // Start the asynch listener handler thread
+ asynchHandler.start();
+
+ // Bind ourself in the public JNDI space
+ Context ctx = new InitialContext();
+ this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
+ log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+ }
+ catch (Throwable t)
+ {
+ log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
+ channel.disconnect(); // undo the connect() above before rethrowing
+ throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
+ }
+
}
- public long getStateTransferTimeout()
+ protected void stopService() throws Exception
{
- return state_transfer_timeout;
+ logHistory ("Stopping partition");
+ log.info("Stopping partition " + getPartitionName());
+
+ stopChannelDebugger();
+
+ try
+ {
+ asynchHandler.stop();
+ }
+ catch( Exception e)
+ {
+ log.warn("Failed to stop asynchHandler", e);
+ }
+
+
+ if (selfCreatedDRM)
+ {
+ // Stop the DRM service
+ // TODO remove when DRM is independent
+ try
+ {
+ ((DistributedReplicantManagerImpl) this.replicantManager).stop();
+ }
+ catch (Exception e)
+ {
+ log.error("operation failed", e);
+ }
+ }
+
+// NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
+// add the destroyPartition() step
+ try
+ {
+ channel.disconnect();
+ }
+ catch (Exception e)
+ {
+ log.error("operation failed", e);
+ }
+
+ String boundName = "/HAPartition/" + getPartitionName();
+
+ InitialContext ctx = new InitialContext();
+ try
+ {
+ ctx.unbind(boundName);
+ }
+ finally
+ {
+ ctx.close();
+ }
+
+ NonSerializableFactory.unbind (boundName);
+
+ log.info("Partition " + getPartitionName() + " stopped.");
}
+
+ protected void destroyService() throws Exception
+ {
+ log.debug("Destroying HAPartition: " + getPartitionName());
+
+ if (selfCreatedDRM)
+ {
+ try
+ {
+ if (server != null)
+ ((DistributedReplicantManagerImpl) replicantManager).unregisterWithJmx(server);
+ ((DistributedReplicantManagerImpl) replicantManager).destroy();
+ }
+ catch (Exception e)
+ {
+ log.error("Destroying DRM failed", e);
+ }
+ }
+ try
+ {
+ channel.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Closing channel failed", e);
+ }
- public void setStateTransferTimeout(long timeout)
+ log.info("Partition " + getPartitionName() + " destroyed.");
+ }
+
+ // ---------------------------------------------------------- State Transfer
+
+
+ protected void fetchState() throws Exception
{
- this.state_transfer_timeout=timeout;
+ log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() +
+ " milliseconds):");
+ long start, stop;
+ isStateSet = false;
+ start = System.currentTimeMillis();
+ boolean rc = channel.getState(null, getStateTransferTimeout());
+ if (rc)
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ throw setStateException;
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+ stop = System.currentTimeMillis();
+ log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+ }
+ else
+ {
+ // No one provided us with serviceState.
+ // We need to find out if we are the coordinator, so we must
+ // block until viewAccepted() is called at least once
+
+ synchronized (members)
+ {
+ while (members.size() == 0)
+ {
+ log.debug("waiting on viewAccepted()");
+ try
+ {
+ members.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+
+ if (isCurrentNodeCoordinator())
+ {
+ log.info("State could not be retrieved (we are the first member in group)");
+ }
+ else
+ {
+ throw new IllegalStateException("Initial serviceState transfer failed: " +
+ "Channel.getState() returned false");
+ }
+ }
}
- public long getMethodCallTimeout() {
- return method_call_timeout;
+ private void getStateInternal(OutputStream stream) throws IOException
+ {
+ MarshalledValueOutputStream mvos = null; // don't create until we know we need it
+
+ for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)keys.next();
+ HAPartition.HAPartitionStateTransfer subscriber =
+ (HAPartition.HAPartitionStateTransfer) entry.getValue();
+ log.debug("getState for " + entry.getKey());
+ Object state = subscriber.getCurrentState();
+ if (state != null)
+ {
+ if (mvos == null)
+ {
+ // This is our first write, so need to write the header first
+ stream.write(SERIALIZABLE_VALUE);
+
+ mvos = new MarshalledValueOutputStream(stream);
+ }
+
+ mvos.writeObject(entry.getKey());
+ mvos.writeObject(state);
+ }
+ }
+
+ if (mvos == null)
+ {
+ // We never wrote any serviceState, so write the NULL header
+ stream.write(NULL_VALUE);
+ }
+ else
+ {
+ mvos.writeObject(new StateStreamEnd());
+ }
+
}
+
+ private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
+ {
+ byte type = (byte) stream.read();
+
+ if (type == NULL_VALUE)
+ {
+ log.debug("serviceState is null");
+ return;
+ }
+
+ long used_mem_before, used_mem_after;
+ Runtime rt=Runtime.getRuntime();
+ used_mem_before=rt.totalMemory() - rt.freeMemory();
+
+ MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
+
+ while (true)
+ {
+ Object obj = mvis.readObject();
+ if (obj instanceof StateStreamEnd)
+ break;
+
+ String key = (String) obj;
+ log.debug("setState for " + key);
+ Object someState = mvis.readObject();
+ HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+ if (subscriber != null)
+ {
+ try
+ {
+ subscriber.setCurrentState((Serializable)someState);
+ }
+ catch (Exception e)
+ {
+ // Don't let issues with one subscriber affect others
+ // unless it is DRM, which is really an internal function
+ // of the HAPartition
+ // FIXME remove this once DRM is JBC-based
+ if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+ {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+ else
+ {
+ log.error("Caught exception setting serviceState to " + subscriber, e);
+ }
+ }
+ }
+ else
+ {
+ log.debug("There is no stateHandler for: " + key);
+ }
+ }
- public void setMethodCallTimeout(long timeout) {
- this.method_call_timeout=timeout;
+ used_mem_after=rt.totalMemory() - rt.freeMemory();
+ log.debug("received serviceState; expanded memory by " +
+ (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
+ ", used memory after: " + used_mem_after + ")");
}
-// public boolean getChannelDebugger()
-// {
-// return this.use_debugger;
-// }
-//
-// public void setChannelDebugger(boolean flag)
-// {
-// this.use_debugger=flag;
-// }
-
- public boolean getDeadlockDetection()
+ private void recordSetStateFailure(Throwable t)
{
- return deadlock_detection;
+ log.error("failed setting serviceState", t);
+ if (t instanceof Exception)
+ setStateException = (Exception) t;
+ else
+ setStateException = new Exception(t);
}
- public void setDeadlockDetection(boolean doit)
+ private void notifyStateTransferCompleted()
{
- deadlock_detection = doit;
+ synchronized (stateLock)
+ {
+ // Notify wait that serviceState has been set.
+ stateLock.notifyAll();
+ }
}
+
+ // org.jgroups.MembershipListener implementation ----------------------------------------------
+
+ public void suspect(org.jgroups.Address suspected_mbr)
+ {
+ logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
+ if (isCurrentNodeCoordinator ())
+ clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+ else
+ log.info("Suspected member: " + suspected_mbr);
+ }
- public boolean getAllowSynchronousMembershipNotifications()
+ public void block() {}
+
+ /** Notification of a cluster view change. This is done from the JG protocol
+ * handler thread and we must be careful to not unduly block this thread.
+ * Because of this there are two types of listeners, synchronous and
+ * asynchronous. The synchronous listeners are messaged with the view change
+ * event using the calling thread while the asynchronous listeners are
+ * messaged using a separate thread.
+ *
+ * @param newView the view being accepted; its id and member list replace the current ones
+ */
+ public void viewAccepted(View newView)
{
- return allow_sync_events;
+ try
+ {
+ // we update the view id
+ //
+ this.currentViewId = newView.getVid().getId();
+
+ // Keep a list of other members only for "exclude-self" RPC calls
+ //
+ this.jgotherMembers = (Vector)newView.getMembers().clone();
+ this.jgotherMembers.remove (channel.getLocalAddress());
+ this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
+ Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
+ logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
+ " (old view: " + this.members + " )");
+
+
+ // Save the previous view and make a copy of the new view
+ Vector oldMembers = this.members;
+
+ Vector newjgMembers = (Vector)newView.getMembers().clone();
+ Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
+ if (this.members == null)
+ {
+ // Initial viewAccepted
+ this.members = newMembers;
+ this.jgmembers = newjgMembers;
+ log.debug("ViewAccepted: initial members set");
+ return;
+ }
+ this.members = newMembers;
+ this.jgmembers = newjgMembers;
+
+ int difference = 0;
+ if (oldMembers == null) // not reachable here: the null case returned above
+ difference = newMembers.size () - 1;
+ else
+ difference = newMembers.size () - oldMembers.size ();
+
+ if (isCurrentNodeCoordinator ())
+ clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+ this.currentViewId + ", delta: " + difference + ") : " + this.members);
+ else
+ log.info("New cluster view for partition " + getPartitionName() + ": " +
+ this.currentViewId + " (" + this.members + " delta: " + difference + ")");
+
+ // Build a ViewChangeEvent for the asynch listeners
+ ViewChangeEvent event = new ViewChangeEvent();
+ event.viewId = currentViewId;
+ event.allMembers = translatedNewView;
+ event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
+ event.newMembers = getNewMembers(oldMembers, event.allMembers);
+ event.originatingGroups = null;
+ // if the new view occurs because of a merge, we first inform listeners of the merge
+ if(newView instanceof MergeView)
+ {
+ MergeView mergeView = (MergeView) newView;
+ event.originatingGroups = mergeView.getSubgroups();
+ }
+
+ log.debug("membership changed from " + oldMembers.size() + " to " // this.members already holds the new view, so report the saved old size
+ + event.allMembers.size());
+ // Put the view change to the asynch queue
+ this.asynchHandler.queueEvent(event);
+
+ // Broadcast the new view to the synchronous view change listeners
+ if (this.allowSyncListeners)
+ {
+ this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+ event.deadMembers, event.newMembers, event.originatingGroups);
+ }
+ }
+ catch (Exception ex)
+ {
+ log.error("ViewAccepted failed", ex);
+ }
}
- public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+ // HAPartition implementation ----------------------------------------------
+
+ public String getNodeName()
{
- this.allow_sync_events = allowSync;
+ return nodeName;
}
+
+ public String getPartitionName()
+ {
+ return (config == null ? null : config.getPartitionName());
+ }
+
+ public DistributedReplicantManager getDistributedReplicantManager()
+ {
+ return replicantManager;
+ }
+
+ public DistributedState getDistributedStateService()
+ {
+ return config.getDistributedState();
+ }
- protected ObjectName getObjectName(MBeanServer server, ObjectName name)
- throws MalformedObjectNameException
+ public long getCurrentViewId()
{
- return name == null ? OBJECT_NAME : name;
+ return this.currentViewId;
}
+
+ public Vector getCurrentView()
+ {
+ Vector result = new Vector (this.members.size());
+ for (int i = 0; i < members.size(); i++)
+ {
+ result.add( ((ClusterNode) members.elementAt(i)).getName() );
+ }
+ return result;
+ }
- public HAPartition getHAPartition ()
+ public ClusterNode[] getClusterNodes ()
{
- return this.partition;
+ ClusterNode[] nodes = new ClusterNode[this.members.size()];
+ this.members.toArray(nodes);
+ return nodes;
}
- /** Return the list of member nodes that built from the current view
- * @return A Vector Strings representing the host:port values of the nodes
- */
- public Vector getCurrentView()
+ public ClusterNode getClusterNode ()
{
- return partition.getCurrentView();
+ return me;
}
- public JChannelFactoryMBean getMultiplexer()
+ public boolean isCurrentNodeCoordinator ()
{
- if (multiplexer == null && cache != null && server != null)
- {
- multiplexer = (JChannelFactoryMBean) MBeanProxyExt.create(JChannelFactoryMBean.class,
- multiplexerObjectName);
- }
- return multiplexer;
+ if(this.members == null || this.members.size() == 0 || this.me == null)
+ return false;
+ return this.members.elementAt (0).equals (this.me);
}
- public String getMultiplexerStack()
+ // ***************************
+ // ***************************
+ // RPC multicast communication
+ // ***************************
+ // ***************************
+ //
+ public void registerRPCHandler(String objName, Object subscriber)
{
- return stackName;
+ rpcHandlers.put(objName, subscriber);
}
- public DistributedState getDistributedState()
+ public void unregisterRPCHandler(String objName, Object subscriber)
{
- return dsManager;
+ rpcHandlers.remove(objName);
}
+
- public void setDistributedState(DistributedState state)
+ /**
+ * Invokes a method on the cluster without supplying parameter types (passes null for them).
+ * @param objName prefix of the call name; the call is built as objName + "." + methodName
+ * @param methodName name of the method to invoke
+ * @param args arguments to pass to the method
+ * @param excludeSelf if true, the call is sent to the other members only
+ * @return the values returned by the members that responded
+ * @throws Exception propagated from the overload this delegates to
+ * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
+ */
+ public ArrayList callMethodOnCluster(String objName, String methodName,
+ Object[] args, boolean excludeSelf) throws Exception
{
- this.dsManager = state;
+ return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
}
-
- public TreeCacheMBean getTreeCache()
+
+ /**
+ * This function is an abstraction of RpcDispatcher.
+ */
+ public ArrayList callMethodOnCluster(String objName, String methodName,
+ Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
- return cache;
+ return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout());
}
-
- public void setTreeCache(TreeCacheMBean cache)
+
+
+ public ArrayList callMethodOnCluster(String objName, String methodName,
+ Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
{
- this.cache = cache;
- if (cache.getMultiplexerService() == null)
- throw new IllegalArgumentException("Cache not configured for a multiplexer");
- try
+ ArrayList rtn = new ArrayList();
+ MethodCall m=null;
+ RspList rsp = null;
+ boolean trace = log.isTraceEnabled();
+
+ if(types != null)
+ m=new MethodCall(objName + "." + methodName, args, types);
+ else
+ m=new MethodCall(objName + "." + methodName, args);
+
+ if (excludeSelf)
{
- multiplexerObjectName = new ObjectName(cache.getMultiplexerService());
+ if( trace )
+ {
+ log.trace("callMethodOnCluster(true), objName="+objName
+ +", methodName="+methodName+", members="+jgotherMembers);
+ }
+ rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
}
- catch (MalformedObjectNameException e)
+ else
{
- throw new IllegalArgumentException("Cache's MultiplexerService is invalid", e);
+ if( trace )
+ {
+ log.trace("callMethodOnCluster(false), objName="+objName
+ +", methodName="+methodName+", members="+members);
+ }
+ rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
}
- this.stackName = cache.getMultiplexerStack();
- }
- // ServiceMBeanSupport overrides ---------------------------------------------------
+ if (rsp != null)
+ {
+ for (int i = 0; i < rsp.size(); i++)
+ {
+ Object item = rsp.elementAt(i);
+ if (item instanceof Rsp)
+ {
+ Rsp response = (Rsp) item;
+ // Only include received responses
+ boolean wasReceived = response.wasReceived();
+ if( wasReceived == true )
+ {
+ item = response.getValue();
+ if (!(item instanceof NoHandlerForRPC))
+ rtn.add(item);
+ }
+ else if( trace )
+ log.trace("Ignoring non-received response: "+response);
+ }
+ else
+ {
+ if (!(item instanceof NoHandlerForRPC))
+ rtn.add(item);
+ else if( trace )
+ log.trace("Ignoring NoHandlerForRPC");
+ }
+ }
+ }
- public String getName()
+ return rtn;
+ }
+
+ /**
+ * Calls a method on the cluster coordinator node only, using the timeout
+ * returned by getMethodCallTimeout(). The coordinator is the first member of
+ * the current cluster view (NOTE(review): original text ended "and is replaced" -- incomplete, confirm intent).
+ * @param objName prefix of the call name; the call is built as objName + "." + methodName
+ * @param methodName name of the method to invoke
+ * @param args arguments to pass to the method
+ * @param types parameter types of the method; may be null
+ * @param excludeSelf if true, no call is made when this node is itself the coordinator
+ * @return the values returned by the overload this delegates to
+ * @throws Exception propagated from the overload this delegates to
+ */
+ public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
+ Object[] args, Class[] types,boolean excludeSelf) throws Exception
{
- return partitionName;
+ return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout());
}
+ /**
+ * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
+ * cluster.
+ * and is replaced
+ * @param objName
+ * @param methodName
+ * @param args
+ * @param types
+ * @param excludeSelf
+ * @param methodTimeout
+ * @return
+ * @throws Exception
+ */
+ public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
+ Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
+ {
+ ArrayList rtn = new ArrayList();
+ MethodCall m=null;
+ RspList rsp = null;
+ boolean trace = log.isTraceEnabled();
- protected void createService()
- throws Exception
- {
- log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
- " using stack " + getMultiplexerStack());
- channel = (JChannel) getMultiplexer().createMultiplexerChannel(getMultiplexerStack(), getPartitionName());
-
- if(use_debugger && debugger == null)
+ if(types != null)
+ m=new MethodCall(objName + "." + methodName, args, types);
+ else
+ m=new MethodCall(objName + "." + methodName, args);
+
+ if( trace )
{
- debugger=new Debugger(channel);
- debugger.start();
+ log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+ +", methodName="+methodName);
}
- channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
- channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
- log.debug("Creating HAPartition");
- partition = createPartition();
-
- // JBAS-2769 Init partition in create
- log.debug("Initializing ClusterPartition: " + partition);
- partition.init();
- log.debug("ClusterPartition initialized");
- }
+ // the first cluster view member is the coordinator
+ Vector coordinatorOnly = new Vector();
+ // If we are the coordinator, only call ourself if 'excludeSelf' is false
+ if (false == isCurrentNodeCoordinator () ||
+ false == excludeSelf)
+ coordinatorOnly.addElement(this.jgmembers.elementAt (0));
- protected void startService()
- throws Exception
- {
- // We push the independent name in the protocol stack before connecting to the cluster
- boolean pushNodeName = true;
- if (this.nodeName == null || "".equals(this.nodeName)) {
- IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
- if (ourAddr != null)
+ rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
+
+ if (rsp != null)
+ {
+ for (int i = 0; i < rsp.size(); i++)
{
- byte[] additional_data = ourAddr.getAdditionalData();
- if (additional_data != null)
+ Object item = rsp.elementAt(i);
+ if (item instanceof Rsp)
{
- nodeName = new String(additional_data);
- pushNodeName = false;
+ Rsp response = (Rsp) item;
+ // Only include received responses
+ boolean wasReceived = response.wasReceived();
+ if( wasReceived == true )
+ {
+ item = response.getValue();
+ if (!(item instanceof NoHandlerForRPC))
+ rtn.add(item);
+ }
+ else if( trace )
+ log.trace("Ignoring non-received response: "+response);
}
+ else
+ {
+ if (!(item instanceof NoHandlerForRPC))
+ rtn.add(item);
+ else if( trace )
+ log.trace("Ignoring NoHandlerForRPC");
+ }
}
}
- if (this.nodeName == null || "".equals(this.nodeName)) {
- this.nodeName = generateUniqueNodeName();
+
+ return rtn;
+ }
+
+
+ /**
+ * Asynchronous cluster call without explicit argument types. Simply delegates to
+ * the five-argument overload, passing <code>null</code> for the types array.
+ *
+ * @param objName name of the target object, forwarded unchanged
+ * @param methodName name of the method to invoke, forwarded unchanged
+ * @param args arguments for the invocation, forwarded unchanged
+ * @param excludeSelf forwarded unchanged to the five-argument overload
+ * @throws Exception whatever the five-argument overload throws
+ * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
+ */
+ public void callAsynchMethodOnCluster(String objName, String methodName,
+ Object[] args, boolean excludeSelf)
+ throws Exception
+ {
+ callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
+ }
+
+ /**
+ * This function is an abstraction of RpcDispatcher for asynchronous messages
+ */
+ public void callAsynchMethodOnCluster(String objName, String methodName,
+ Object[] args, Class[] types, boolean excludeSelf) throws Exception
+ {
+ MethodCall m = null;
+ boolean trace = log.isTraceEnabled();
+
+ if(types != null)
+ m=new MethodCall(objName + "." + methodName, args, types);
+ else
+ m=new MethodCall(objName + "." + methodName, args);
+
+ if (excludeSelf)
+ {
+ if( trace )
+ {
+ log.trace("callAsynchMethodOnCluster(true), objName="+objName
+ +", methodName="+methodName+", members="+jgotherMembers);
+ }
+ dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
}
-
- if (pushNodeName)
+ else
{
- java.util.HashMap staticNodeName = new java.util.HashMap();
- staticNodeName.put("additional_data", this.nodeName.getBytes());
- this.channel.down(new Event(Event.CONFIG, staticNodeName));
+ if( trace )
+ {
+ log.trace("callAsynchMethodOnCluster(false), objName="+objName
+ +", methodName="+methodName+", members="+members);
+ }
+ dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
}
- channel.connect(partitionName);
-
- try
- {
- partition.startPartition();
+ }
- log.debug("Started ClusterPartition: " + partitionName);
+ // *************************
+ // *************************
+ // State transfer management
+ // *************************
+ // *************************
+ //
+ /**
+ * Stores <code>subscriber</code> in <code>stateHandlers</code> under the key
+ * <code>objectName</code>.
+ *
+ * @param objectName key under which the subscriber is stored
+ * @param subscriber the state transfer handler to store
+ */
+ public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+ {
+ stateHandlers.put(objectName, subscriber);
+ }
+
+ /**
+ * Removes the entry stored in <code>stateHandlers</code> under
+ * <code>objectName</code>.
+ *
+ * NOTE(review): <code>subscriber</code> is not consulted, so whatever is stored
+ * under the key is removed even if it is a different object -- confirm this is intended.
+ *
+ * @param objectName key whose entry is removed
+ * @param subscriber currently unused
+ */
+ public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
+ {
+ stateHandlers.remove(objectName);
+ }
+
+ // *************************
+ // *************************
+ // Group Membership listeners
+ // *************************
+ // *************************
+ //
+ public void registerMembershipListener(HAMembershipListener listener)
+ {
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.add(listener);
+ }
}
- catch (Exception e)
- {
- log.debug("Caught exception after channel connected; closing channel -- " + e.getLocalizedMessage());
- channel.disconnect();
- throw e;
+ else {
+ synchronized(this.synchListeners) {
+ this.synchListeners.add(listener);
+ }
}
}
- protected void stopService() throws Exception
+ /**
+ * Removes a listener previously passed to registerMembershipListener.
+ *
+ * The list the listener most likely lives in is computed with the same rule used
+ * at registration time. Because that rule depends on <code>allowSyncListeners</code>,
+ * which can be changed through its public setter after the listener was registered,
+ * the other list is searched as well when the listener is not found in the first one.
+ *
+ * @param listener the listener to remove; nothing happens if it is not registered
+ */
+ public void unregisterMembershipListener(HAMembershipListener listener)
{
- stopChannelDebugger();
- log.debug("Stopping ClusterPartition: " + partitionName);
- partition.closePartition();
- log.debug("Stopped ClusterPartition: " + partitionName);
+ boolean isAsynch = (this.allowSyncListeners == false)
+ || (listener instanceof AsynchHAMembershipListener)
+ || (listener instanceof AsynchHAMembershipExtendedListener);
+ boolean removed;
+ if( isAsynch ) {
+ synchronized(this.asynchListeners) {
+ removed = this.asynchListeners.remove(listener);
+ }
+ if( !removed ) {
+ // Registered while synchronous notifications were still allowed
+ synchronized(this.synchListeners) {
+ this.synchListeners.remove(listener);
+ }
+ }
+ }
+ else {
+ synchronized(this.synchListeners) {
+ removed = this.synchListeners.remove(listener);
+ }
+ if( !removed ) {
+ // Registered while synchronous notifications were disallowed
+ synchronized(this.asynchListeners) {
+ this.asynchListeners.remove(listener);
+ }
+ }
+ }
}
+
+ /**
+ * Returns the <code>allowSyncListeners</code> flag, which registerMembershipListener
+ * consults when deciding whether a listener goes into the synchronous or the
+ * asynchronous listener list.
+ */
+ public boolean getAllowSynchronousMembershipNotifications()
+ {
+ return allowSyncListeners;
+ }
- // NR 200505 : [JBCLUSTER-38] close partition just disconnect from channel
- // destroy close it.
- protected void destroyService() throws Exception
+ /**
+ * Sets the <code>allowSyncListeners</code> flag. Only the flag is changed here;
+ * listeners that are already registered are not moved between lists.
+ *
+ * @param allowSync new value of the flag
+ */
+ public void setAllowSynchronousMembershipNotifications(boolean allowSync)
+ {
+ this.allowSyncListeners = allowSync;
+ }
+
+ // AsynchEventHandler.AsynchEventProcessor -----------------------
+
+ public void processEvent(Object event)
{
- log.debug("Destroying ClusterPartition: " + partitionName);
- partition.destroyPartition();
- log.debug("Destroyed ClusterPartition: " + partitionName);
+ ViewChangeEvent vce = (ViewChangeEvent) event;
+ notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+ vce.deadMembers, vce.newMembers, vce.originatingGroups);
+
}
- // --------------------------------------------------- Protected Methods
- /**
- * Extension point meant for test cases; instantiates the HAPartitionImpl.
- * Test cases can instantiate their own subclass of HAPartitionImpl.
- */
- protected HAPartitionImpl createPartition() throws Exception
+ // Public ------------------------------------------------------------------
+
+ public void setDistributedReplicantManager(DistributedReplicantManager drm)
{
- HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
- result.setStateTransferTimeout(this.state_transfer_timeout);
- result.setMethodCallTimeout(this.method_call_timeout);
- result.setDistributedState(dsManager);
- return result;
+ if (this.replicantManager != null && !(replicantManager == drm))
+ throw new IllegalStateException("DistributedReplicantManager already set");
+
+ this.replicantManager = drm;
}
+
+ // Protected -----------------------------------------------------
- protected String generateUniqueNodeName () throws Exception
+ /**
+ * Verifies that no other member of the given view publishes the same unique name
+ * (the JGroups address "additional data") as the local node.
+ *
+ * @param javaGroupIpAddresses view members; each element is cast to IpAddress
+ * @throws Exception if the local address carries no additional data, or if another
+ * member's additional data contains the same bytes as the local node's
+ */
+ protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
{
- // we first try to find a simple meaningful name:
- // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
- // 2nd) "local-IP:JMV_GUID" otherwise
- // 3rd) return a fully GUID-based representation
- //
-
- // Before anything we determine the local host IP (and NOT name as this could be
- // resolved differently by other nodes...)
-
- // But use the specified node address for multi-homing
-
- String hostIP = null;
- InetAddress address = ServerConfigUtil.fixRemoteAddress(nodeAddress);
- if (address == null)
+ byte[] localUniqueName = this.localJGAddress.getAdditionalData();
+ if (localUniqueName == null)
{
- log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
- log.debug ("using a full GUID strategy");
- return new VMID().toString();
+ log.error("No additional information has been found in the JGroups address; " +
+ "make sure you are running with a correct version of JGroups and that the protocols " +
+ "you are using support 'additionalData' behaviour.");
+ throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
}
- else
+
+ for (int i = 0; i < javaGroupIpAddresses.size(); i++)
{
- hostIP = address.getHostAddress();
+ IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
+ if (!address.equals(this.localJGAddress))
+ {
+ // Compare the name bytes by content: byte[].equals() is reference equality
+ // and would never detect a duplicate. Arrays.equals also copes with a
+ // null array from the other member (it simply reports "not equal").
+ if (java.util.Arrays.equals(localUniqueName, address.getAdditionalData()))
+ throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
+ }
}
+ }
- // 1st: is JNDI up and running?
+ /**
+ * Helper method that binds the partition in the JNDI tree.
+ * @param jndiName Name under which the object must be bound
+ * @param who Object to bind in JNDI
+ * @param classType Class type under which should appear the bound object
+ * @param ctx Naming context under which we bind the object
+ * @throws Exception Thrown if a naming exception occurs during binding
+ */
+ protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
+ {
+ // Ah ! This service isn't serializable, so we use a helper class
//
- try
+ NonSerializableFactory.bind(jndiName, who);
+ Name n = ctx.getNameParser("").parse(jndiName);
+ while (n.size () > 1)
{
- AttributeList al = this.server.getAttributes(NamingServiceMBean.OBJECT_NAME,
- new String[] {"State", "Port"});
-
- int status = ((Integer)((Attribute)al.get(0)).getValue()).intValue();
- if (status == ServiceMBean.STARTED)
+ String ctxName = n.get (0);
+ try
{
- // we can proceed with the JNDI trick!
- int port = ((Integer)((Attribute)al.get(1)).getValue()).intValue();
- return hostIP + ":" + port;
+ ctx = (Context)ctx.lookup (ctxName);
}
- else
+ catch (NameNotFoundException e)
{
- log.debug("JNDI has been found but the service wasn't started so we cannot " +
- "be entirely sure we are the only one that wants to use this PORT " +
- "as a GUID on this host.");
+ log.debug ("creating Subcontext " + ctxName);
+ ctx = ctx.createSubcontext (ctxName);
}
+ n = n.getSuffix (1);
+ }
- }
- catch (InstanceNotFoundException e)
+ // The helper class NonSerializableFactory uses address type nns, we go on to
+ // use the helper class to bind the service object in JNDI
+ //
+ StringRefAddr addr = new StringRefAddr("nns", jndiName);
+ Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
+ ctx.rebind (n.get (0), ref);
+ }
+
+   /**
+    * Computes the members that were present in the old view but are missing from
+    * the new one (old minus new). A <code>null</code> argument is treated as an empty view.
+    *
+    * @param oldMembers members of the previous view, may be <code>null</code>
+    * @param newMembers members of the current view, may be <code>null</code>
+    * @return a fresh Vector holding the departed members; empty if there are none
+    */
+   protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
+   {
+      Vector previous = (oldMembers != null) ? oldMembers : new Vector();
+      Vector current = (newMembers != null) ? newMembers : new Vector();
+
+      // Operate on a clone so the caller's vector is left untouched
+      Vector departed = (Vector) previous.clone();
+      departed.removeAll(current);
+      log.debug("dead members: " + departed);
+      return departed;
+   }
+
+   /**
+    * Computes the members that appear in the current view but were not in the old
+    * one (all minus old). A <code>null</code> argument is treated as an empty view.
+    *
+    * @param oldMembers members of the previous view, may be <code>null</code>
+    * @param allMembers members of the current view, may be <code>null</code>
+    * @return a fresh Vector holding the members that joined between the two views
+    */
+   protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
+   {
+      Vector previous = (oldMembers != null) ? oldMembers : new Vector();
+      Vector current = (allMembers != null) ? allMembers : new Vector();
+
+      // Operate on a clone so the caller's vector is left untouched
+      Vector joined = (Vector) current.clone();
+      joined.removeAll(previous);
+      return joined;
+   }
+
+ protected void notifyListeners(ArrayList theListeners, long viewID,
+ Vector allMembers, Vector deadMembers, Vector newMembers,
+ Vector originatingGroups)
+ {
+ log.debug("Begin notifyListeners, viewID: "+viewID);
+ synchronized(theListeners)
{
- log.debug ("JNDI not running here, cannot use this strategy to find a node GUID for the cluster");
+ // JBAS-3619 -- don't hold synch lock while notifying
+ theListeners = (ArrayList) theListeners.clone();
}
- catch (ReflectionException e)
+
+ for (int i = 0; i < theListeners.size(); i++)
{
- log.debug ("JNDI querying has returned an exception, cannot use this strategy to find a node GUID for the cluster");
+ HAMembershipListener aListener = null;
+ try
+ {
+ aListener = (HAMembershipListener) theListeners.get(i);
+ if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
+ {
+ HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
+ exListener.membershipChangedDuringMerge (deadMembers, newMembers,
+ allMembers, originatingGroups);
+ }
+ else
+ {
+ aListener.membershipChanged(deadMembers, newMembers, allMembers);
+ }
+ }
+ catch (Throwable e)
+ {
+ // a problem in a listener should not prevent other members to receive the new view
+ log.warn("HAMembershipListener callback failure: "+aListener, e);
+ }
}
-
- // 2nd: host-GUID strategy
- //
- String uid = new UID().toString();
- return hostIP + ":" + uid;
+
+ log.debug("End notifyListeners, viewID: "+viewID);
}
- protected JChannel getChannel()
+ protected Vector translateAddresses (Vector jgAddresses)
{
- return channel;
+ if (jgAddresses == null)
+ return null;
+
+ Vector result = new Vector (jgAddresses.size());
+ for (int i = 0; i < jgAddresses.size(); i++)
+ {
+ IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
+ result.add(new ClusterNode (addr));
+ }
+
+ return result;
}
- protected Debugger getDebugger()
+ public void logHistory (String message)
{
- return debugger;
+ try
+ {
+ history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
+ }
+ catch (Exception ignored){}
}
+ // --------------------------------------------------- ClusterPartitionMBean
+
public String showHistory ()
{
StringBuffer buff = new StringBuffer();
- Vector data = new Vector (this.partition.history);
+ Vector data = new Vector (this.history);
for (java.util.Iterator row = data.iterator(); row.hasNext();)
{
String info = (String) row.next();
@@ -455,7 +1293,7 @@
{
StringBuffer buff = new StringBuffer();
buff.append("<events>\n");
- Vector data = new Vector (this.partition.history);
+ Vector data = new Vector (this.history);
for (java.util.Iterator row = data.iterator(); row.hasNext();)
{
buff.append(" <event>\n ");
@@ -489,4 +1327,402 @@
debugger=null;
}
}
+
+ /** Returns the clustered cache as reported by the <code>config</code> object. */
+ public Cache getClusteredCache()
+ {
+ return config.getClusteredCache();
+ }
+
+ /** Returns the deadlock detection setting as reported by the <code>config</code> object. */
+ public boolean getDeadlockDetection()
+ {
+ return config.getDeadlockDetection();
+ }
+
+ /** Returns this object itself, which serves as the HAPartition. */
+ public HAPartition getHAPartition()
+ {
+ return this;
+ }
+
+ /**
+ * Builds a version string from <code>Version.description</code> followed by
+ * <code>Version.cvs</code> in parentheses.
+ */
+ public String getJGroupsVersion()
+ {
+ return Version.description + "( " + Version.cvs + ")";
+ }
+
+ /** Returns the multiplexer (channel factory) as reported by the <code>config</code> object. */
+ public JChannelFactoryMBean getMultiplexer()
+ {
+ return config.getMultiplexer();
+ }
+
+ /** Returns the multiplexer stack name as reported by the <code>config</code> object. */
+ public String getMultiplexerStack()
+ {
+ return config.getMultiplexerStack();
+ }
+
+ /** Returns the node address as reported by the <code>config</code> object. */
+ public InetAddress getNodeAddress()
+ {
+ return config.getNodeAddress();
+ }
+
+ /** Returns the state transfer timeout as reported by the <code>config</code> object. */
+ public long getStateTransferTimeout() {
+ return config.getStateTransferTimeout();
+ }
+
+ /**
+ * Returns the method call timeout as reported by the <code>config</code> object.
+ * The cluster call methods of this class pass this value to the dispatcher as the call timeout.
+ */
+ public long getMethodCallTimeout() {
+ return config.getMethodCallTimeout();
+ }
+
+ /**
+ * Stores a new method call timeout in the <code>config</code> object.
+ *
+ * @param timeout the new timeout value, passed through unchanged
+ */
+ public void setMethodCallTimeout(long timeout)
+ {
+ config.setMethodCallTimeout(timeout);
+ }
+
+ /**
+ * Stores a new state transfer timeout in the <code>config</code> object.
+ *
+ * @param timeout the new timeout value, passed through unchanged
+ */
+ public void setStateTransferTimeout(long timeout)
+ {
+ config.setStateTransferTimeout(timeout);
+ }
+
+ /**
+ * Returns the node's unique id as reported by the <code>config</code> object.
+ * configureUniqueId() is the method in this class that fills it in when it is missing.
+ */
+ public String getNodeUniqueId()
+ {
+ return config.getNodeUniqueId();
+ }
+
+ // Protected --------------------------------------------------------------
+
+ /**
+ * Makes sure the configuration holds a unique id for this node and, where needed,
+ * pushes that id down the channel as "additional_data".
+ *
+ * Resolution order: (1) the id already present in <code>config</code>; (2) the
+ * additional data already attached to the channel's local address; (3) a freshly
+ * generated id from generateUniqueId(). The id is pushed down the channel in every
+ * case except (2), where the channel already carries it.
+ *
+ * @throws Exception if generateUniqueId() fails
+ */
+ protected void configureUniqueId() throws Exception
+ {
+ // We push the independent name in the protocol stack
+ // before connecting to the cluster
+ boolean pushNodeName = true;
+ String uniqueId = config.getNodeUniqueId();
+ if (uniqueId == null || "".equals(uniqueId)) {
+ // Nothing configured: see whether the channel's address already carries a name
+ IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
+ if (ourAddr != null)
+ {
+ byte[] additional_data = ourAddr.getAdditionalData();
+ if (additional_data != null)
+ {
+ // NOTE(review): decodes with the platform default charset; the matching
+ // getBytes() below encodes with the same default -- confirm all nodes agree
+ uniqueId = new String(additional_data);
+ config.setNodeUniqueId(uniqueId);
+ // The channel already has this name, so there is no need to push it again
+ pushNodeName = false;
+ }
+ }
+ }
+ if (uniqueId == null || "".equals(uniqueId)) {
+ // Still nothing: fall back to a generated id
+ uniqueId = generateUniqueId();
+ config.setNodeUniqueId(uniqueId);
+ }
+
+ if (pushNodeName)
+ {
+ // Hand the name to the protocol stack through a CONFIG event
+ java.util.HashMap staticNodeName = new java.util.HashMap();
+ staticNodeName.put("additional_data", uniqueId.getBytes());
+ this.channel.down(new Event(Event.CONFIG, staticNodeName));
+ }
+
+ config.setNodeUniqueId(uniqueId);
+ }
+
+   /**
+    * Generates an identifier for this node, preferring a short readable form:
+    * <ol>
+    * <li>"host-IP:naming-port" when the configured naming service port is positive;</li>
+    * <li>"host-IP:UID" otherwise;</li>
+    * <li>a bare VMID string when no usable node address is available.</li>
+    * </ol>
+    * The host IP (not the host name, which other nodes might resolve differently)
+    * is taken from the configured node address, which supports multi-homed hosts.
+    *
+    * @return the generated identifier
+    * @throws Exception declared for subclasses and callers
+    */
+   protected String generateUniqueId() throws Exception
+   {
+      InetAddress nodeAddr = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress());
+      if (nodeAddr == null)
+      {
+         // No address to build on: fall back to a purely GUID based name
+         log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
+         log.debug ("using a full GUID strategy");
+         return new VMID().toString();
+      }
+
+      String hostIP = nodeAddr.getHostAddress();
+
+      // Preferred form: the naming service port makes a short, meaningful suffix
+      int namingPort = config.getNamingServicePort();
+      if (namingPort > 0)
+      {
+         return hostIP + ":" + namingPort;
+      }
+
+      // Otherwise qualify the host IP with a UID
+      return hostIP + ":" + new UID().toString();
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ /**
+ * ExtendedMessageListener implementation that forwards whole-partition state
+ * transfer callbacks to the enclosing class (getStateInternal / setStateInternal).
+ * The variants that take a <code>state_id</code> (partial state) are not supported
+ * and throw UnsupportedOperationException; plain messages are ignored.
+ */
+ private class MessageListenerAdapter
+ implements ExtendedMessageListener
+ {
+
+ /**
+ * Writes the partition state to <code>stream</code> via getStateInternal.
+ * Failures are logged and not propagated.
+ */
+ public void getState(OutputStream stream)
+ {
+ logHistory ("getState called on partition");
+
+ log.debug("getState called.");
+ try
+ {
+ getStateInternal(stream);
+ }
+ catch (Exception ex)
+ {
+ log.error("getState failed", ex);
+ }
+
+ }
+
+ /** Partial state transfer is not supported. */
+ public void getState(String state_id, OutputStream ostream)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ /** Partial state transfer is not supported. */
+ public byte[] getState(String state_id)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ /**
+ * Applies state read from <code>stream</code> via setStateInternal; a
+ * <code>null</code> stream is only logged. On success <code>isStateSet</code> is
+ * set; any Throwable is handed to recordSetStateFailure. In every case
+ * notifyStateTransferCompleted() is called last.
+ */
+ public void setState(InputStream stream)
+ {
+ logHistory ("setState called on partition");
+ try
+ {
+ if (stream == null)
+ {
+ log.debug("transferred serviceState is null (may be first member in cluster)");
+ }
+ else
+ {
+ setStateInternal(stream);
+ }
+
+ isStateSet = true;
+ }
+ catch (Throwable t)
+ {
+ recordSetStateFailure(t);
+ }
+ finally
+ {
+ notifyStateTransferCompleted();
+ }
+ }
+
+ /**
+ * Serializes the partition state into a byte array via getStateInternal.
+ * Returns <code>null</code> if serialization fails (the failure is logged).
+ */
+ public byte[] getState()
+ {
+ logHistory ("getState called on partition");
+
+ log.debug("getState called.");
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ getStateInternal(baos);
+ return baos.toByteArray();
+ }
+ catch (Exception ex)
+ {
+ log.error("getState failed", ex);
+ }
+ return null; // This will cause the receiver to get a "false" on the channel.getState() call
+ }
+
+ /** Partial state transfer is not supported. */
+ public void setState(String state_id, byte[] state)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ /** Partial state transfer is not supported. */
+ public void setState(String state_id, InputStream istream)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ /** No-op: this adapter does nothing with plain messages. */
+ public void receive(org.jgroups.Message msg)
+ { /* complete */}
+
+ /**
+ * Byte array variant of setState(InputStream): wraps <code>obj</code> in a
+ * ByteArrayInputStream and applies it via setStateInternal; a <code>null</code>
+ * array is only logged. Success, failure and completion are reported the same
+ * way as in the stream variant.
+ */
+ public void setState(byte[] obj)
+ {
+ logHistory ("setState called on partition");
+ try
+ {
+ if (obj == null)
+ {
+ log.debug("transferred serviceState is null (may be first member in cluster)");
+ }
+ else
+ {
+ ByteArrayInputStream bais = new ByteArrayInputStream(obj);
+ setStateInternal(bais);
+ bais.close();
+ }
+
+ isStateSet = true;
+ }
+ catch (Throwable t)
+ {
+ recordSetStateFailure(t);
+ }
+ finally
+ {
+ notifyStateTransferCompleted();
+ }
+ }
+
+ }
+
+ /**
+ * A simple data class containing the view change event needed to
+ * notify the HAMembershipListeners. processEvent() unpacks these fields
+ * and passes them to notifyListeners().
+ */
+ private static class ViewChangeEvent
+ {
+ // id of the view this event describes
+ long viewId;
+ // members reported as dead with this view
+ Vector deadMembers;
+ // members reported as new with this view
+ Vector newMembers;
+ // complete membership of the view
+ Vector allMembers;
+ // when non-null, notifyListeners uses the merge callback for extended listeners
+ Vector originatingGroups;
+ }
+
+ /**
+ * RpcDispatcher.Marshaller that delegates both directions to the static
+ * objectFromByteBuffer / objectToByteBuffer methods of ClusterPartition.
+ */
+ private static class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
+ {
+
+ /** Deserializes <code>buf</code> using ClusterPartition.objectFromByteBuffer. */
+ public Object objectFromByteBuffer(byte[] buf) throws Exception
+ {
+ return ClusterPartition.objectFromByteBuffer(buf);
+ }
+
+ /** Serializes <code>obj</code> using ClusterPartition.objectToByteBuffer. */
+ public byte[] objectToByteBuffer(Object obj) throws Exception
+ {
+ return ClusterPartition.objectToByteBuffer(obj);
+ }
+ }
+
+   /**
+    * RpcDispatcher specialization whose handle(Message) routes each incoming call
+    * to one of the objects registered in <code>rpcHandlers</code>, so that a single
+    * dispatcher can serve many different target objects.
+    */
+   private class RpcHandler extends RpcDispatcher
+   {
+      private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
+            boolean deadlock_detection)
+      {
+         super(channel, l, l2, server_obj, deadlock_detection);
+      }
+
+      /**
+       * Analyzes the MethodCall contained in <code>req</code> to find the
+       * registered service object to invoke against, executes the call against
+       * *that* object and returns the result.
+       *
+       * The method call name is expected in the form "handlerName.methodName";
+       * the text before the last '.' selects the entry in <code>rpcHandlers</code>.
+       *
+       * @param req the JGroups message carrying the serialized MethodCall
+       * @return the invocation's return value; the Throwable thrown by the
+       *         invocation, if any; a NoHandlerForRPC when no handler can be
+       *         determined or none is registered; or <code>null</code> when the
+       *         message is empty, cannot be deserialized or is not a MethodCall
+       */
+      public Object handle(Message req)
+      {
+         boolean trace = log.isTraceEnabled();
+
+         if( trace )
+            log.trace("Partition " + getPartitionName() + " received msg");
+         if(req == null || req.getBuffer() == null)
+         {
+            log.warn("message or message buffer is null !");
+            return null;
+         }
+
+         Object body = null;
+         try
+         {
+            body = objectFromByteBuffer(req.getBuffer());
+         }
+         catch(Exception e)
+         {
+            log.warn("failed unserializing message buffer (msg=" + req + ")", e);
+            return null;
+         }
+
+         // instanceof is false for null, so no separate null check is needed
+         if(!(body instanceof MethodCall))
+         {
+            log.warn("message does not contain a MethodCall object !");
+            return null;
+         }
+
+         // get method call informations
+         //
+         MethodCall method_call = (MethodCall)body;
+         String methodName = method_call.getName();
+
+         if( trace )
+            log.trace("pre methodName: " + methodName);
+
+         // Without a '.' there is no handler prefix to look up; substring(0, -1)
+         // would throw, so report the call as having no handler instead.
+         int idx = (methodName == null) ? -1 : methodName.lastIndexOf('.');
+         if (idx < 0)
+         {
+            log.warn("method name '" + methodName + "' has no handler prefix; cannot dispatch");
+            return new NoHandlerForRPC();
+         }
+         String handlerName = methodName.substring(0, idx);
+         String newMethodName = methodName.substring(idx + 1);
+
+         if( trace )
+         {
+            log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
+            log.trace("Handle: " + methodName);
+         }
+
+         // prepare method call
+         method_call.setName(newMethodName);
+         Object handler = rpcHandlers.get(handlerName);
+         if (handler == null)
+         {
+            if( trace )
+               log.trace("No rpc handler registered under: "+handlerName);
+            return new NoHandlerForRPC();
+         }
+
+         /* Invoke it and just return any exception with trace level logging of
+            the exception. The exception semantics of a group rpc call are weak as
+            the return value may be a normal return value or the exception thrown.
+         */
+         Object retval = null;
+         try
+         {
+            retval = method_call.invoke(handler);
+            if( trace )
+               log.trace("rpc call return value: "+retval);
+         }
+         catch (Throwable t)
+         {
+            if( trace )
+               log.trace("rpc call threw exception", t);
+            retval = t;
+         }
+
+         return retval;
+      }
+
+   }
+
+   /**
+    * Creates the two loggers used by this partition. Both categories are rooted at
+    * the HAPartition class name; the lifecycle logger adds ".lifecycle". When a
+    * partition name is given, "." plus that name is appended to each category.
+    *
+    * @param partitionName name of the partition, or <code>null</code> for the
+    *        unqualified categories
+    */
+   private void setupLoggers(String partitionName)
+   {
+      String category = HAPartition.class.getName();
+      String suffix = (partitionName == null) ? "" : "." + partitionName;
+
+      this.log = Logger.getLogger(category + suffix);
+      this.clusterLifeCycleLog = Logger.getLogger(category + ".lifecycle" + suffix);
+   }
+
}
More information about the jboss-cvs-commits
mailing list