[jboss-cvs] JBossAS SVN: r86131 - in trunk/cluster/src/main/org/jboss/ha: timestamp and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 19 17:46:15 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-03-19 17:46:14 -0400 (Thu, 19 Mar 2009)
New Revision: 86131
Added:
trunk/cluster/src/main/org/jboss/ha/timestamp/
trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancy.java
trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyObserver.java
trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyService.java
Log:
[JBAS-5552] Add timestamp discrepancy tracking
Added: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancy.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancy.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancy.java 2009-03-19 21:46:14 UTC (rev 86131)
@@ -0,0 +1,248 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.timestamp;
+
/**
 * Provides information on possible system timestamp discrepancies between
 * a remote node and the local node.
 * <p>
 * A <i>discrepancy</i> is the offset that is added to a local timestamp to
 * obtain the remote timestamp of a simultaneously occurring event, i.e.
 * <code>remote = local + discrepancy</code>. Because exchanging timestamps
 * takes time, the discrepancy can only be bounded: it lies between
 * {@link #getMinDiscrepancy()} and {@link #getMaxDiscrepancy()}.
 * </p>
 * <p>
 * <strong>Usage:</strong> The local node should record the current system
 * time and then request the current system time from the remote node. The
 * local node should then record the current system time when the response
 * is received from the remote node. The three values are then passed to
 * this class' constructor.
 * </p>
 *
 * @author Brian Stansberry
 *
 * @version $Revision: $
 */
public class TimestampDiscrepancy implements java.io.Serializable
{
   /** The serialVersionUID */
   private static final long serialVersionUID = 1L;

   /** Fake discrepancy that indicates no system clock difference */
   public static final TimestampDiscrepancy NO_DISCREPANCY =
      new TimestampDiscrepancy(System.currentTimeMillis());

   private final long minDiscrepancy;
   private final long maxDiscrepancy;
   private final long remoteTimestamp;
   private final long requestRoundtripTime;

   /**
    * Create a new TimestampDiscrepancy using the value returned by a remote
    * request plus the local timestamps for when the request started and
    * completed.
    *
    * @param remoteTimestamp the timestamp returned by the remote node
    * @param requestSent local timestamp immediately before the timestamp request was made
    * @param responseReceived local timestamp immediately after receipt of response
    *                         to the timestamp request
    *
    * @throws IllegalArgumentException if <code>responseReceived</code> is
    *                                  less than <code>requestSent</code>
    */
   public TimestampDiscrepancy(long remoteTimestamp, long requestSent, long responseReceived)
   {
      if (responseReceived < requestSent)
      {
         throw new IllegalArgumentException("Apparent time travel: " +
               responseReceived + " is less than " + requestSent);
      }

      // The remote node generated remoteTimestamp at some local time t with
      // requestSent <= t <= responseReceived. Since remote = local + discrepancy,
      // discrepancy = remoteTimestamp - t.

      // Limit 1: assume the remote node responded immediately when the
      // request was sent, i.e. 0 time to transmit request (t == requestSent).
      // This yields the largest possible discrepancy.
      this.maxDiscrepancy = remoteTimestamp - requestSent;

      // Limit 2: assume the remote node responded immediately before the
      // response was received, i.e. 0 time to transmit response
      // (t == responseReceived). This yields the smallest possible discrepancy.
      this.minDiscrepancy = remoteTimestamp - responseReceived;

      this.remoteTimestamp = remoteTimestamp;
      this.requestRoundtripTime = responseReceived - requestSent;
   }

   /**
    * Creates a discrepancy as if the remote timestamp and both local
    * timestamps were all <code>now</code>: no clock difference and a zero
    * roundtrip time.
    *
    * @param now the single timestamp to use for all three values
    */
   private TimestampDiscrepancy(long now)
   {
      this(now, now, now);
   }

   /**
    * Generates a synthetic TimestampDiscrepancy from a value provided by
    * another node, adjusted for the discrepancy between this node and the
    * node that provided the base value.
    * <p>
    * Used to create an estimated discrepancy between this node and a node
    * that can no longer be contacted directly (e.g. because it has shut
    * down). Necessarily less accurate than a TimestampDiscrepancy
    * constructed via the normal method.
    * </p>
    * <p>
    * Since <code>remote - local = (remote - intermediary) + (intermediary - local)</code>,
    * each bound of the result is the sum of the corresponding bounds of the
    * two inputs. The roundtrip time is likewise the sum of the two roundtrip
    * times.
    * </p>
    *
    * @param base the discrepancy between the intermediary node and the
    *             remote node, as provided by the intermediary node
    * @param intermediary the discrepancy between this node and the
    *                     intermediary node
    *
    * @throws IllegalArgumentException if either argument is <code>null</code>
    */
   public TimestampDiscrepancy(TimestampDiscrepancy base, TimestampDiscrepancy intermediary)
   {
      if (base == null)
      {
         throw new IllegalArgumentException("Null base");
      }
      if (intermediary == null)
      {
         throw new IllegalArgumentException("Null intermediary");
      }

      this.minDiscrepancy = base.minDiscrepancy + intermediary.minDiscrepancy;
      this.maxDiscrepancy = base.maxDiscrepancy + intermediary.maxDiscrepancy;

      this.remoteTimestamp = base.remoteTimestamp;
      this.requestRoundtripTime = base.requestRoundtripTime + intermediary.requestRoundtripTime;
   }

   /**
    * Gets the timestamp that the remote node returned.
    *
    * @return the remote timestamp.
    */
   public long getRemoteTimestamp()
   {
      return remoteTimestamp;
   }

   /**
    * Minimum offset that would be applied to a local timestamp to obtain
    * the timestamp on the remote system of a simultaneously occurring event.
    *
    * @return the minimum discrepancy
    */
   public long getMinDiscrepancy()
   {
      return minDiscrepancy;
   }

   /**
    * Maximum offset that would be applied to a local timestamp to obtain
    * the timestamp on the remote system of a simultaneously occurring event.
    *
    * @return the maximum discrepancy
    */
   public long getMaxDiscrepancy()
   {
      return maxDiscrepancy;
   }

   /**
    * Gets the higher of the absolute value of {@link #getMinDiscrepancy()}
    * or the absolute value of {@link #getMaxDiscrepancy()}.
    *
    * @return the largest possible magnitude of the discrepancy
    */
   public long getAbsoluteMaxDiscrepancy()
   {
      return Math.max(Math.abs(minDiscrepancy), Math.abs(maxDiscrepancy));
   }

   /**
    * Gets the difference between {@link #getMinDiscrepancy()} and
    * {@link #getMaxDiscrepancy()}.
    *
    * @return the width of the interval known to contain the true discrepancy;
    *         never negative
    */
   public long getDiscrepancyRange()
   {
      return maxDiscrepancy - minDiscrepancy;
   }

   /**
    * Gets a rough estimate of the timestamp discrepancy between the systems.
    *
    * @return the average between {@link #getMinDiscrepancy()} and
    *         {@link #getMaxDiscrepancy()}, truncated toward zero
    */
   public long getEstimatedDiscrepancy()
   {
      return (maxDiscrepancy + minDiscrepancy) / 2;
   }

   /**
    * Gets the number of ms it took for the remote request that returned
    * {@link #getRemoteTimestamp() the remote timestamp}. The longer the
    * request took to execute, the less accurate the timestamp discrepancy.
    *
    * @return number of ms it took to execute the timestamp request
    */
   public long getRequestRoundtripTime()
   {
      return requestRoundtripTime;
   }

   /**
    * Gets the minimum value for a local timestamp that would correspond
    * to the remote timestamp.
    *
    * @param remoteTimestamp the remote timestamp
    * @return the earliest equivalent local timestamp
    */
   public long getMinLocalTimestamp(long remoteTimestamp)
   {
      // local = remote - discrepancy; smallest when discrepancy is largest
      return remoteTimestamp - maxDiscrepancy;
   }

   /**
    * Gets the maximum value for a local timestamp that would correspond
    * to the remote timestamp.
    *
    * @param remoteTimestamp the remote timestamp
    * @return the latest equivalent local timestamp
    */
   public long getMaxLocalTimestamp(long remoteTimestamp)
   {
      // local = remote - discrepancy; largest when discrepancy is smallest
      return remoteTimestamp - minDiscrepancy;
   }

   /**
    * Gets the minimum value for a remote timestamp that would correspond
    * to the local timestamp.
    *
    * @param localTimestamp the local timestamp
    * @return the earliest equivalent remote timestamp
    */
   public long getMinRemoteTimestamp(long localTimestamp)
   {
      return localTimestamp + minDiscrepancy;
   }

   /**
    * Gets the maximum value for a remote timestamp that would correspond
    * to the local timestamp.
    *
    * @param localTimestamp the local timestamp
    * @return the latest equivalent remote timestamp
    */
   public long getMaxRemoteTimestamp(long localTimestamp)
   {
      return localTimestamp + maxDiscrepancy;
   }
}
Property changes on: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancy.java
___________________________________________________________________
Name: svn:keywords
+
Added: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyObserver.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyObserver.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyObserver.java 2009-03-19 21:46:14 UTC (rev 86131)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.timestamp;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+
+/**
+ * An object that observes changes in a {@link TimestampDiscrepancyService}.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public interface TimestampDiscrepancyObserver
+{
+ /**
+ * Notification from {@link TimestampDiscrepancyService} when it has
+ * changed the {@link TimestampDiscrepancy} associated with a particular
+ * node.
+ *
+ * @param node the node
+ * @param discrepancy the new discrepancy
+ */
+ void timestampDiscrepancyChanged(ClusterNode node, TimestampDiscrepancy discrepancy);
+
+ /**
+ * Callback allowing the observer to veto the removal by the
+ * {@link TimestampDiscrepancyService} of discrepancy data for a
+ * node that is no longer active in the cluster. Allows the observer to
+ * request that data for historically relevant nodes be retained.
+ *
+ * @param dead the node
+ * @param lastChecked the time (in ms since the epoch) the caller was
+ * last able to obtain timestamp information from the
+ * caller
+ *
+ * @return <code>true</code> if the data can be removed, <code>false</code>
+ * if it must be retained.
+ */
+ boolean canRemoveDeadEntry(ClusterNode dead, long lastChecked);
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyObserver.java
___________________________________________________________________
Name: svn:keywords
+
Added: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyService.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyService.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyService.java 2009-03-19 21:46:14 UTC (rev 86131)
@@ -0,0 +1,1089 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.timestamp;
+
+import java.io.Serializable;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.log4j.Logger;
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
+import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
+import org.jboss.kernel.spi.dependency.KernelControllerContext;
+import org.jboss.kernel.spi.dependency.KernelControllerContextAware;
+import org.jboss.util.threadpool.ThreadPool;
+
+/**
+ * Service that tracks {@link TimestampDiscrepancy} information for current
+ * and past members of the cluster.
+ * <p>
+ * Discrepancy information is not persisted, so no knowledge of past members
+ * is retained across a cluster restart.
+ * </p>
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public class TimestampDiscrepancyService
+ implements KernelControllerContextAware
+{
+ private static final Logger log = Logger.getLogger(TimestampDiscrepancyService.class);
+
+ private static final Class<?>[] PUSH_DISCREPANCY_MAP_TYPES = new Class[]{ RemoteDiscrepancies.class };
+ private static final Class<?>[] NULL_TYPES = new Class[0];
+ private static final Object[] NULL_ARGS = new Object[0];
+
+ private String serviceHAName;
+ private final RpcHandler rpcTarget = new RpcHandler();
+ private final DRMListener drmListener = new DRMListener();
+ private final TreeMap<Server, TimestampDiscrepancy> discrepancies = new TreeMap<Server, TimestampDiscrepancy>();
+ private final TreeSet<Server> liveServers = new TreeSet<Server>();
+ private final Map<String, ClusterNode> nodesByName = new ConcurrentHashMap<String, ClusterNode>();
+ private int maxDeadServers = 100;
+ private long minDeadServerTime = 7 * 24 * 60 * 60 * 1000; // 30 days
+ private HAPartition partition;
+ private long lastStatusCheck;
+ private long minStatusCheckFrequency = 30* 60 * 60 * 1000; // 20 mins
+ private volatile boolean statusCheckRequired = true;
+ private long lastPurge;
+ private long minPurgeFrequency = 60 * 60 * 1000; // one hour
+ private final List<TimestampDiscrepancyObserver> observers = new CopyOnWriteArrayList<TimestampDiscrepancyObserver>();
+ private boolean coordinator;
+ private ThreadPool threadPool;
+ private final Map<ClusterNode, Map<Server, TimestampDiscrepancy>> unresolvedRemoteDependencies = new HashMap<ClusterNode, Map<Server, TimestampDiscrepancy>>();
+ private boolean deadMembersKnown = false;
+
+ // ------------------------------------------------------------- Properties
+
+ public HAPartition getPartition()
+ {
+ return partition;
+ }
+
+ public void setPartition(HAPartition partition)
+ {
+ this.partition = partition;
+ }
+
+ /**
+ * Gets the name under which we register ourself with the HAPartition.
+ *
+ * @return the name
+ */
+ public String getServiceHAName()
+ {
+ return serviceHAName;
+ }
+
+ /**
+ * Sets the name under which we register ourself with the HAPartition.
+ *
+ * @param serviceHAName the name
+ */
+ public void setServiceHAName(String serviceHAName)
+ {
+ this.serviceHAName = serviceHAName;
+ }
+
+ /**
+ * Gets the maximum number of records for non-active servers we'll
+ * retain in our @{link {@link #getTimestampDiscrepancies(boolean) discrepancies map}
+ * after which we can attempt to remove non-active servers who we heard
+ * from less recently than {@link #getMinDeadServerTime()}.
+ * <p>
+ * An active server is one that is include in the most recent
+ * view propagated by the
+ * {@link HAPartition#getDistributedReplicantManager() HAPartition's DRM}.
+ * </p>
+ *
+ * @return the max number of servers
+ */
+ public int getMaxDeadServers()
+ {
+ return maxDeadServers;
+ }
+
+ public void setMaxDeadServers(int maxDeadServers)
+ {
+ this.maxDeadServers = maxDeadServers;
+ }
+
+ /**
+ * Gets the minimum period of time since last checking with the server
+ * that we'll retain a non-active server in our
+ * @{link {@link #getTimestampDiscrepancies(boolean) discrepancies map}.
+ * <p>
+ * An active server is one that is included in the most recent
+ * view propagated by the
+ * {@link HAPartition#getDistributedReplicantManager() HAPartition's DRM}.
+ * </p>
+ *
+ * @return the minimum period of time, in ms
+ *
+ * @see #getMaxDeadServers()
+ */
+ public long getMinDeadServerTime()
+ {
+ return minDeadServerTime;
+ }
+
+ public void setMinDeadServerTime(long minDeadServerTime)
+ {
+ this.minDeadServerTime = minDeadServerTime;
+ }
+
+ /**
+ * Gets the minimum period between periodic status checks. A status check
+ * is a request to the cluster for each server's local timestamp, used
+ * to build the {@link TimestampDiscrepancy} for that server.
+ * <p>
+ * A status check can occur more frequently than this value if the service
+ * determines it is necessary, for example following a view change.
+ * </p>
+ * <p>
+ * The default value is 20 minutes.
+ * </p>
+ *
+ * @return the minimum frequency in ms
+ */
+ public long getMinStatusCheckFrequency()
+ {
+ return minStatusCheckFrequency;
+ }
+
+ public void setMinStatusCheckFrequency(long minStatusCheckFrequency)
+ {
+ this.minStatusCheckFrequency = minStatusCheckFrequency;
+ }
+
+ /**
+ * Gets the minimum period between attempts to purge non-active members
+ * from the @{link {@link #getTimestampDiscrepancies(boolean) discrepancies map}.
+ * <p>
+ * Default is one hour
+ * </p>
+ *
+ * @return the minimum frequency in ms
+ *
+ * @see #getMaxDeadServers()
+ * @see #getMinDeadServerTime()
+ */
+ public long getMinPurgeFrequency()
+ {
+ return minPurgeFrequency;
+ }
+
+ public void setMinPurgeFrequency(long minPurgeFrequency)
+ {
+ this.minPurgeFrequency = minPurgeFrequency;
+ }
+
+ /**
+ * Gets the time of the last request to the cluster for timestamps.
+ *
+ * @return the time of the last request, in ms since the epoch
+ */
+ public long getLastStatusCheck()
+ {
+ return lastStatusCheck;
+ }
+
+ /**
+ * Gets whether an event has occurred (e.g. a view change) that requires
+ * a status check.
+ *
+ * @return <code>true</code> if a check is required
+ */
+ public boolean isStatusCheckRequired()
+ {
+ return statusCheckRequired;
+ }
+
+ /**
+ * Gets the time of the last attempt to purge non-active members from
+ * the @{link {@link #getTimestampDiscrepancies(boolean) discrepancies map}.
+ *
+ * @return the time of the last purge, in ms since the epoch
+ */
+ public long getLastPurge()
+ {
+ return lastPurge;
+ }
+
+ /**
+ * Injects a thread pool for use in dispatching asynchronous tasks. If
+ * no thread pool is injected, threads will be spawned to handle such
+ * tasks.
+ * <p>
+ * Asynchronous tasks are generally associated with view changes.
+ * </p>
+ *
+ * @param threadPool the thread pool
+ */
+ public void setThreadPool(ThreadPool threadPool)
+ {
+ this.threadPool = threadPool;
+ }
+
+ // ----------------------------------------------------------------- Public
+
+ /**
+ * Gets the map of TimestampDiscrepancy data tracked by this service.
+ *
+ * @param allowStatusCheck is calling into the cluster to update the
+ * discrepancies map before returning allowed?
+ *
+ * @return the map. Will not return <code>null</code>
+ */
+ public Map<ClusterNode, TimestampDiscrepancy> getTimestampDiscrepancies(boolean allowStatusCheck)
+ {
+ if (allowStatusCheck)
+ {
+ statusCheck();
+ }
+
+ purgeDeadEntries();
+
+ synchronized (discrepancies)
+ {
+ HashMap<ClusterNode, TimestampDiscrepancy> result = new HashMap<ClusterNode, TimestampDiscrepancy>();
+ for (Map.Entry<Server, TimestampDiscrepancy> entry : discrepancies.entrySet())
+ {
+ result.put(entry.getKey().getNode(), entry.getValue());
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Gets the TimestampDiscrepancy data associated with a particular node.
+ *
+ * @param node the node
+ * @param allowStatusCheck is calling into the cluster to update the
+ * discrepancies map before returning allowed?
+ *
+ * @return the discrepancy data. Will return <code>null</code> if no data
+ * for <code>node</code> is stored.
+ */
+ public TimestampDiscrepancy getTimestampDiscrepancy(ClusterNode node, boolean allowStatusCheck)
+ {
+ if (allowStatusCheck)
+ {
+ statusCheck();
+ }
+
+ purgeDeadEntries();
+
+ synchronized (discrepancies)
+ {
+ return discrepancies.get(new Server(node));
+ }
+ }
+
+ /**
+ * Gets the TimestampDiscrepancy data associated with a particular node.
+ *
+ * @param node the name of the node
+ * @param allowStatusCheck is calling into the cluster to update the
+ * discrepancies map before returning allowed?
+ *
+ * @return the discrepancy data. Will return <code>null</code> if no data
+ * for <code>node</code> is stored.
+ */
+ public TimestampDiscrepancy getTimestampDiscrepancy(String nodeName, boolean allowStatusCheck)
+ {
+ ClusterNode node = nodesByName.get(nodeName);
+ return node == null ? null : getTimestampDiscrepancy(node, allowStatusCheck);
+ }
+
+ /**
+ * Gets whether the particular node is one of the servers this
+ * service regards as active (i.e. part of the cluster topology for
+ * the service).
+ *
+ * @param node the node
+ * @return <code>true</code> if the node is active, false otherwise
+ */
+ public boolean isServerActive(ClusterNode node)
+ {
+ synchronized (liveServers)
+ {
+ return liveServers.contains(new Server(node));
+ }
+ }
+
+ /**
+ * Bring the service into active operation.
+ *
+ * @throws Exception
+ */
+ public void start() throws Exception
+ {
+ partition.registerRPCHandler(getServiceHAName(), rpcTarget);
+
+ DistributedReplicantManager drm = partition.getDistributedReplicantManager();
+ drm.add(getServiceHAName(), partition.getClusterNode());
+ coordinator = drm.isMasterReplica(getServiceHAName());
+ drm.registerListener(getServiceHAName(), drmListener);
+
+ statusCheck();
+ }
+
+ /**
+ * Remove the service from active operation.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception
+ {
+ DistributedReplicantManager drm = partition.getDistributedReplicantManager();
+ drm.unregisterListener(getServiceHAName(), drmListener);
+ drm.remove(getServiceHAName());
+
+ coordinator = false;
+
+ partition.unregisterRPCHandler(getServiceHAName(), rpcTarget);
+
+ synchronized (liveServers)
+ {
+ liveServers.clear();
+ }
+
+ synchronized (unresolvedRemoteDependencies)
+ {
+ unresolvedRemoteDependencies.clear();
+ }
+ }
+
+ /**
+ * Register a {@link TimestampDiscrepancyObserver} with this service.
+ *
+ * @param observer the observer
+ */
+ public void registerObserver(TimestampDiscrepancyObserver observer)
+ {
+ if (observer != null)
+ {
+ observers.add(observer);
+ }
+ }
+
+ /**
+ * Unregister a {@link TimestampDiscrepancyObserver} with this service.
+ *
+ * @param observer the observer
+ */
+ public void unregisterObserver(TimestampDiscrepancyObserver observer)
+ {
+ if (observer != null)
+ {
+ observers.remove(observer);
+ }
+ }
+
+ // -------------------------------------------- KernelControllerContextAware
+
+ /**
+ * Registers the context name as the {@link #setServiceHAName(String) serviceHAName}
+ * if it is not already set.
+ *
+ * {@inheritDoc}
+ */
+ public void setKernelControllerContext(KernelControllerContext context) throws Exception
+ {
+ if (context != null && serviceHAName == null)
+ {
+ setServiceHAName(context.getName().toString());
+ }
+ }
+
+ /**
+ * This implementation is a no-op.
+ *
+ * {@inheritDoc}
+ */
+ public void unsetKernelControllerContext(KernelControllerContext context) throws Exception
+ {
+ // no-op
+ }
+
+// // --------------------------------------------------- HAMembershipListener
+//
+// @SuppressWarnings("unchecked")
+// public synchronized void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers,
+// Vector originatingGroups)
+// {
+// boolean wasCoordinator = coordinator;
+//
+// membershipChanged(deadMembers, newMembers, allMembers);
+//
+// if (wasCoordinator && !coordinator)
+// {
+// // There's been a merge and we are no longer coordinator. Asynchronously
+// // tell the rest of the cluster about our knowledge of timestamps
+// Runnable r = getDiscrepancyPushTask();
+// executeRunnable(r, getServiceHAName() + "-DiscrepancyMapPusher");
+// }
+// else if (coordinator)
+// {
+// // Other nodes may depend on us having timestamp knowledge, so be
+// // aggressive about getting it -- don't wait for a request.
+// // We also need to tell whoever merged with us about our
+// // knowledge of timestamps
+// final Runnable push = getDiscrepancyPushTask();
+// Runnable r = new Runnable()
+// {
+// public void run()
+// {
+// statusCheck();
+// push.run();
+// }
+// };
+//
+// executeRunnable(r, getServiceHAName() + "-AsyncStatusCheck");
+// }
+// }
+//
+// @SuppressWarnings("unchecked")
+// public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+// {
+// synchronized (liveServers)
+// {
+// if (deadMembers != null)
+// {
+// for (Object dead : deadMembers)
+// {
+// if (dead instanceof ClusterNode)
+// {
+// liveServers.remove(new Server((ClusterNode) dead));
+// }
+// }
+// }
+// }
+//
+// if (newMembers != null && newMembers.size() > 0)
+// {
+// this.statusCheckRequired = true;
+// }
+//
+// coordinator = this.partition.getClusterNode().equals(allMembers.get(0));
+// }
+
+ // ----------------------------------------------------------------- Private
+
+ private synchronized void statusCheck()
+ {
+ if (statusCheckRequired || (System.currentTimeMillis() - lastStatusCheck > minStatusCheckFrequency))
+ {
+ try
+ {
+ long requestSent = System.currentTimeMillis();
+ @SuppressWarnings("unchecked")
+ List rsps = partition.callMethodOnCluster(getServiceHAName(), "getLocalTimestamp", NULL_ARGS, NULL_TYPES, true);
+ long responseReceived = System.currentTimeMillis();
+ long mcastTime = responseReceived - requestSent;
+
+ Map<ClusterNode, TimestampDiscrepancy> rspBySender = new HashMap<ClusterNode, TimestampDiscrepancy>();
+ if (rsps != null)
+ {
+ for (Object rsp : rsps)
+ {
+ if (rsp instanceof TimestampResponse)
+ {
+ TimestampResponse tr = (TimestampResponse) rsp;
+ rspBySender.put(tr.getResponder(),
+ new TimestampDiscrepancy(tr.getTimestamp(), requestSent, responseReceived));
+ }
+ else if (rsp != null)
+ {
+ log.warn("Unknown status check response " + rsp);
+ }
+ }
+ }
+
+ if (mcastTime > 250)
+ {
+ // Multicasting the RPC introduced a high possible error;
+ // see if multiple unicast is better
+ List<ClusterNode> nodes = partition.getDistributedReplicantManager().lookupReplicantsNodes(getServiceHAName());
+ for (ClusterNode node : nodes)
+ {
+ if (node.equals(this.partition.getClusterNode()))
+ {
+ continue;
+ }
+
+ try
+ {
+ long singleRequestSent = System.currentTimeMillis();
+ Object rsp = partition.callMethodOnNode(getServiceHAName(), "getLocalTimestamp", NULL_ARGS, NULL_TYPES, mcastTime, node);
+ long singleResponseReceived = System.currentTimeMillis();
+ long elapsed = singleResponseReceived - singleRequestSent;
+ if (elapsed < mcastTime) // better result than multicast
+ {
+ if (rsp instanceof TimestampResponse)
+ {
+ TimestampResponse tr = (TimestampResponse) rsp;
+ rspBySender.put(tr.getResponder(),
+ new TimestampDiscrepancy(tr.getTimestamp(), singleRequestSent, singleResponseReceived));
+ }
+ else if (rsp != null)
+ {
+ log.warn("Unknown status check response " + rsp);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof Error)
+ {
+ throw (Error) e;
+ }
+ log.error("Caught exception requesting timestamp from node " + node, e);
+ }
+ }
+ }
+
+ synchronized (discrepancies)
+ {
+ synchronized (liveServers)
+ {
+ for (Map.Entry<ClusterNode, TimestampDiscrepancy> entry : rspBySender.entrySet())
+ {
+ Server s = new Server(entry.getKey());
+ TimestampDiscrepancy latest = entry.getValue();
+ TimestampDiscrepancy existing = discrepancies.get(s);
+ if (existing == null
+ || latest.getDiscrepancyRange() <= existing.getDiscrepancyRange()
+ || liveServers.contains(s) == false)
+ {
+ updateTimestampDiscrepancy(s, latest, true);
+ }
+ else
+ {
+ // We already have an entry for this live server with a
+ // narrower range that we'd prefer to keep
+ // If the new entry doesn't fit within the parameters
+ // of the old, we have to replace the old
+ if (existing.getMinDiscrepancy() < latest.getMinDiscrepancy()
+ || existing.getMaxDiscrepancy() > latest.getMaxDiscrepancy())
+ {
+ updateTimestampDiscrepancy(s, latest, true);
+ }
+ else
+ {
+ // Re-store existing, but with the new key
+ updateTimestampDiscrepancy(s, existing, true);
+ }
+ }
+ }
+ }
+ }
+
+ statusCheckRequired = false;
+ lastStatusCheck = System.currentTimeMillis();
+ }
+ catch (Exception e)
+ {
+ log.error("Caught exception in status check", e);
+ }
+ }
+
+ getDeadMembersFromCoordinator();
+ }
+
+ private void getDeadMembersFromCoordinator()
+ {
+ if (!deadMembersKnown)
+ {
+ try
+ {
+ DistributedReplicantManager drm = partition.getDistributedReplicantManager();
+ List<ClusterNode> nodes = drm.lookupReplicantsNodes(getServiceHAName());
+ ClusterNode coord = (nodes != null && nodes.size() > 0 ? nodes.get(0) : null);
+ if (coord != null && coord.equals(partition.getClusterNode()) == false)
+ {
+ Object rsp = partition.callMethodOnNode(getServiceHAName(), "getDiscrepancies", NULL_ARGS, NULL_TYPES, 60000, coord);
+ if (rsp instanceof RemoteDiscrepancies)
+ {
+ handleRemoteDiscrepancies((RemoteDiscrepancies) rsp);
+ deadMembersKnown = true;
+ }
+ else
+ {
+ log.error("No valid response from coordinator: " + rsp);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof Error)
+ {
+ throw (Error) e;
+ }
+ log.error("Caught exception pulling dead member records from coordinator", e);
+ }
+ }
+ }
+
+   /**
+    * Notification from the DRMListener of a service topology change.
+    * <p>
+    * Brings liveServers in line with the new replicant list and flags a
+    * status check if any member was added. It then re-evaluates whether this
+    * node is the master replica for the service. If we just lost that role,
+    * our discrepancy map is pushed to the cluster asynchronously. If we hold
+    * it, a status check followed by a push runs asynchronously.
+    *
+    * @param newReplicants the nodes currently providing this service; null is
+    *                      treated as an empty list
+    * @param merge         whether the change came from a merge (not used here)
+    */
+   private void replicantsChanged(List<ClusterNode> newReplicants, boolean merge)
+   {
+      boolean wasCoordinator = coordinator;
+
+      Set<Server> newServers = new HashSet<Server>();
+      if (newReplicants != null)
+      {
+         for (ClusterNode replicant : newReplicants)
+         {
+            newServers.add(new Server(replicant));
+         }
+      }
+
+      boolean hasAdds = false;
+
+      synchronized (liveServers)
+      {
+         // liveServers is sorted by check time, but Server equality is by node.
+         // contains() on the sorted set with a freshly-timestamped probe can
+         // miss an existing entry for the same node and let a duplicate in, so
+         // test membership against a hash-based (equals/hashCode) copy.
+         Set<Server> alreadyLive = new HashSet<Server>(liveServers);
+         for (Server s : newServers)
+         {
+            if (alreadyLive.contains(s) == false)
+            {
+               liveServers.add(s);
+               hasAdds = true;
+            }
+         }
+
+         // Drop members that are no longer replicants. Always scan: a size
+         // comparison is only valid if every add above actually inserted,
+         // which a sorted set may refuse.
+         for (Iterator<Server> it = liveServers.iterator(); it.hasNext(); )
+         {
+            if (newServers.contains(it.next()) == false)
+            {
+               it.remove();
+            }
+         }
+      }
+
+      if (hasAdds)
+      {
+         statusCheckRequired = true;
+      }
+
+      DistributedReplicantManager drm = partition.getDistributedReplicantManager();
+      this.coordinator = drm.isMasterReplica(getServiceHAName());
+
+      if (wasCoordinator && !coordinator)
+      {
+         // There's been a merge and we are no longer coordinator. Asynchronously
+         // tell the rest of the cluster about our knowledge of timestamps
+         Runnable r = getDiscrepancyPushTask();
+         executeRunnable(r, getServiceHAName() + "-DiscrepancyMapPusher");
+      }
+      else if (coordinator)
+      {
+         // Other nodes may depend on us having timestamp knowledge, so be
+         // aggressive about getting it -- don't wait for a request.
+         // We also need to tell whoever merged with us about our
+         // knowledge of timestamps
+         final Runnable push = getDiscrepancyPushTask();
+         Runnable r = new Runnable()
+         {
+            public void run()
+            {
+               statusCheck();
+               push.run();
+            }
+         };
+
+         executeRunnable(r, getServiceHAName() + "-AsyncStatusCheck");
+      }
+   }
+
+   /**
+    * Runs a task asynchronously. When a thread pool is configured the task is
+    * handed to it, and threadName is not used. Otherwise a new daemon thread
+    * with that name is started. Its context classloader is the classloader
+    * that defined the task's class, set inside a privileged block.
+    */
+   private void executeRunnable(final Runnable r, String threadName)
+   {
+      if (threadPool == null)
+      {
+         final Thread worker = new Thread(r, threadName);
+         worker.setDaemon(true);
+         PrivilegedAction<Object> assignLoader = new PrivilegedAction<Object>()
+         {
+            public Object run()
+            {
+               worker.setContextClassLoader(r.getClass().getClassLoader());
+               return null;
+            }
+         };
+         AccessController.doPrivileged(assignLoader);
+         worker.start();
+         return;
+      }
+
+      threadPool.run(r);
+   }
+
+   /**
+    * Trims discrepancy entries for dead servers once more than maxDeadServers
+    * of them have accumulated. Runs at most once per minPurgeFrequency ms.
+    * <p>
+    * "Dead" servers are the keys of discrepancies that sort before the oldest
+    * entry of liveServers, or every key if no server is live. Entries are
+    * examined in sort order. One is removed when it was checked more than
+    * minDeadServerTime ms ago and no TimestampDiscrepancyObserver vetoes it
+    * via canRemoveDeadEntry. This continues until the excess is gone or an
+    * entry that is too recent is reached.
+    */
+   private synchronized void purgeDeadEntries()
+   {
+      if ((System.currentTimeMillis() - lastPurge > minPurgeFrequency))
+      {
+         synchronized (discrepancies)
+         {
+            synchronized (liveServers)
+            {
+               lastPurge = System.currentTimeMillis();
+
+               Server oldestLive = liveServers.isEmpty()? null : liveServers.first();
+               Set<Server> deadServers = oldestLive == null ? discrepancies.keySet() : discrepancies.headMap(oldestLive).keySet();
+
+               int excess = deadServers.size() - maxDeadServers;
+               if (excess > 0)
+               {
+                  // Entries checked before this instant are old enough to purge
+                  long min = System.currentTimeMillis() - minDeadServerTime;
+                  Set<Server> toClean = new HashSet<Server>();
+                  for (Server server : deadServers)
+                  {
+                     if (excess <= 0 || server.getTimestampChecked() >= min)
+                     {
+                        // We've removed enough or the rest are newer than
+                        // the minimum, so stop checking
+                        break;
+                     }
+
+                     // A veto from any single observer keeps the entry
+                     boolean vetoed = false;
+                     for (TimestampDiscrepancyObserver observer : observers)
+                     {
+                        if (observer.canRemoveDeadEntry(server.getNode(), server.getTimestampChecked()) == false)
+                        {
+                           vetoed = true;
+                           break;
+                        }
+                     }
+
+                     if (!vetoed)
+                     {
+                        toClean.add(server);
+                        excess--;
+                     }
+                  }
+
+                  // Remove only after iterating: deadServers is a view backed by discrepancies
+                  for (Server toRemove : toClean)
+                  {
+                     discrepancies.remove(toRemove);
+                  }
+               }
+            }
+         }
+      }
+   }
+
+   /**
+    * Records the discrepancy for a server and indexes its node by name. If
+    * live is true, the server is also added to liveServers. Any remote
+    * discrepancy map cached while waiting for this node's discrepancy is then
+    * converted into local time, and every observer is told about the change.
+    */
+   private void updateTimestampDiscrepancy(Server server, TimestampDiscrepancy discrepancy, boolean live)
+   {
+      ClusterNode node = server.getNode();
+
+      discrepancies.put(server, discrepancy);
+      nodesByName.put(node.getName(), node);
+      if (live)
+      {
+         liveServers.add(server);
+      }
+
+      synchronized (unresolvedRemoteDependencies)
+      {
+         Map<Server, TimestampDiscrepancy> pending = unresolvedRemoteDependencies.remove(node);
+         if (pending != null)
+         {
+            // Now that we know our discrepancy with this node, its cached map can be resolved
+            convertRemoteDiscrepanciesToLocalTime(pending, discrepancy);
+         }
+      }
+
+      for (TimestampDiscrepancyObserver listener : observers)
+      {
+         listener.timestampDiscrepancyChanged(node, discrepancy);
+      }
+   }
+
+   /**
+    * Handles a set of discrepancies provided by a remote node, either from a
+    * pushDiscrepancyMap call or from a pull from the coordinator.
+    * <p>
+    * If we already know our discrepancy with the sender, the remote entries
+    * are converted to local time and merged at once. Otherwise they are cached
+    * in unresolvedRemoteDependencies, replacing any set cached earlier for the
+    * same sender, until updateTimestampDiscrepancy records the sender.
+    */
+   private void handleRemoteDiscrepancies(RemoteDiscrepancies remote)
+   {
+      ClusterNode sender = remote.getSender();
+      if (sender == null)
+      {
+         throw new IllegalArgumentException("Null sender");
+      }
+      Map<Server, TimestampDiscrepancy> remoteDiscrepancies = remote.getDiscrepancies();
+
+      synchronized (discrepancies)
+      {
+         // discrepancies is a sorted map ordered by check time, while Server
+         // equality is by node. A probe like new Server(sender) carries the
+         // current time, so a sorted-map get() can walk past the sender's real
+         // entry. Match on the node instead. The map is expected to be small,
+         // roughly one entry per known member.
+         TimestampDiscrepancy senderDiscrepancy = null;
+         for (Map.Entry<Server, TimestampDiscrepancy> entry : discrepancies.entrySet())
+         {
+            if (sender.equals(entry.getKey().getNode()))
+            {
+               senderDiscrepancy = entry.getValue();
+               break;
+            }
+         }
+
+         if (senderDiscrepancy == null)
+         {
+            // We don't know how to convert these to local time. Just cache
+            // them until we can.
+            synchronized (unresolvedRemoteDependencies)
+            {
+               unresolvedRemoteDependencies.put(sender, remoteDiscrepancies);
+            }
+         }
+         else
+         {
+            convertRemoteDiscrepanciesToLocalTime(remoteDiscrepancies, senderDiscrepancy);
+         }
+      }
+   }
+
+   /**
+    * Takes a set of discrepancies provided by another node and adds any
+    * entries for nodes we have no record of to our discrepancy set. Each
+    * added key and value is first adjusted by senderDiscrepancy, our
+    * discrepancy with the node that provided the set. Entries for nodes we
+    * already know are left untouched.
+    * <p>
+    * NOTE(review): this mutates discrepancies and nodesByName without taking
+    * a lock itself. handleRemoteDiscrepancies calls it while holding the
+    * discrepancies monitor; confirm the updateTimestampDiscrepancy path does too.
+    */
+   private void convertRemoteDiscrepanciesToLocalTime(Map<Server, TimestampDiscrepancy> remoteDiscrepancies,
+                                                      TimestampDiscrepancy senderDiscrepancy)
+   {
+      // discrepancies is sorted by check time, so get(remoteKey) with the remote
+      // node's timestamp is not a reliable "do we know this node?" test.
+      // Compare by node (Server equality) using a hash set instead.
+      Set<ClusterNode> knownNodes = new HashSet<ClusterNode>();
+      for (Server known : discrepancies.keySet())
+      {
+         knownNodes.add(known.getNode());
+      }
+
+      for (Map.Entry<Server, TimestampDiscrepancy> entry : remoteDiscrepancies.entrySet())
+      {
+         Server key = entry.getKey();
+         ClusterNode node = key.getNode();
+         // add() returns true only for a node we didn't know about
+         if (knownNodes.add(node))
+         {
+            discrepancies.put(new Server(key, senderDiscrepancy),
+                              new TimestampDiscrepancy(entry.getValue(), senderDiscrepancy));
+            nodesByName.put(node.getName(), node);
+         }
+      }
+   }
+
+   /**
+    * Builds a task that sends a copy of our discrepancy map to the rest of the
+    * cluster through the "pushDiscrepancyMap" RPC, excluding this node. The
+    * copy is taken when this method is called, not when the task runs.
+    * Failures in the task are logged.
+    */
+   private Runnable getDiscrepancyPushTask()
+   {
+      Map<Server, TimestampDiscrepancy> snapshot;
+      synchronized (discrepancies)
+      {
+         snapshot = new HashMap<Server, TimestampDiscrepancy>(discrepancies);
+      }
+
+      final HAPartition target = this.partition;
+      final RemoteDiscrepancies payload = new RemoteDiscrepancies(target.getClusterNode(), snapshot);
+
+      return new Runnable()
+      {
+         public void run()
+         {
+            Object[] rpcArgs = new Object[] { payload };
+            try
+            {
+               target.callMethodOnCluster(getServiceHAName(), "pushDiscrepancyMap",
+                                          rpcArgs, PUSH_DISCREPANCY_MAP_TYPES, true);
+            }
+            catch (Exception e)
+            {
+               log.error("Exception pushing Discrepancy map to cluster", e);
+            }
+         }
+      };
+   }
+
+   /**
+    * Object we register with the HAPartition. Its public methods are the RPC
+    * targets that other nodes invoke by name ("getDiscrepancies",
+    * "getLocalTimestamp", "pushDiscrepancyMap").
+    */
+   public class RpcHandler
+   {
+      /** Returns a snapshot copy of this node's discrepancy map. */
+      public Map<Server, TimestampDiscrepancy> getDiscrepancies()
+      {
+         synchronized (discrepancies)
+         {
+            return new HashMap<Server, TimestampDiscrepancy>(discrepancies);
+         }
+      }
+
+      /** Returns this node together with its current local time. */
+      public TimestampResponse getLocalTimestamp()
+      {
+         ClusterNode self = partition.getClusterNode();
+         return new TimestampResponse(self);
+      }
+
+      /** Accepts a discrepancy map pushed by another node. */
+      public void pushDiscrepancyMap(RemoteDiscrepancies remote)
+      {
+         handleRemoteDiscrepancies(remote);
+      }
+   }
+
+   /**
+    * Object we register with the DRM. Forwards every replicant-change
+    * notification it receives to the enclosing service.
+    */
+   private class DRMListener implements ReplicantListener
+   {
+      // The interface supplies a raw List. Its elements are presumably the
+      // ClusterNodes providing the service, so the unchecked conversion is
+      // suppressed here rather than on the whole class.
+      @SuppressWarnings("unchecked")
+      public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId, boolean merge)
+      {
+         TimestampDiscrepancyService.this.replicantsChanged(newReplicants, merge);
+      }
+   }
+
+
+ public static class Server implements Serializable, Comparable<Server>
+ {
+ /** The serialVersionUID */
+ private static final long serialVersionUID = 4477441836405966100L;
+
+ private final ClusterNode node;
+ private final long timestampChecked;
+
+ private Server(ClusterNode node)
+ {
+ if (node == null)
+ {
+ throw new IllegalArgumentException("Null node");
+ }
+ this.node = node;
+ this.timestampChecked = System.currentTimeMillis();
+ }
+
+ private Server(Server base, TimestampDiscrepancy offset)
+ {
+ this.node = base.node;
+ this.timestampChecked = offset.getMaxLocalTimestamp(base.timestampChecked);
+ }
+
+ public ClusterNode getNode()
+ {
+ return node;
+ }
+
+ public long getTimestampChecked()
+ {
+ return timestampChecked;
+ }
+
+ public int compareTo(Server o)
+ {
+ if (this.node.equals(o.node))
+ return 0;
+ return (int) (this.timestampChecked - o.timestampChecked);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+
+ if (obj instanceof Server)
+ {
+ return this.node.equals(((Server) obj).node);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return node.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(getClass().getName());
+ sb.append("{node=");
+ sb.append(node);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ }
+
+   /**
+    * Reply to a getLocalTimestamp RPC. It carries the responding node and the
+    * responder's System.currentTimeMillis() value, captured when the response
+    * was constructed.
+    */
+   public static class TimestampResponse implements Serializable
+   {
+      /** The serialVersionUID */
+      private static final long serialVersionUID = -9171752596968923020L;
+
+      private final ClusterNode responder;
+      private final long timestamp;
+
+      private TimestampResponse(ClusterNode responder)
+      {
+         // Capture the clock before validating, as a field initializer would
+         this.timestamp = System.currentTimeMillis();
+         if (responder == null)
+         {
+            throw new IllegalArgumentException("Null responder");
+         }
+         this.responder = responder;
+      }
+
+      /** The node that produced this response. */
+      public ClusterNode getResponder()
+      {
+         return responder;
+      }
+
+      /** The responder's local time, in milliseconds, when the response was created. */
+      public long getTimestamp()
+      {
+         return timestamp;
+      }
+   }
+
+   /**
+    * Serializable pairing of a discrepancy map with the node that produced
+    * it. Used as the argument of the "pushDiscrepancyMap" RPC.
+    */
+   public static class RemoteDiscrepancies implements Serializable
+   {
+      /** The serialVersionUID */
+      private static final long serialVersionUID = -7394430305832099065L;
+
+      /** Node that produced the map. */
+      private final ClusterNode sender;
+      /** The sender's discrepancy map (not copied here). */
+      private final Map<Server, TimestampDiscrepancy> discrepancies;
+
+      private RemoteDiscrepancies(ClusterNode sender, Map<Server, TimestampDiscrepancy> discrepancies)
+      {
+         this.sender = checkNotNull(sender, "Null sender");
+         this.discrepancies = checkNotNull(discrepancies, "Null discrepancies");
+      }
+
+      /** Returns value, or throws IllegalArgumentException with message if it is null. */
+      private static <T> T checkNotNull(T value, String message)
+      {
+         if (value == null)
+         {
+            throw new IllegalArgumentException(message);
+         }
+         return value;
+      }
+
+      public ClusterNode getSender()
+      {
+         return sender;
+      }
+
+      public Map<Server, TimestampDiscrepancy> getDiscrepancies()
+      {
+         return discrepancies;
+      }
+
+   }
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/timestamp/TimestampDiscrepancyService.java
___________________________________________________________________
Name: svn:keywords
+
More information about the jboss-cvs-commits
mailing list