[jboss-cvs] JBossAS SVN: r65113 - trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 5 15:28:39 EDT 2007
Author: bstansberry at jboss.com
Date: 2007-09-05 15:28:39 -0400 (Wed, 05 Sep 2007)
New Revision: 65113
Modified:
trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/ClusteredSingleSignOn.java
trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/TreeCacheSSOClusterManager.java
Log:
[JBAS-4057] Clustered SSO entries not properly cleaned up if a server crashes
[JBAS-3720] Change structure of ClusteredSSO data in tree cache to avoid lock issues
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/ClusteredSingleSignOn.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/ClusteredSingleSignOn.java 2007-09-05 19:19:46 UTC (rev 65112)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/ClusteredSingleSignOn.java 2007-09-05 19:28:39 UTC (rev 65113)
@@ -16,13 +16,14 @@
package org.jboss.web.tomcat.service.sso;
-import org.jboss.web.tomcat.service.session.JBossManager;
-
import java.io.IOException;
import java.security.Principal;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.management.ObjectName;
import javax.servlet.ServletException;
@@ -33,13 +34,15 @@
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
import org.apache.catalina.Manager;
+import org.apache.catalina.Realm;
import org.apache.catalina.Session;
-import org.apache.catalina.Realm;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.authenticator.Constants;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.session.ManagerBase;
+import org.jboss.web.tomcat.service.deployers.TomcatDeployer;
+import org.jboss.web.tomcat.service.session.JBossManager;
/**
@@ -66,7 +69,11 @@
extends org.apache.catalina.authenticator.SingleSignOn
implements LifecycleListener
{
-
+ /** By default we process expired SSOs no more often than once per minute */
+ public static final int DEFAULT_PROCESS_EXPIRES_INTERVAL = 60;
+ /** By default we let SSOs without active sessions live for 30 mins */
+ public static final int DEFAULT_MAX_EMPTY_LIFE = 1800;
+
// Override the superclass value
static
{
@@ -94,9 +101,38 @@
* Only relevant if the SSOClusterManager implementation is
* TreeCacheSSOClusterManager.
*/
- private String treeCacheName = "jboss.cache:service=TomcatClusteringCache";
+ private String treeCacheName = TomcatDeployer.DEFAULT_CACHE_NAME;
+
+ /**
+ * Object name of the thread pool used by SSOClusterManager.
+ * Only relevant if the SSOClusterManager implementation is
+ * TreeCacheSSOClusterManager.
+ */
+ private String threadPoolName = "jboss.system:service=ThreadPool";
+
+ /** Currently started Managers that have associated as session with an SSO */
+ private Set activeManagers = Collections.synchronizedSet(new HashSet());
- private Set activeManagers = Collections.synchronizedSet(new HashSet());
+ /** Max number of ms an SSO with no active sessions will be usable by a request */
+ private int maxEmptyLife = DEFAULT_MAX_EMPTY_LIFE * 1000;
+
+ /**
+ * Minimum number of ms since the last processExpires() run
+ * before a new run is allowed.
+ */
+ private int processExpiresInterval = DEFAULT_PROCESS_EXPIRES_INTERVAL * 1000;
+
+ /** Timestamp of the last processExpires() run */
+ private long lastProcessExpires = System.currentTimeMillis();
+
+ /**
+ * Map<String, Long> containing the ids of SSOs with no active sessions
+ * and the time at which they entered that state
+ */
+ private Map emptySSOs = new ConcurrentHashMap();
+
+ /** Used for sync locking of processExpires runs */
+ private final Object mutex = new Object();
// ------------------------------------------------------------- Properties
@@ -215,8 +251,132 @@
((TreeCacheSSOClusterManager) ssoClusterManager).setCacheName(cacheName);
}
}
+
+ /**
+ * Object name of the thread pool used by SSOClusterManager.
+ * Only relevant if the SSOClusterManager implementation is
+ * TreeCacheSSOClusterManager.
+ */
+ public String getThreadPoolName()
+ {
+ return threadPoolName;
+ }
+ /**
+ * Sets the object name of the thread pool used by SSOClusterManager.
+ * Only relevant if the SSOClusterManager implementation is
+ * TreeCacheSSOClusterManager.
+ */
+ public void setThreadPoolName(String poolName)
+ throws Exception
+ {
+ this.threadPoolName = poolName;
+ if (ssoClusterManager != null
+ && ssoClusterManager instanceof TreeCacheSSOClusterManager)
+ {
+ ((TreeCacheSSOClusterManager) ssoClusterManager).setThreadPoolName(poolName);
+ }
+ }
+ /**
+ * Gets the max number of seconds an SSO with no active sessions will be
+ * usable by a request.
+ *
+ * @return a non-negative number
+ *
+ * @see #DEFAULT_MAX_EMPTY_LIFE *
+ * @see #setMaxEmptyLife()
+ */
+ public int getMaxEmptyLife()
+ {
+ return (maxEmptyLife / 1000);
+ }
+
+
+ /**
+ * Sets the maximum number of seconds an SSO with no active sessions will be
+ * usable by a request.
+ * <p>
+ * A positive value for this property allows a user to continue to use an SSO
+ * even after all the sessions associated with it have been expired. It does not
+ * keep an SSO alive if a session associated with it has been invalidated due to
+ * an <code>HttpSession.invalidate()</code> call.
+ * </p>
+ * <p>
+ * The primary purpose of this property is to avoid the situation where a server
+ * on which all of an SSO's sessions lives is shutdown, thus expiring all the
+ * sessions and causing the invalidation of the SSO. A positive value for this
+ * property would give the user an opportunity to fail over to another server
+ * and maintain the SSO.
+ * </p>
+ *
+ * @param maxEmptyLife a non-negative number
+ *
+ * @throws IllegalArgumentException if <code>maxEmptyLife < 0</code>
+ */
+ public void setMaxEmptyLife(int maxEmptyLife)
+ {
+ if (maxEmptyLife < 0)
+ throw new IllegalArgumentException("maxEmptyLife must be >= 0");
+
+ this.maxEmptyLife = maxEmptyLife * 1000;
+ }
+
+
+ /**
+ * Gets the minimum number of seconds since the start of the last check for overaged
+ * SSO's with no active sessions before a new run is allowed.
+ *
+ * @return a positive number
+ *
+ * @see #DEFAULT_PROCESS_EXPIRES_INTERVAL
+ * @see #setMaxEmptyLife()
+ * @see #setProcessExpiresInterval(int)
+ */
+ public int getProcessExpiresInterval()
+ {
+ return processExpiresInterval / 1000;
+ }
+
+ /**
+ * Sets the minimum number of seconds since the start of the last check for overaged
+ * SSO's with no active sessions before a new run is allowed. During this check,
+ * any such overaged SSOs will be invalidated.
+ * <p>
+ * Note that setting this value does not imply that a check will be performed
+ * every <code>processExpiresInterval</code> seconds, only that it will not
+ * be performed more often than that.
+ * </p>
+ *
+ * @param processExpiresInterval a non-negative number. <code>0</code> means
+ * the overage check can be performed whenever
+ * the container wishes to.
+ *
+ * @throws IllegalArgumentException if <code>processExpiresInterval < 1</code>
+ *
+ * @see #setMaxEmptyLife()
+ */
+ public void setProcessExpiresInterval(int processExpiresInterval)
+ {
+ if (processExpiresInterval < 0)
+ throw new IllegalArgumentException("processExpiresInterval must be >= 0");
+
+ this.processExpiresInterval = processExpiresInterval * 1000;
+ }
+
+
+ /**
+ * Gets the timestamp of the start of the last check for overaged
+ * SSO's with no active sessions.
+ *
+ * @see #setProcessExpiresInterval(int)
+ */
+ public long getLastProcessExpires()
+ {
+ return lastProcessExpires;
+ }
+
+
// ------------------------------------------------------ Lifecycle Methods
@@ -302,8 +462,8 @@
// Look up the single session id associated with this session (if any)
Session session = event.getSession();
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Process session destroyed on " + session);
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Process session destroyed on " + session);
String ssoId = null;
synchronized (reverse)
@@ -313,21 +473,35 @@
if (ssoId == null)
return;
- // Was the session destroyed as the result of a timeout or
- // the undeployment of the containing webapp?
- // If so, we'll just remove the expired session from the
- // SSO. If the session was logged out, we'll log out
- // of all sessions associated with the SSO.
- if (isSessionTimedOut(session) || isManagerStopped(session))
+ try
{
- removeSession(ssoId, session);
+ // Was the session destroyed as the result of a timeout or
+ // the undeployment of the containing webapp?
+ // If so, we'll just remove the expired session from the
+ // SSO. If the session was logged out, we'll log out
+ // of all sessions associated with the SSO.
+ if (isSessionTimedOut(session) || isManagerStopped(session))
+ {
+ removeSession(ssoId, session);
+
+ // Quite poor. We hijack the caller thread (the Tomcat background thread)
+ // to do our cleanup of expired sessions
+ processExpires();
+ }
+ else
+ {
+ // The session was logged out.
+ logout(ssoId);
+ }
}
- else
+ catch (Exception e)
{
- // The session was logged out.
- logout(ssoId);
+ // Don't propagate back to the webapp; we don't want to disrupt
+ // the session expiration process
+ getContainer().getLogger().error("Caught exception updating SSO " + ssoId +
+ " following destruction of session " +
+ session.getIdInternal(), e);
}
-
}
private boolean isSessionTimedOut(Session session)
@@ -336,7 +510,7 @@
&& (System.currentTimeMillis() - session.getLastAccessedTime() >=
session.getMaxInactiveInterval() * 1000);
}
-
+
private boolean isManagerStopped(Session session)
{
boolean stopped = false;
@@ -391,11 +565,8 @@
{
source.removeLifecycleListener(this);
- if (getContainer().getLogger().isDebugEnabled())
- {
- getContainer().getLogger().debug("ClusteredSSO: removed " +
- "stopped manager " + source.toString());
- }
+ getContainer().getLogger().debug("ClusteredSSO: removed stopped " +
+ "manager " + source.toString());
}
// TODO consider getting the sessions and removing any from our sso's
@@ -429,12 +600,12 @@
request.removeNote(Constants.REQ_SSOID_NOTE);
// Has a valid user already been authenticated?
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Process request for '" + request.getRequestURI() + "'");
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Process request for '" + request.getRequestURI() + "'");
if (request.getUserPrincipal() != null)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug(" Principal '" + request.getUserPrincipal().getName() +
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace(" Principal '" + request.getUserPrincipal().getName() +
"' has already been authenticated");
getNext().invoke(request, response);
return;
@@ -455,25 +626,26 @@
}
if (cookie == null)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug(" SSO cookie is not present");
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace(" SSO cookie is not present");
getNext().invoke(request, response);
return;
}
// Look up the cached Principal associated with this cookie value
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug(" Checking for cached principal for " + cookie.getValue());
+ String ssoId = cookie.getValue();
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace(" Checking for cached principal for " + ssoId);
JBossSingleSignOnEntry entry = getSingleSignOnEntry(cookie.getValue());
- if (entry != null)
+ if (entry != null && isValid(ssoId, entry))
{
Principal ssoPrinc = entry.getPrincipal();
// have to deal with the fact that the entry may not have an
// associated Principal. SSO entries retrieved via a lookup from a
// cluster will not have a Principal, as Principal is not Serializable
- if (getContainer().getLogger().isDebugEnabled())
+ if (getContainer().getLogger().isTraceEnabled())
{
- getContainer().getLogger().debug(" Found cached principal '" +
+ getContainer().getLogger().trace(" Found cached principal '" +
(ssoPrinc == null ? "NULL" : ssoPrinc.getName()) +
"' with auth type '" + entry.getAuthType() + "'");
}
@@ -488,8 +660,8 @@
}
else
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug(" No cached principal found, erasing SSO cookie");
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace(" No cached principal found, erasing SSO cookie");
cookie.setMaxAge(0);
response.addCookie(cookie);
}
@@ -514,8 +686,8 @@
*/
protected void associate(String ssoId, Session session)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Associate sso id " + ssoId + " with session " + session);
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Associate sso id " + ssoId + " with session " + session);
JBossSingleSignOnEntry sso = getSingleSignOnEntry(ssoId);
boolean added = false;
@@ -620,9 +792,14 @@
*/
protected void deregister(String ssoId)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Deregistering sso id '" + ssoId + "'");
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Deregistering sso id '" + ssoId + "'");
+
+ // It's possible we don't have the SSO locally but it's in
+ // the emptySSOs map; if so remove it
+ emptySSOs.remove(ssoId);
+
// Look up and remove the corresponding SingleSignOnEntry
JBossSingleSignOnEntry sso = null;
synchronized (cache)
@@ -787,8 +964,8 @@
*/
protected void removeSession(String ssoId, Session session)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Removing session " + session.toString() +
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Removing session " + session.toString() +
" from sso id " + ssoId);
// Get a reference to the SingleSignOn
@@ -809,13 +986,6 @@
{
reverse.remove(session);
}
-
- // If there are no sessions left in the SingleSignOnEntry,
- // deregister the entry.
- if (entry.getSessionCount() == 0)
- {
- deregister(ssoId);
- }
}
@@ -894,9 +1064,9 @@
void registerLocal(String ssoId, Principal principal, String authType,
String username, String password)
{
- if (getContainer().getLogger().isDebugEnabled())
+ if (getContainer().getLogger().isTraceEnabled())
{
- getContainer().getLogger().debug("Registering sso id '" + ssoId + "' for user '" +
+ getContainer().getLogger().trace("Registering sso id '" + ssoId + "' for user '" +
principal.getName() + "' with auth type '" + authType + "'");
}
@@ -932,8 +1102,8 @@
{
if (sso.getCanReauthenticate() == false)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Update sso id " + ssoId + " to auth type " + authType);
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Update sso id " + ssoId + " to auth type " + authType);
synchronized (sso)
{
@@ -943,8 +1113,8 @@
}
else if (sso.getPrincipal() == null && principal != null)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Update sso id " + ssoId + " with principal " +
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Update sso id " + ssoId + " with principal " +
principal.getName());
synchronized (sso)
@@ -967,8 +1137,8 @@
// Only update if the entry is missing information
if (sso != null && sso.getCanReauthenticate() == false)
{
- if (getContainer().getLogger().isDebugEnabled())
- getContainer().getLogger().debug("Update sso id " + ssoId + " to auth type " + authType);
+ if (getContainer().getLogger().isTraceEnabled())
+ getContainer().getLogger().trace("Update sso id " + ssoId + " to auth type " + authType);
synchronized (sso)
{
@@ -979,6 +1149,35 @@
}
}
+
+ /**
+ * Callback from the SSOManager when it detects an SSO without
+ * any active sessions across the cluster
+ */
+ protected void notifySSOEmpty(String ssoId)
+ {
+ Object obj = emptySSOs.put(ssoId, new Long(System.currentTimeMillis()));
+
+ if (obj == null && getContainer().getLogger().isTraceEnabled())
+ {
+ getContainer().getLogger().trace("Notified that SSO " + ssoId + " is empty");
+ }
+ }
+
+ /**
+ * Callback from the SSOManager when it detects an SSO that
+ * has active sessions across the cluster
+ */
+ protected void notifySSONotEmpty(String ssoId)
+ {
+ Object obj = emptySSOs.remove(ssoId);
+
+ if (obj != null && getContainer().getLogger().isTraceEnabled())
+ {
+ getContainer().getLogger().trace("Notified that SSO " + ssoId +
+ " is no longer empty");
+ }
+ }
// ------------------------------------------------------- Private Methods
@@ -1017,6 +1216,7 @@
if (mgr instanceof TreeCacheSSOClusterManager)
{
((TreeCacheSSOClusterManager) mgr).setCacheName(getTreeCacheName());
+ ((TreeCacheSSOClusterManager) mgr).setThreadPoolName(getThreadPoolName());
}
ssoClusterManager = mgr;
clusterManagerClass = className;
@@ -1034,5 +1234,66 @@
}
}
}
+
+
+ private void processExpires()
+ {
+ long now = 0L;
+ synchronized (mutex)
+ {
+ now = System.currentTimeMillis();
+
+ if (now - lastProcessExpires > processExpiresInterval)
+ {
+ lastProcessExpires = now;
+ }
+ else
+ {
+ return;
+ }
+ }
+
+ clearExpiredSSOs(now);
+ }
+
+ private synchronized void clearExpiredSSOs(long now)
+ {
+ for (Iterator iter = emptySSOs.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ if ( (now - ((Long) entry.getValue()).longValue()) > maxEmptyLife)
+ {
+ String ssoId = (String) entry.getKey();
+ if (getContainer().getLogger().isTraceEnabled())
+ {
+ getContainer().getLogger().trace("Invalidating expired SSO " + ssoId);
+ }
+ logout(ssoId);
+ }
+ }
+ }
+
+ private boolean isValid(String ssoId, JBossSingleSignOnEntry entry)
+ {
+ boolean valid = true;
+ if (entry.getSessionCount() == 0)
+ {
+ Long expired = (Long) emptySSOs.get(ssoId);
+ if (expired != null
+ && (System.currentTimeMillis() - expired.longValue()) > maxEmptyLife)
+ {
+ valid = false;
+
+ if (getContainer().getLogger().isTraceEnabled())
+ {
+ getContainer().getLogger().trace("Invalidating expired SSO " + ssoId);
+ }
+
+ logout(ssoId);
+ }
+ }
+
+ return valid;
+ }
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/TreeCacheSSOClusterManager.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/TreeCacheSSOClusterManager.java 2007-09-05 19:19:46 UTC (rev 65112)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/service/sso/TreeCacheSSOClusterManager.java 2007-09-05 19:28:39 UTC (rev 65113)
@@ -24,8 +24,13 @@
import java.io.Serializable;
import java.security.Principal;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.transaction.Status;
@@ -38,19 +43,23 @@
import org.jboss.cache.Cache;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
import org.jboss.cache.Region;
import org.jboss.cache.RegionNotEmptyException;
+import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.Option;
-import org.jboss.cache.jmx.CacheJmxWrapperMBean;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.annotation.NodeRemoved;
+import org.jboss.cache.notifications.annotation.ViewChanged;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.notifications.event.NodeRemovedEvent;
+import org.jboss.cache.notifications.event.ViewChangedEvent;
+import org.jboss.cache.pojo.PojoCache;
import org.jboss.logging.Logger;
-import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.mx.util.MBeanServerLocator;
import org.jboss.util.NestedRuntimeException;
+import org.jboss.util.threadpool.ThreadPool;
/**
* An implementation of SSOClusterManager that uses a TreeCache
@@ -94,12 +103,11 @@
public static final String DEFAULT_GLOBAL_CACHE_NAME =
"jboss.cache:service=TomcatClusteringCache";
- private static final Option GRAVITATE_OPTION = new Option();
-
- static
- {
- GRAVITATE_OPTION.setForceDataGravitation(true);
- }
+ /**
+ * Default global value for the threadPoolName property
+ */
+ public static final String DEFAULT_THREAD_POOL_NAME =
+ "jboss.system:service=ThreadPool";
// ------------------------------------------------------- Instance Fields
@@ -138,6 +146,10 @@
*/
private TransactionManager tm = null;
+ private String threadPoolName = DEFAULT_THREAD_POOL_NAME;
+
+ private ThreadPool threadPool;
+
/**
* The lifecycle event support for this component.
*/
@@ -179,10 +191,21 @@
private boolean missingCacheErrorLogged = false;
/**
+ * Whether our cache is using buddy replication
+ */
+ private boolean usingBuddyReplication;
+
+ /**
* Our node's address in the cluster.
*/
private Serializable localAddress = null;
+ /** The members of the last view passed to viewChange() */
+ private Set currentView = new HashSet();;
+
+ /** Mutex lock to ensure only one view change at a time is being processed */
+ private Object cleanupMutex = new Object();
+
// ---------------------------------------------------------- Constructors
@@ -268,6 +291,29 @@
}
}
+ public String getThreadPoolName()
+ {
+ return threadPoolName;
+ }
+
+
+ public void setThreadPoolName(String threadPoolName)
+ {
+ if (started)
+ {
+ log.info("Call to setThreadPoolName() ignored; already started");
+ }
+ else
+ {
+ this.threadPoolName = threadPoolName;
+ }
+ }
+
+ public boolean isUsingThreadPool()
+ {
+ return threadPool != null;
+ }
+
// ----------------------------------------------------- SSOClusterManager
/**
@@ -311,9 +357,10 @@
if(doTx)
tm.begin();
- Set sessions = getSessionSet(fqn, true);
- sessions.add(new SessionAddress(session.getId(), localAddress));
- putInTreeCache(fqn, sessions);
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
+
+ putInTreeCache(fqn, session.getId(), null);
}
catch (Exception e)
{
@@ -395,8 +442,11 @@
Fqn fqn = getSingleSignOnFqn(ssoId);
try
- {
- removeFromTreeCache(fqn);
+ {
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
+
+ removeFromTreeCache(fqn, false);
}
catch (Exception e)
{
@@ -430,8 +480,11 @@
// Find the latest credential info from the cluster
Fqn fqn = getCredentialsFqn(ssoId);
try
- {
- SSOCredentials data = (SSOCredentials) getFromTreeCache(fqn);
+ {
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
+
+ SSOCredentials data = (SSOCredentials) getFromTreeCache(fqn, KEY);
if (data != null)
{
entry = new JBossSingleSignOnEntry(null,
@@ -470,7 +523,10 @@
{
log.trace("Registering SSO " + ssoId + " in clustered cache");
}
-
+
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
+
storeSSOData(ssoId, authType, username, password);
}
@@ -524,23 +580,23 @@
if(doTx)
tm.begin();
- Set sessions = getSessionSet(fqn, false);
- if (sessions != null)
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
+
+ Set keys = getSessionKeys(ssoId);
+ if (keys.contains(session.getId()))
{
- sessions.remove(new SessionAddress(session.getId(), localAddress));
- if (sessions.size() == 0)
- {
- // No sessions left; remove node
-
- // Add this SSO to our list of in-process local removals so
- // this.nodeRemoved() will ignore the removal
- removing = true;
- beingLocallyRemoved.set(ssoId);
- removeFromTreeCache(getSingleSignOnFqn(ssoId));
+ if (keys.size() == 1)
+ {
+ // This is our last session locally; remove our node (which,
+ // via nodeRemoved callback also marks the sso empty if it's
+ // also the last session globally
+ removeFromTreeCache(fqn, false);
}
else
{
- putInTreeCache(fqn, sessions);
+ // Simple removal of one our local sessions
+ removeFromTreeCache(fqn, session.getId());
}
}
}
@@ -600,6 +656,9 @@
log.trace("Updating credentials for SSO " + ssoId +
" in clustered cache");
}
+
+ // Gravitate the SSO if necessary
+ gravitateSSO(ssoId);
storeSSOData(ssoId, authType, username, password);
}
@@ -622,17 +681,101 @@
if (event.isPre())
return;
- String ssoId = getIdFromFqn(event.getFqn());
+ Fqn fqn = event.getFqn();
+ boolean isBuddyFqn = isBuddyFqn(fqn);
+ String ssoId = getIdFromFqn(fqn, isBuddyFqn);
if (ssoId == null)
return;
+
+ int basePos = isBuddyFqn ? 2 : 0;
+
+ if (fqn.size() == (2 + basePos))
+ {
+ // Entire SSO is being removed; i.e. an invalidation
+
+ // Ignore messages generated by our own logout activity
+ if (!ssoId.equals(beingLocallyRemoved.get()))
+ {
+ handleRemoteInvalidation(ssoId);
+ }
+ }
+ else if (fqn.size() == (4 + basePos))
+ {
+ // A peer is gone
+ handlePeerRemoval(ssoId);
+ }
+ }
- // Ignore messages generated by our own activity
- if (ssoId.equals(beingLocallyRemoved.get()))
+ /**
+ * If any nodes have been removed from the view, asynchronously scans
+ * all SSOs looking for and removing sessions owned by the removed node.
+ * Notifies the SSO valve if as a result any SSOs no longer have active
+ * sessions. If the removed node is the one associated with this object,
+ * does nothing.
+ */
+ @ViewChanged
+ public synchronized void viewChange(ViewChangedEvent event)
+ {
+ if (event.isPre())
+ return;
+
+ log.debug("Received ViewChangedEvent " + event);
+
+ Set oldMembers = new HashSet(currentView);
+ synchronized (currentView)
{
- return;
+ currentView.clear();
+ currentView.addAll(event.getNewView().getMembers());
+
+ // If we're not in the view, just exit
+ if (localAddress == null || !currentView.contains(localAddress))
+ return;
+
+ // Remove all the current members from the old set; any left
+ // are the dead members
+ oldMembers.removeAll(currentView);
}
+ if (oldMembers.size() > 0)
+ {
+ log.debug("Members have been removed; will launch cleanup task. Dead members: " + oldMembers);
+
+ launchSSOCleaner(false);
+ }
+
+ }
+
+
+ /**
+ * Instantiates a DeadMemberCleaner and assigns a thread
+ * to execute the cleanup task.
+ * @param notifyIfEmpty TODO
+ */
+ private void launchSSOCleaner(boolean notifyIfEmpty)
+ {
+ SSOCleanerTask cleaner = new SSOCleanerTask();
+ cleaner.setCheckForEmpty(notifyIfEmpty);
+ if (threadPool != null)
+ {
+ threadPool.run(cleaner);
+ }
+ else
+ {
+ Thread t = new Thread(cleaner, "ClusteredSSOCleaner");
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+
+ /**
+ * Handles the notification that an entire SSO has been removed remotely
+ *
+ * @param ssoId id of the removed SSO
+ */
+ private void handleRemoteInvalidation(String ssoId)
+ {
beingRemotelyRemoved.set(ssoId);
try
@@ -648,8 +791,29 @@
{
beingRemotelyRemoved.set(null);
}
-
}
+
+ /**
+ * Checks whether any peers remain for the given SSO; if not
+ * notifies the valve that the SSO is empty.
+ *
+ * @param ssoId
+ */
+ private void handlePeerRemoval(String ssoId)
+ {
+ try
+ {
+ Set peers = getSSOPeers(ssoId);
+ if (peers.size() == 0)
+ {
+ ssoValve.notifySSOEmpty(ssoId);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Caught exception checking if " + ssoId + " is empty", e);
+ }
+ }
/**
* Extracts an SSO session id from the Fqn and uses it in an invocation of
@@ -669,19 +833,28 @@
@NodeModified
public void nodeModified(NodeModifiedEvent event)
{
- if (event.isPre())
+ if (event.isPre() || event.isOriginLocal())
return;
Fqn fqn = event.getFqn();
-
- // We are only interested in changes to the CREDENTIALS node
- if (CREDENTIALS.equals(getTypeFromFqn(fqn)) == false)
+ boolean isBuddyFqn = isBuddyFqn(fqn);
+ String type = getTypeFromFqn(fqn, isBuddyFqn);
+ if (CREDENTIALS.equals(type))
{
- return;
+ handleCredentialUpdate(getIdFromFqn(fqn, isBuddyFqn), event.getData());
}
+ else if (SESSIONS.equals(type))
+ {
+ handleSessionSetChange(fqn, isBuddyFqn);
+ }
+ }
- String ssoId = getIdFromFqn(fqn); // won't be null or above check fails
-
+ /**
+ * @param ssoId the id of the sso
+ * @param nodeData JBC data map assoicated with the update
+ */
+ private void handleCredentialUpdate(String ssoId, Map nodeData)
+ {
// Ignore invocations that come as a result of our additions
if (ssoId.equals(beingLocallyAdded.get()))
{
@@ -700,22 +873,18 @@
log.trace("received a credentials modified message for SSO " + ssoId);
}
- // Put this SSO in the queue of those to be updated
-// credentialUpdater.enqueue(sso, ssoId);
try
{
- SSOCredentials data = (SSOCredentials) getFromTreeCache(fqn);
+ SSOCredentials data = (SSOCredentials) nodeData.get(KEY);
if (data != null)
{
- // We want to release our read lock quickly, so get the needed
- // data from the cache, commit the tx, and then use the data
String authType = data.getAuthType();
String username = data.getUsername();
String password = data.getPassword();
if (log.isTraceEnabled())
{
- log.trace("CredentialUpdater: Updating credentials for SSO " + sso);
+ log.trace("Updating credentials for SSO " + sso);
}
synchronized (sso)
@@ -731,7 +900,23 @@
log.error("failed to update credentials for SSO " + ssoId, e);
}
}
+
+ /**
+ *
+ * @param fqn an Fqn that points to the SESSIONS node of an SSO or lower
+ */
+ private void handleSessionSetChange(Fqn fqn, boolean isBuddy)
+ {
+ int basePos = isBuddy ? 2 : 0;
+ // Ignore anything not for a peer's session node
+ if (fqn.size() != (4 + basePos))
+ return;
+ // Peers remove their entire node when it's empty, so any
+ // other modification means it's not empty
+ ssoValve.notifySSONotEmpty(getIdFromFqn(fqn, isBuddy));
+ }
+
// ------------------------------------------------------------- Lifecycle
@@ -784,6 +969,8 @@
throw new LifecycleException
("TreeCacheSSOClusterManager already Started");
}
+
+ initThreadPool();
try
{
@@ -830,32 +1017,65 @@
}
- // ------------------------------------------------------- Private Methods
+ // ------------------------------------------------------- Public Methods
- private Object getFromTreeCache(Fqn fqn) throws Exception
+ /**
+ * Gets the number of sessions associated with the given SSO. The same
+ * session active on more than one node will count twice.
+ */
+ public int getSessionCount(String ssoId) throws Exception
{
- InvocationContext ctx = cache.getInvocationContext();
- Option existing = ctx.getOptionOverrides();
- try
+ int count = 0;
+ Set peers = getSSOPeers(ssoId);
+ for (Iterator it = peers.iterator(); it.hasNext();)
{
- ctx.setOptionOverrides(GRAVITATE_OPTION);
- return cache.get(fqn, KEY);
+ Set ids = getSessionKeys(ssoId, (Serializable) it.next());
+ count += ids.size();
}
- finally
- {
- ctx.setOptionOverrides(existing);
- }
+ return count;
}
+
+ // ------------------------------------------------------- Private Methods
+ private Object getFromTreeCache(Fqn fqn, Object key) throws Exception
+ {
+ return cache.get(fqn, key);
+ }
+
+ private Set getSSOIds() throws Exception
+ {
+ Fqn ssoRootFqn = new Fqn(new Object[] {SSO});
+ Node ssoRoot = cache.getRoot().getChild(ssoRootFqn);
+ return ssoRoot == null ? new HashSet() : ssoRoot.getChildrenNames();
+ }
+
+ private Set getSSOPeers(String ssoId) throws Exception
+ {
+ Fqn fqn = getSessionRootFqn(ssoId);
+ Node ssoRoot = cache.getRoot().getChild(fqn);
+ return ssoRoot == null ? new HashSet() : ssoRoot.getChildrenNames();
+ }
+
private Fqn getCredentialsFqn(String ssoid)
{
Object[] objs = new Object[]{SSO, ssoid, CREDENTIALS};
return new Fqn(objs);
}
+ private Fqn getSessionRootFqn(String ssoId)
+ {
+ Object[] objs = new Object[]{SSO, ssoId, SESSIONS };
+ return new Fqn(objs);
+ }
+
private Fqn getSessionsFqn(String ssoid)
{
- Object[] objs = new Object[]{SSO, ssoid, localAddress, SESSIONS};
+ return getSessionsFqn(ssoid, localAddress);
+ }
+
+ private Fqn getSessionsFqn(String ssoid, Serializable address)
+ {
+ Object[] objs = new Object[]{SSO, ssoid, SESSIONS, address };
return new Fqn(objs);
}
@@ -865,55 +1085,94 @@
return new Fqn(objs);
}
+ private boolean isBuddyFqn(Fqn fqn)
+ {
+ boolean isBuddy = usingBuddyReplication;
+
+ if (isBuddy)
+ {
+ isBuddy = fqn.size() > 0 && BuddyManager.BUDDY_BACKUP_SUBTREE.equals(fqn.get(0));
+ }
+ return isBuddy;
+ }
+
/**
* Extracts an SSO session id from a fully qualified name object.
*
* @param fqn the Fully Qualified Name used by TreeCache
* @return the second element in the Fqn -- the SSO session id
*/
- private String getIdFromFqn(Fqn fqn)
+ private String getIdFromFqn(Fqn fqn, boolean isBuddy)
{
String id = null;
- if (fqn.size() > 1 && SSO.equals(fqn.get(0)))
+ int basePos = isBuddy ? 2 : 0;
+ if (fqn.size() > (1 + basePos) && SSO.equals(fqn.get(0 + basePos)))
{
- id = (String) fqn.get(1);
+ id = (String) fqn.get(1 + basePos);
}
return id;
}
- private Set getSessionSet(Fqn fqn, boolean create)
- throws Exception
- {
- Set sessions = (Set) getFromTreeCache(fqn);
- if (create && sessions == null)
- {
- sessions = new HashSet();
- }
- return sessions;
- }
-
/**
* Extracts the SSO tree cache node type from a fully qualified name
* object.
*
* @param fqn the Fully Qualified Name used by TreeCache
+ * @param isBuddy does fqn come from a buddy backup subtree?
* @return the 3rd in the Fqn -- either
* {@link #CREDENTIALS CREDENTIALS} or {@link #SESSIONS SESSIONS},
* or <code>null</code> if <code>fqn</code> is not for an SSO.
*/
- private String getTypeFromFqn(Fqn fqn)
+ private String getTypeFromFqn(Fqn fqn, boolean isBuddy)
{
String type = null;
- if (fqn.size() > 2 && SSO.equals(fqn.get(0)))
- type = (String) fqn.get(2);
+ int basePos = isBuddy ? 2 : 0;
+ if (fqn.size() > (2 + basePos) && SSO.equals(fqn.get(0 + basePos)))
+ type = (String) fqn.get(2 + basePos);
return type;
}
+ private Set getSessionKeys(String ssoId)
+ {
+ return getSessionKeys(ssoId, localAddress);
+ }
+
+ private Set getSessionKeys(String ssoId, Serializable peer)
+ {
+ Fqn fqn = getSessionsFqn(ssoId, peer);
+ Set keys = null;
+ Node sessions = cache.getRoot().getChild(fqn);
+ if (sessions != null)
+ {
+ keys = sessions.getKeys();
+ }
+ else
+ {
+ keys = new HashSet();
+ }
+ return keys;
+ }
+
+ private void gravitateSSO(String ssoId)
+ {
+ if (usingBuddyReplication)
+ {
+ Fqn fqn = getSingleSignOnFqn(ssoId);
+ InvocationContext ctx = cache.getInvocationContext();
+ Option opt = new Option();
+ opt.setForceDataGravitation(true);
+ ctx.setOptionOverrides(opt);
+ cache.get(fqn, "dummy");
+ }
+ }
+
/**
* Obtains needed configuration information from the tree cache.
* Invokes "getTransactionManager" on the tree cache, caching the
* result or throwing an IllegalStateException if one is not found.
- * Also get our cluster-wide unique local address from the cache.
+ * Confirms that the cache is not configured for buddy replication,
+ * throwing IllegalStateException if it is. Also gets our cluster-wide
+ * unique local address from the cache.
*
* @throws Exception
*/
@@ -929,6 +1188,14 @@
"TransactionManagerLookupClass");
}
+ if (cache.getConfiguration().getBuddyReplicationConfig() != null
+ && cache.getConfiguration().getBuddyReplicationConfig().isEnabled())
+ {
+ usingBuddyReplication = true;
+ log.info("Underlying cache is configured for buddy replication; use of " +
+ "buddy replication with ClusteredSingleSignOn is not advised");
+ }
+
// Find out our address
Object address = cache.getLocalAddress();
// In reality this is a JGroups IpAddress, but the API says
@@ -937,6 +1204,21 @@
localAddress = (Serializable) address;
else
localAddress = address.toString();
+
+ log.debug("Local address is " + localAddress);
+
+ // Get the currentView
+ synchronized (currentView)
+ {
+ currentView.clear();
+ List members = cache.getMembers();
+ if (members != null)
+ {
+ currentView.addAll(members);
+
+ log.debug("Current view is " + currentView);
+ }
+ }
}
private void endTransaction()
@@ -1011,29 +1293,31 @@
return avail;
}
- private void putInTreeCache(Fqn fqn, Object data) throws Exception
+ private void putInTreeCache(Fqn fqn, Object key, Object data) throws Exception
{
- InvocationContext ctx = cache.getInvocationContext();
- Option existing = ctx.getOptionOverrides();
- try
- {
- ctx.setOptionOverrides(GRAVITATE_OPTION);
- cache.put(fqn, KEY, data);
- }
- finally
- {
- ctx.setOptionOverrides(existing);
- }
+ cache.put(fqn, key, data);
}
private void integrateWithCache() throws Exception
{
if (cache == null)
{
- // Get the cache
- CacheJmxWrapperMBean mbean = (CacheJmxWrapperMBean) MBeanProxyExt.create(CacheJmxWrapperMBean.class,
- getCacheObjectName());
- cache = mbean.getCache();
+ // Determine if our cache is a PojoCache or a plain Cache
+ MBeanInfo info = server.getMBeanInfo(getCacheObjectName());
+ MBeanAttributeInfo[] attrs = info.getAttributes();
+ for (MBeanAttributeInfo attr : attrs)
+ {
+ if ("PojoCache".equals(attr.getName()))
+ {
+ cache = ((PojoCache) server.getAttribute(getCacheObjectName(), "PojoCache")).getCache();
+ break;
+ }
+ else if ("Cache".equals(attr.getName()))
+ {
+ cache = (Cache) server.getAttribute(getCacheObjectName(), "Cache");
+ break;
+ }
+ }
// Ensure we have a transaction manager and a cluster-wide unique address
configureFromCache();
@@ -1043,6 +1327,9 @@
registerAsCacheListener();
+ // Scan for any SSOs with no entries; mark them for expiration
+ launchSSOCleaner(true);
+
log.debug("Successfully integrated with cache service " + cacheObjectName);
}
}
@@ -1101,21 +1388,23 @@
}
}
- private void removeFromTreeCache(Fqn fqn) throws Exception
+ private void removeFromTreeCache(Fqn fqn, boolean localOnly) throws Exception
{
- InvocationContext ctx = cache.getInvocationContext();
- Option existing = ctx.getOptionOverrides();
- try
+ if (localOnly)
{
- ctx.setOptionOverrides(GRAVITATE_OPTION);
- cache.removeNode(fqn);
+ InvocationContext ctx = cache.getInvocationContext();
+ Option option = new Option();
+ option.setCacheModeLocal(true);
+ ctx.setOptionOverrides(option);
}
- finally
- {
- ctx.setOptionOverrides(existing);
- }
+ cache.removeNode(fqn);
}
+ private void removeFromTreeCache(Fqn fqn, Object key) throws Exception
+ {
+ cache.remove(fqn, key);
+ }
+
/**
* Stores the given data to the clustered cache in a tree branch whose FQN
* is the given SSO id. Stores the given credential data in a child node
@@ -1143,7 +1432,7 @@
try
{
- putInTreeCache(getCredentialsFqn(ssoId), data);
+ putInTreeCache(getCredentialsFqn(ssoId), KEY, data);
}
catch (Exception e)
{
@@ -1155,6 +1444,29 @@
beingLocallyAdded.set(null);
}
}
+
+ private void initThreadPool()
+ {
+ if (threadPoolName != null)
+ {
+ try
+ {
+ ObjectName on = new ObjectName(threadPoolName);
+ threadPool = (ThreadPool) server.getAttribute(on, "Instance");
+ log.debug("Using ThreadPool at " + threadPoolName + " to clean dead members");
+ }
+ catch (Exception e)
+ {
+ log.info("Unable to access ThreadPool at " + threadPoolName +
+ " -- will use individual threads for cleanup work");
+ log.debug("Failure to access ThreadPool due to: " + e);
+ }
+ }
+ else
+ {
+ log.debug("No ThreadPool configured -- will use individual threads for cleanup work");
+ }
+ }
private boolean isMissingCacheErrorLogged()
{
@@ -1250,42 +1562,112 @@
}
} // end SSOCredentials
-
- static class SessionAddress implements Serializable
- {
- /** The serialVersionUID */
- private static final long serialVersionUID = -3702932999380140004L;
+
+ /**
+ * Runnable that's run when the removal of a node from the cluster has been detected.
+ * Removes any SessionAddress objects associated with dead members from the
+ * session set of each SSO. Operates locally only so each node can independently clean
+ * its SSOs without concern about replication lock conflicts.
+ */
+ private class SSOCleanerTask implements Runnable
+ {
+ boolean checkForEmpty = false;
- Serializable address;
- String sessionId;
- SessionAddress(String sessionId, Serializable address)
+ boolean getCheckForEmpty()
{
- this.sessionId = sessionId;
- this.address = address;
+ return checkForEmpty;
}
- public boolean equals(Object obj)
+ void setCheckForEmpty(boolean checkForEmpty)
{
- if (this == obj)
- return true;
-
- if (!(obj instanceof SessionAddress))
- return false;
-
- SessionAddress other = (SessionAddress) obj;
-
- return (sessionId.equals(other.sessionId)
- && address.equals(other.address));
+ this.checkForEmpty = checkForEmpty;
}
- public int hashCode()
+ public void run()
{
- int total = (19 * 43) + sessionId.hashCode();
- return ((total * 43) + address.hashCode());
+ synchronized (cleanupMutex)
+ {
+ try
+ {
+ // Ensure we have a TransactionManager
+ if (tm == null)
+ configureFromCache();
+
+ Set ids = getSSOIds();
+ for (Iterator iter = ids.iterator(); iter.hasNext();)
+ {
+ cleanSSO((String) iter.next());
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Caught exception cleaning sessions from dead cluster members from SSOs ", e);
+ }
+ }
}
-
+ private void cleanSSO(String ssoId)
+ {
+ boolean doTx = false;
+ try
+ {
+ // Don't start tx if there is already one associated with this thread.
+ if(tm.getTransaction() == null)
+ doTx = true;
+
+ if(doTx)
+ tm.begin();
+
+ Set peers = getSSOPeers(ssoId);
+ if (peers != null && peers.size() > 0)
+ {
+ for (Iterator iter = peers.iterator(); iter.hasNext(); )
+ {
+ Serializable peer = (Serializable) iter.next();
+ boolean alive = true;
+ synchronized (currentView)
+ {
+ alive = currentView.contains(peer);
+ }
+ if (!alive)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Removing peer " + peer + " from SSO " + ssoId);
+ }
+
+ Fqn fqn = getSessionsFqn(ssoId, peer);
+ // Remove the peer node, but local-only
+ // Each cache is responsible for cleaning itself
+ removeFromTreeCache(fqn, true);
+ }
+ }
+ }
+ else if (checkForEmpty)
+ {
+ // SSO has no peers; notify our valve so we can expire it
+ ssoValve.notifySSOEmpty(ssoId);
+ }
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ if(doTx)
+ tm.setRollbackOnly();
+ }
+ catch (Exception ignored)
+ {
+ }
+ log.error("caught exception cleaning dead members from SSO " + ssoId, e);
+ }
+ finally
+ {
+ if (doTx)
+ endTransaction();
+ }
+ }
}
} // end TreeCacheSSOClusterManager
More information about the jboss-cvs-commits
mailing list