[jboss-cvs] JBossAS SVN: r107212 - in projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session: sso and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 29 13:34:12 EDT 2010


Author: smarlow at redhat.com
Date: 2010-07-29 13:34:12 -0400 (Thu, 29 Jul 2010)
New Revision: 107212

Added:
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/CacheKey.java
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/SSOClusterManagerImpl.java
Log:
JBCLUSTER-286 SSOClusterManager that works with Infinispan

Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/CacheKey.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/CacheKey.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/CacheKey.java	2010-07-29 17:34:12 UTC (rev 107212)
@@ -0,0 +1,64 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2010, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.ha.web.tomcat.service.session.sso;
+
+/**
+ * Key for the entries stored in the clustered SSO cache.  A key is the
+ * combination of an SSO id and the {@link Type} of entry it refers to.
+ * <p>
+ * Keys are compared by value: two keys with the same SSO id and type are
+ * equal and share a hash code.  This is required because a new key instance
+ * is built for every lookup or removal, so identity-based equality could
+ * never match an entry that was stored under an earlier instance.
+ * <p>
+ * The class is serializable so that keys can be marshalled when the cache
+ * is shared between cluster nodes.
+ */
+public class CacheKey implements java.io.Serializable
+{
+   private static final long serialVersionUID = 1L;
+
+   /** The kinds of entry kept per SSO id. */
+   public static enum Type { SESSION, CREDENTIAL }
+
+   private final String ssoId;
+   private final Type type;
+
+   /**
+    * Creates a key.
+    *
+    * @param ssoId the id of the SSO session
+    * @param type  the kind of entry the key refers to
+    */
+   public CacheKey(String ssoId, Type type)
+   {
+      this.ssoId = ssoId;
+      this.type = type;
+   }
+
+   /**
+    * @return the SSO id given at construction; may be <code>null</code>
+    */
+   public String getSSOID()
+   {
+      return this.ssoId;
+   }
+
+   /**
+    * @return the entry type given at construction
+    */
+   public Type getType()
+   {
+      return this.type;
+   }
+
+   /**
+    * Value equality on SSO id and type.
+    *
+    * @param obj the object to compare with
+    * @return <code>true</code> if <code>obj</code> is a CacheKey with the
+    *         same SSO id and the same type
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!(obj instanceof CacheKey))
+      {
+         return false;
+      }
+      CacheKey other = (CacheKey) obj;
+      if (this.type != other.type)
+      {
+         return false;
+      }
+      return this.ssoId == null ? other.ssoId == null : this.ssoId.equals(other.ssoId);
+   }
+
+   /**
+    * Hash code consistent with {@link #equals(Object)}.
+    *
+    * @return a hash derived from the SSO id and the type
+    */
+   @Override
+   public int hashCode()
+   {
+      int result = 17;
+      result = 37*result + (this.ssoId == null ? 0 : this.ssoId.hashCode());
+      // Use the ordinal rather than Type.hashCode(): an enum's hash code is
+      // identity based and therefore differs from one JVM to another.
+      result = 37*result + (this.type == null ? 0 : this.type.ordinal());
+      return result;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "CacheKey[" + this.type + ":" + this.ssoId + "]";
+   }
+}


Property changes on: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/CacheKey.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/SSOClusterManagerImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/SSOClusterManagerImpl.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/SSOClusterManagerImpl.java	2010-07-29 17:34:12 UTC (rev 107212)
@@ -0,0 +1,1184 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2010, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.ha.web.tomcat.service.session.sso;
+
+import org.infinispan.atomic.AtomicMap;
+import org.infinispan.atomic.AtomicMapLookup;
+import org.infinispan.config.Configuration;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.Cache;
+import org.infinispan.remoting.transport.Address;
+import org.jboss.ha.framework.server.InfinispanHAPartitionCacheHandler;
+import org.jboss.logging.Logger;
+import org.jboss.util.threadpool.ThreadPool;
+import org.jboss.web.tomcat.service.sso.spi.FullyQualifiedSessionId;
+import org.jboss.web.tomcat.service.sso.spi.SSOClusterManager;
+import org.jboss.web.tomcat.service.sso.spi.SSOCredentials;
+import org.jboss.web.tomcat.service.sso.spi.SSOLocalManager;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An implementation of SSOClusterManager that uses an Infinispan cache
+ * to share SSO information between cluster nodes.
+ *
+ * @author Brian E. Stansberry
+ * @author Scott Marlow
+ * @version $Revision$ $Date: 2007-01-12 03:39:24 +0100 (ven., 12 janv. 2007) $
+ */
+// @Listener
+public final class SSOClusterManagerImpl
+   implements SSOClusterManager
+{
+   // -------------------------------------------------------------  Constants
+
+
+   /**
+    * Default global value for the threadPoolName property
+    */
+   public static final String DEFAULT_THREAD_POOL_NAME =
+      "jboss.system:service=ThreadPool";
+
+   /** The default JBoss Cache to use for storing SSO entries */
+   public static final String DEFAULT_CACHE_NAME = "clustered-sso";
+
+   // -------------------------------------------------------  Instance Fields
+
+   /**
+    * SSO id which the thread is currently storing to the cache
+    */
+   private volatile ThreadLocal<String> beingLocallyAdded = new ThreadLocal<String>();
+
+   /**
+    * SSO id which a thread is currently removing from the cache
+    */
+   private volatile ThreadLocal<String> beingLocallyRemoved = new ThreadLocal<String>();
+
+   /**
+    * SSO id which the thread is deregistering due to removal on another node
+    */
+   private volatile ThreadLocal<String> beingRemotelyRemoved = new ThreadLocal<String>();
+
+   /**
+    * Name used to identify the clustered cache; defaults to DEFAULT_CACHE_NAME
+    */
+   private volatile String cacheName = DEFAULT_CACHE_NAME;
+
+   /**
+    * ObjectName of the TreeCache if legacy JMX integration is used
+    */
+   //private ObjectName cacheObjectName = null;
+
+   /**
+    * The clustered cache that holds the SSO credentials and the sessions.
+    * The CacheKey will indicate which type it is (CacheKey.Type.CREDENTIAL or CacheKey.Type.SESSION).
+    */
+   private volatile Cache<CacheKey, Object> cache = null;
+
+   /**
+    * Transaction Manager
+    */
+   private volatile TransactionManager tm = null;
+
+   private volatile String threadPoolName = DEFAULT_THREAD_POOL_NAME;
+
+   private volatile ThreadPool threadPool;
+
+   /**
+    * The Log-object for this class
+    */
+   private static final Logger log = Logger.getLogger(SSOClusterManagerImpl.class);;
+
+   /**
+    * Whether we are currently registered as a listener on the cache
+    */
+   private volatile boolean registeredAsListener = false;
+
+   /**
+    * The MBean server we use to access external components (TODO: convert to injection)
+    */
+   private volatile MBeanServer server = null;
+
+   /**
+    * The SingleSignOn for which we are providing cluster support
+    */
+   private volatile SSOLocalManager ssoValve = null;
+
+   /**
+    * Whether we have been started
+    */
+   private volatile boolean started = false;
+
+   /**
+    * Whether we have logged an error due to not having a valid cache
+    */
+   private volatile boolean missingCacheErrorLogged = false;
+
+   /**
+    * Our node's address in the cluster.
+    */
+   private volatile Object localAddress = null;
+
+   /**
+    * The new members of the last view passed to viewChange()
+    * TODO:  figure out how to set this at startup time as
+    * we did with the JBC implementation.  
+    */
+   private final Set<Object> currentView = new HashSet<Object>();;
+
+   /** Mutex lock to ensure only one view change at a time is being processed */
+   private final Object cleanupMutex = new Object();
+
+   private volatile InfinispanHAPartitionCacheHandler<CacheKey, Object> cacheHandler;
+
+   // ----------------------------------------------------------  Constructors
+
+
+   /**
+    * Creates a new SSOClusterManagerImpl that works with the given
+    * MBeanServer.  This constructor is only intended for use in unit testing.
+    */
+   //public SSOClusterManagerImpl(MBeanServer server)
+   //{
+   //   this.server = server;
+   //}
+
+
+   // ------------------------------------------------------------  Properties
+
+   /**
+    * Gets the handler from which {@link #start()} obtains the cache.
+    *
+    * @return the configured cache handler, or <code>null</code> if none has
+    *         been set
+    */
+   public InfinispanHAPartitionCacheHandler getCacheHandler()
+   {
+      return this.cacheHandler;
+   }
+
+   /**
+    * Sets the handler from which {@link #start()} obtains the cache.
+    *
+    * @param cacheHandler the cache handler to use
+    */
+   public void setCacheHandler(InfinispanHAPartitionCacheHandler cacheHandler)
+   {
+      final InfinispanHAPartitionCacheHandler handler = cacheHandler;
+      this.cacheHandler = handler;
+   }
+
+   /**
+    * Gets the cache name; this is {@link #DEFAULT_CACHE_NAME} unless the
+    * SSO local manager supplied a cache config.
+    *
+    * @return the cache name
+    */
+   public String getCacheName()
+   {
+      return this.cacheName;
+   }
+
+   /**
+    * Gets the thread pool name; this is {@link #DEFAULT_THREAD_POOL_NAME}
+    * unless the SSO local manager supplied another name.
+    *
+    * @return the thread pool name
+    */
+   public String getThreadPoolName()
+   {
+      return this.threadPoolName;
+   }
+
+   /**
+    * Reports whether a thread pool is available for running cleanup tasks.
+    *
+    * @return <code>true</code> if a thread pool reference is held
+    */
+   public boolean isUsingThreadPool()
+   {
+      final ThreadPool pool = this.threadPool;
+      return pool != null;
+   }
+
+   // -----------------------------------------------------  SSOClusterManager
+
+   /**
+    * Notify the cluster of the addition of a Session to an SSO session.
+    * <p>
+    * Does nothing if either argument is <code>null</code> or if no cache is
+    * available.  If the calling thread has no transaction, the cache update
+    * runs inside a transaction that is begun and ended here; otherwise it
+    * takes part in the transaction already associated with the thread.
+    * A failure while updating the cache is logged rather than rethrown.
+    *
+    * @param ssoId   the id of the SSO session
+    * @param sessionId id of the Session that has been added
+    */
+   public void addSession(String ssoId, FullyQualifiedSessionId sessionId)
+   {
+      if (ssoId == null || sessionId == null)
+      {
+         return;
+      }
+
+      if (!isCacheAvailable())
+      {
+         return;
+      }
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("addSession(): adding Session " + sessionId.getSessionId() +
+            " to cached session set for SSO " + ssoId);
+      }
+
+      // Becomes true only after tm.begin() has succeeded, so that a failed
+      // begin() never leads to rolling back or ending a transaction that
+      // does not exist.
+      boolean doTx = false;
+      try
+      {
+         // Confirm we have a transaction manager; if not get it from the
+         // cache.  Failure to find one will throw an IllegalStateException
+         if (tm == null)
+            configureFromCache();
+
+         // Only start a transaction if there is no transaction
+         // context associated with this thread already.
+         if (tm.getTransaction() == null)
+         {
+            tm.begin();
+            doTx = true;
+         }
+
+         putSessionInCache(ssoId, sessionId);
+      }
+      catch (Exception e)
+      {
+         if (doTx)
+         {
+            try
+            {
+               tm.setRollbackOnly();
+            }
+            catch (Exception ignored)
+            {
+               // Best effort only; the original failure is logged below
+            }
+         }
+         log.error("caught exception adding session " + sessionId.getSessionId() +
+            " to SSO id " + ssoId, e);
+      }
+      finally
+      {
+         if (doTx)
+            endTransaction();
+      }
+   }
+
+
+   /**
+    * Returns the local SSO manager (the SingleSignOn valve) on whose behalf
+    * this object handles cluster communications.
+    *
+    * @return the local manager, or <code>null</code> if none has been set
+    */
+   public SSOLocalManager getSSOLocalManager()
+   {
+      return this.ssoValve;
+   }
+
+
+   /**
+    * Sets the local SSO manager (the SingleSignOn valve) on whose behalf
+    * this object handles cluster communications.
+    * <p>
+    * When a non-null manager is given, it is also used as a source of
+    * configuration: its MBeanServer is adopted if none is held yet, and its
+    * cache config and thread pool name, when non-null, replace the current
+    * cache name and thread pool name.
+    * <p><b>NOTE:</b> This method must be called before calls can be
+    * made to the other methods of this interface.
+    *
+    * @param localManager a <code>SingleSignOn</code> valve.
+    */
+   public void setSSOLocalManager(SSOLocalManager localManager)
+   {
+      ssoValve = localManager;
+      if (ssoValve == null)
+      {
+         return;
+      }
+
+      if (server == null)
+      {
+         server = ssoValve.getMBeanServer();
+      }
+
+      final String configuredCache = ssoValve.getCacheConfig();
+      if (configuredCache != null)
+      {
+         cacheName = configuredCache;
+      }
+
+      final String configuredPool = ssoValve.getThreadPoolName();
+      if (configuredPool != null)
+      {
+         threadPoolName = configuredPool;
+      }
+   }
+
+
+   /**
+    * Notifies the cluster that a single sign on session has been terminated
+    * due to a user logout.
+    * <p>
+    * Does nothing if <code>ssoId</code> is <code>null</code>, if no cache is
+    * available, or if the calling thread is already in the middle of
+    * removing the same SSO.  A failure to remove the SSO from the cache is
+    * logged rather than rethrown.
+    *
+    * @param ssoId the id of the SSO session that was logged out
+    */
+   public void logout(String ssoId)
+   {
+      // Same null handling as addSession()/removeSession(); without it the
+      // equals() call below would throw a NullPointerException
+      if (ssoId == null)
+      {
+         return;
+      }
+
+      if (!isCacheAvailable())
+      {
+         return;
+      }
+
+      // Check whether we are already handling this removal
+      if (ssoId.equals(beingLocallyRemoved.get()))
+      {
+         return;
+      }
+
+      // Record this SSO as being removed by this thread so that
+      // entryRemoved() can tell our own removal from a remote one
+      beingLocallyRemoved.set(ssoId);
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("Registering logout of SSO " + ssoId +
+            " in clustered cache");
+      }
+
+      try
+      {
+         removeSSOFromCache(ssoId);
+      }
+      catch (Exception e)
+      {
+         log.error("Exception attempting to logout " +
+            ssoId, e);
+      }
+      finally
+      {
+         beingLocallyRemoved.set(null);
+      }
+   }
+
+
+   /**
+    * Looks up the credentials stored in the clustered cache for an SSO.
+    *
+    * @param ssoId the id of the SSO session
+    * @return the credentials found in the cache, or <code>null</code> if
+    *         the cache is unavailable, holds no credentials for the SSO,
+    *         or the lookup failed (the failure is logged)
+    */
+   public SSOCredentials lookup(String ssoId)
+   {
+      if (!isCacheAvailable())
+      {
+         return null;
+      }
+
+      try
+      {
+         // Find the latest credential info from the cluster
+         return getCredentialsFromCache(ssoId);
+      }
+      catch (Exception e)
+      {
+         log.error("caught exception looking up SSOCredentials for SSO id " +
+            ssoId, e);
+         return null;
+      }
+   }
+
+
+   /**
+    * Notifies the cluster of the creation of a new SSO entry by storing
+    * its credentials in the clustered cache.  Does nothing if no cache is
+    * available.
+    *
+    * @param ssoId    the id of the SSO session
+    * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST
+    *                 or FORM) used to authenticate the SSO.
+    * @param username the username (if any) used for the authentication
+    * @param password the password (if any) used for the authentication
+    */
+   public void register(String ssoId, String authType,
+      String username, String password)
+   {
+      if (isCacheAvailable())
+      {
+         final boolean trace = log.isTraceEnabled();
+         if (trace)
+         {
+            log.trace("Registering SSO " + ssoId + " in clustered cache");
+         }
+
+         storeCredentials(ssoId, authType, username, password);
+      }
+   }
+
+
+   /**
+    * Notify the cluster of the removal of a Session from an SSO session.
+    * <p>
+    * Does nothing if either argument is <code>null</code>, if no cache is
+    * available, or if the calling thread is currently deregistering the
+    * same SSO in response to its removal elsewhere.  If the calling thread
+    * has no transaction, the cache update runs inside a transaction that is
+    * begun and ended here; otherwise it takes part in the transaction
+    * already associated with the thread.  A failure while updating the
+    * cache is logged rather than rethrown.
+    *
+    * @param ssoId   the id of the SSO session
+    * @param sessionId id of the Session that has been removed
+    */
+   public void removeSession(String ssoId, FullyQualifiedSessionId sessionId)
+   {
+      if (ssoId == null || sessionId == null)
+      {
+         return;
+      }
+
+      if (!isCacheAvailable())
+      {
+         return;
+      }
+
+      // Check that this session removal is not due to our own deregistration
+      // of an SSO following a removal notification from the cache
+      if (ssoId.equals(beingRemotelyRemoved.get()))
+      {
+         return;
+      }
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("removeSession(): removing Session " + sessionId.getSessionId() +
+            " from cached session set for SSO " + ssoId);
+      }
+
+      // Becomes true only after tm.begin() has succeeded, so that a failed
+      // begin() never leads to rolling back or ending a transaction that
+      // does not exist.
+      boolean doTx = false;
+      try
+      {
+         // Confirm we have a transaction manager; if not get it from the
+         // cache.  Failure to find one will throw an IllegalStateException
+         if (tm == null)
+            configureFromCache();
+
+         // Only start a transaction if there is no transaction
+         // context associated with this thread already.
+         if (tm.getTransaction() == null)
+         {
+            tm.begin();
+            doTx = true;
+         }
+
+         // remove session
+         removeSessionFromCache(ssoId, sessionId);
+      }
+      catch (Exception e)
+      {
+         if (doTx)
+         {
+            try
+            {
+               tm.setRollbackOnly();
+            }
+            catch (Exception ignored)
+            {
+               // Best effort only; the original failure is logged below
+            }
+         }
+
+         log.error("caught exception removing session " + sessionId.getSessionId() +
+            " from SSO id " + ssoId, e);
+      }
+      finally
+      {
+         if (doTx)
+            endTransaction();
+      }
+   }
+
+
+   /**
+    * Notifies the cluster of an update of the security credentials
+    * associated with an SSO session by storing the new credentials in the
+    * clustered cache.  Does nothing if no cache is available.
+    *
+    * @param ssoId    the id of the SSO session
+    * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST
+    *                 or FORM) used to authenticate the SSO.
+    * @param username the username (if any) used for the authentication
+    * @param password the password (if any) used for the authentication
+    */
+   public void updateCredentials(String ssoId, String authType,
+      String username, String password)
+   {
+      if (isCacheAvailable())
+      {
+         final boolean trace = log.isTraceEnabled();
+         if (trace)
+         {
+            log.trace("Updating credentials for SSO " + ssoId +
+               " in clustered cache");
+         }
+
+         storeCredentials(ssoId, authType, username, password);
+      }
+   }
+
+
+   // ------------------------------------------------------  CacheListener
+
+   /**
+    * Cache callback invoked when an entry is removed.
+    * <p/>
+    * Only post-removal notifications for {@link CacheKey.Type#SESSION}
+    * entries with a non-null SSO id are processed.  Unless the removal is
+    * the one the current thread is performing in {@link #logout(String)},
+    * the SSO is deregistered from the local SSO valve.  In either case the
+    * remaining sessions of the SSO are then checked so that the valve can
+    * be told when the SSO has become empty.
+    *
+    * @param event the removal event; its key is expected to be a CacheKey
+    */
+   @CacheEntryRemoved
+   public void entryRemoved(CacheEntryRemovedEvent event)
+   {
+      // Act only once the removal has actually taken place
+      if (event.isPre())
+      {
+         return;
+      }
+
+      final CacheKey removedKey = (CacheKey) event.getKey();
+      final String ssoId = removedKey.getSSOID();
+      final boolean sessionEntry = removedKey.getType() == CacheKey.Type.SESSION;
+
+      if (ssoId == null || !sessionEntry)
+      {
+         return;
+      }
+
+      // Entire SSO is being removed; i.e. an invalidation
+      // Ignore messages generated by our own logout activity
+      final boolean removedByThisThread = ssoId.equals(beingLocallyRemoved.get());
+      if (!removedByThisThread)
+      {
+         handleRemoteInvalidation(ssoId);
+      }
+
+      // signal the case that we have zero sessions for this ssoId
+      handlePeerRemoval(ssoId);
+   }
+
+   /**
+    * Cache manager callback invoked when the cluster view changes.
+    * <p/>
+    * Records the new members as the current view.  If this node's address
+    * is unknown or is not part of the new view, nothing more happens.
+    * Otherwise, if any member of the old view is missing from the new one,
+    * an SSO cleanup task is launched asynchronously.
+    *
+    * @param event describes the old and the new cluster membership
+    */
+   @ViewChanged
+   public synchronized void viewChange(ViewChangedEvent event)
+   {
+      log.debug("Received ViewChangedEvent " + event);
+
+      // Starts out as the old membership; reduced to the dead members below
+      final Set<Object> departed = new HashSet<Object>(event.getOldMembers());
+      synchronized (currentView)
+      {
+         currentView.clear();
+         currentView.addAll(event.getNewMembers());
+
+         // If we're not in the view, just exit
+         final boolean inView = localAddress != null && currentView.contains(localAddress);
+         if (!inView)
+         {
+            return;
+         }
+
+         // Whatever is left after taking out the current members has died
+         departed.removeAll(currentView);
+      }
+
+      if (!departed.isEmpty())
+      {
+         log.debug("Members have been removed; will launch cleanup task. Dead members: " + departed);
+
+         launchSSOCleaner(false);
+      }
+   }
+
+
+   /**
+    * Instantiates a DeadMemberCleaner and assigns a thread
+    * to execute the cleanup task.
+    * @param notifyIfEmpty TODO
+    */
+   private void launchSSOCleaner(boolean notifyIfEmpty)
+   {
+      SSOCleanerTask cleaner = new SSOCleanerTask(notifyIfEmpty);
+      if (threadPool != null)
+      {
+         threadPool.run(cleaner);
+      }
+      else
+      {
+         Thread t = new Thread(cleaner, "ClusteredSSOCleaner");
+         t.setDaemon(true);
+         t.start();
+      }
+   }
+
+
+   /**
+    * Handles the notification that an entire SSO has been removed remotely
+    *
+    * @param ssoId id of the removed SSO
+    */
+   private void handleRemoteInvalidation(String ssoId)
+   {
+      beingRemotelyRemoved.set(ssoId);
+
+      try
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("received a node removed message for SSO " + ssoId);
+         }
+
+         ssoValve.deregister(ssoId);
+      }
+      finally
+      {
+         beingRemotelyRemoved.set(null);
+      }
+   }
+
+   /**
+    * Checks whether any peers remain for the given SSO; if not
+    * notifies the valve that the SSO is empty.
+    *
+    * @param ssoId
+    */
+   private void handlePeerRemoval(String ssoId)
+   {
+      try
+      {
+         Set<FullyQualifiedSessionId> peers = getSSOSessions(ssoId);
+         if (peers.size() == 0)
+         {
+            ssoValve.notifySSOEmpty(ssoId);
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Caught exception checking if " +  ssoId + " is empty", e);
+      }
+   }
+
+   /**
+    * Cache callback invoked when an entry is modified.
+    * <p/>
+    * Only post-modification notifications that did not originate on this
+    * node are processed.  A change to a
+    * {@link CacheKey.Type#CREDENTIAL} entry is passed on as a credential
+    * update together with the event's value; a change to a
+    * {@link CacheKey.Type#SESSION} entry is passed on as a change to the
+    * SSO's session set.
+    *
+    * @param event the modification event; its key is expected to be a
+    *              CacheKey
+    */
+   @CacheEntryModified
+   public void nodeModified(CacheEntryModifiedEvent event)
+   {
+      if (event.isPre())
+         return;
+      if (event.isOriginLocal())
+         return;
+
+      final CacheKey modifiedKey = (CacheKey) event.getKey();
+      final CacheKey.Type entryType = modifiedKey.getType();
+      final String ssoId = modifiedKey.getSSOID();
+
+      if (entryType == CacheKey.Type.CREDENTIAL)
+      {
+         final SSOCredentials updated = (SSOCredentials) event.getValue();
+         handleCredentialUpdate(ssoId, updated);
+      }
+      else if (entryType == CacheKey.Type.SESSION)
+      {
+         handleSessionSetChange(ssoId);
+      }
+   }
+
+   /**
+    * @param ssoId the id of the sso
+    * @param credentials 
+    */
+   private void handleCredentialUpdate(String ssoId,SSOCredentials credentials)
+   {
+      // Ignore invocations that come as a result of our additions
+      if (ssoId.equals(beingLocallyAdded.get()))
+      {
+         return;
+      }
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("received a credentials modified message for SSO " + ssoId);
+      }
+
+      try
+      {
+         if (credentials != null)
+         {
+            ssoValve.remoteUpdate(ssoId, credentials);
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("failed to update credentials for SSO " + ssoId, e);
+      }
+   }
+
+   /**
+    *
+    * @param ssoId single sign-on session id
+    */
+   private void handleSessionSetChange(String ssoId)
+   {
+      // Peers remove their entire node when it's empty, so any
+      // other modification means it's not empty
+      ssoValve.notifySSONotEmpty(ssoId);
+   }
+
+   /**
+    * Prepare for the beginning of active use of the public methods of this
+    * component.  This method should be called before any of the public
+    * methods of this component are utilized.
+    * <p>
+    * Obtains the cache from the cache handler, initializes the thread pool,
+    * registers this object as a listener on the cache and picks up the
+    * cache's transaction manager.
+    *
+    * @throws IllegalStateException if this component is already started or
+    *                            no cache handler has been set
+    * @throws Exception if this component detects a fatal error
+    *                            that prevents this component from being used
+    */
+   public void start() throws Exception
+   {
+      // Validate and update our current component state
+      if (started)
+      {
+         throw new IllegalStateException("SSOClusterManagerImpl already Started");
+      }
+
+      // Fail with a clear message rather than a NullPointerException when
+      // the handler was never injected
+      if (cacheHandler == null)
+      {
+         throw new IllegalStateException("SSOClusterManagerImpl cannot start: no cache handler has been set");
+      }
+
+      this.cache = cacheHandler.getCache();
+      initThreadPool();
+      registerAsCacheListener();
+
+      // NOTE(review): localAddress is only assigned in configureFromCache(),
+      // which this method does not call and which addSession()/removeSession()
+      // only call while tm is still null.  After a normal start it therefore
+      // looks like localAddress stays null and viewChange() always returns
+      // early -- confirm and, if so, resolve the address here.
+      this.tm = this.cache.getAdvancedCache().getTransactionManager();
+
+      started = true;
+   }
+
+
+   /**
+    * Gracefully terminate the active use of the public methods of this
+    * component by removing this object as a listener on the cache.  This
+    * method should be the last one called on a given instance of this
+    * component.
+    *
+    * @throws IllegalStateException if this component has not been started
+    * @throws Exception if this component detects a fatal error
+    *                            that needs to be reported
+    */
+   public void stop() throws Exception
+   {
+      // Validate and update our current component state
+      if (started)
+      {
+         removeAsCacheListener();
+         started = false;
+         return;
+      }
+
+      throw new IllegalStateException("SSOClusterManagerImpl not Started");
+   }
+
+
+   // -------------------------------------------------------  Public Methods
+
+   /**
+    * Gets the number of sessions associated with the given SSO. The same
+    * session active on more than one node will count more than once.
+    *
+    * @param ssoId the id of the SSO session
+    * @return the number of session ids recorded in the cache for the SSO
+    * @throws Exception if the sessions cannot be read from the cache
+    */
+   public int getSessionCount(String ssoId) throws Exception
+   {
+      final Set<FullyQualifiedSessionId> sessions = getSSOSessions(ssoId);
+      return sessions.size();
+   }
+
+   // -------------------------------------------------------  Private Methods
+
+   /**
+    * Collects the ids of all SSOs that have a
+    * {@link CacheKey.Type#SESSION} entry in the cache.
+    *
+    * @return a new set holding the SSO ids; never <code>null</code>
+    * @throws Exception if the cache keys cannot be read
+    */
+   private Set<String> getSSOIds() throws Exception
+   {
+      Set<String> result = new HashSet<String>();
+      for (CacheKey key : cache.keySet())
+      {
+         if (key.getType() == CacheKey.Type.SESSION)
+         {
+            result.add(key.getSSOID());
+         }
+      }
+
+      return result;
+   }
+
+   /**
+    * Gets the ids of the sessions recorded in the cache for an SSO.
+    * <p>
+    * This is a pure query: if the cache holds no session entry for the SSO
+    * an empty set is returned and nothing is created in the cache.  This
+    * matters because the method is called from the entry-removed callback,
+    * where creating the entry would put back what was just removed.
+    *
+    * @param ssoId the id of the SSO session
+    * @return set of FullyQualifiedSessionId; never <code>null</code>
+    * @throws Exception if the sessions cannot be read from the cache
+    */
+   private Set<FullyQualifiedSessionId> getSSOSessions(String ssoId) throws Exception
+   {
+      CacheKey key = new CacheKey(ssoId,CacheKey.Type.SESSION);
+      AtomicMap<FullyQualifiedSessionId, Object> m = AtomicMapLookup.getAtomicMap(cache, key, false);
+      if (m == null)
+      {
+         return new HashSet<FullyQualifiedSessionId>();
+      }
+      return m.keySet();
+   }
+
+   /**
+    * Obtains needed configuration information from the cache.
+    * Gets the transaction manager from the cache if none is held yet,
+    * throwing an IllegalStateException if the cache has none.
+    * Also determines our cluster-wide unique local address: the address
+    * reported by the cache's RpcManager or, for a cache in LOCAL mode, the
+    * constant "LOCAL".
+    *
+    * @throws IllegalStateException if the cache has no transaction manager
+    *                               or the local address cannot be determined
+    * @throws Exception declared for callers; not thrown directly here
+    */
+   private void configureFromCache() throws Exception
+   {
+      if(tm == null)
+      {
+         tm = cache.getAdvancedCache().getTransactionManager();
+      }
+
+      if (tm == null)
+      {
+         throw new IllegalStateException("Cache does not have a " +
+                                         "transaction manager; please " +
+                                         "configure a valid " +
+                                         "TransactionManagerLookupClass");
+      }
+
+      // We no longer rule out buddy replication, as it can be valid if
+      // all activity for the SSO is meant to pinned to one server (i.e.
+      // only one session, or all sessions share the same session id cookie)
+      /*
+      if (cache.getConfiguration().getBuddyReplicationConfig() != null
+            && cache.getConfiguration().getBuddyReplicationConfig().isEnabled())
+      {
+         throw new IllegalStateException("Underlying cache is configured for " +
+                                         "buddy replication; use of buddy " +
+                                         "replication with ClusteredSingleSignOn " +
+                                         "is not supported");
+      }
+      */
+      // Find out our address.  A cache in LOCAL mode has no RpcManager, so
+      // guard the lookup; otherwise the LOCAL branch below is never reached
+      // and a NullPointerException is thrown instead.
+      Address address = null;
+      if (cache.getAdvancedCache().getRpcManager() != null)
+      {
+         address = cache.getAdvancedCache().getRpcManager().getAddress();
+      }
+
+      if (address != null)
+         localAddress = address;
+      else if (Configuration.CacheMode.LOCAL == cache.getConfiguration().getCacheMode())
+         localAddress = "LOCAL";
+      else
+         throw new IllegalStateException("Cannot get local address from cache");
+
+
+      log.debug("Local address is " + localAddress);
+
+   }
+
+   /**
+    * Ends the transaction associated with the current thread: rolls it
+    * back if it has been marked for rollback, commits it otherwise.  Does
+    * nothing if the thread has no transaction.
+    *
+    * @throws RuntimeException wrapping any failure reported by the
+    *                          transaction manager
+    */
+   private void endTransaction()
+   {
+      try
+      {
+         int status = tm.getStatus();
+         if (status == Status.STATUS_NO_TRANSACTION)
+         {
+            // Nothing to end, e.g. because beginning the transaction failed
+            return;
+         }
+
+         if(status != Status.STATUS_MARKED_ROLLBACK)
+         {
+            tm.commit();
+         }
+         else
+         {
+            tm.rollback();
+         }
+      }
+      catch (Exception e)
+      {
+         log.error(e);
+         throw new RuntimeException("SSOClusterManagerImpl.endTransaction(): ", e);
+      }
+   }
+
+   /**
+    * Gets the MBeanServer, taking it from the SSO valve first if none is
+    * held yet and a valve has been set.
+    *
+    * @return the MBeanServer; <code>null</code> if none is held and none
+    *         was obtained from the valve
+    */
+   private MBeanServer getMBeanServer()
+   {
+      if (server == null)
+      {
+         if (ssoValve != null)
+         {
+            server = ssoValve.getMBeanServer();
+         }
+      }
+      return server;
+   }
+
+   /**
+    * Reports whether a cache reference is held, logging the missing-cache
+    * error via logMissingCacheError() when it is not.
+    *
+    * @return <code>true</code> if the cache is available
+    */
+   private boolean isCacheAvailable()
+   {
+      if (this.cache != null)
+      {
+         return true;
+      }
+
+      logMissingCacheError();
+      return false;
+   }
+
+   /**
+    * Put a new session in the cache
+    * @param ssoId session id
+    * @param fullyQualifiedSessionId  fully qualified session id
+    * @throws Exception
+    */
+   private void putSessionInCache(String ssoId, FullyQualifiedSessionId fullyQualifiedSessionId) throws Exception
+   {
+      CacheKey key = new CacheKey(ssoId,CacheKey.Type.SESSION);
+      AtomicMap m = AtomicMapLookup.getAtomicMap(cache, key, true);
+      m.put(fullyQualifiedSessionId, null);
+   }
+
+   /**
+    * Stores credentials under the SSO's
+    * {@link CacheKey.Type#CREDENTIAL} key, replacing any stored earlier.
+    *
+    * @param ssoId the id of the SSO session
+    * @param credentials the credentials to store
+    * @throws Exception if the cache cannot be updated
+    */
+   private void putCredentialsInCache(String ssoId, SSOCredentials credentials) throws Exception
+   {
+      cache.put(new CacheKey(ssoId, CacheKey.Type.CREDENTIAL), credentials);
+   }
+
+   private SSOCredentials getCredentialsFromCache(String ssoId)
+   {
+      CacheKey key = new CacheKey(ssoId,CacheKey.Type.CREDENTIAL);
+      return (SSOCredentials)cache.get(key);
+   }
+
+   /**
+    * Registers this object as a listener on the cache and records that it
+    * has done so.
+    * <p>
+    * NOTE(review): the class-level <code>@Listener</code> annotation is
+    * commented out in this revision; verify that the cache accepts an
+    * un-annotated listener object.
+    *
+    * @throws Exception if the registration fails
+    */
+   private void registerAsCacheListener() throws Exception
+   {
+      this.cache.addListener(this);
+      this.registeredAsListener = true;
+   }
+
+
+   /**
+    * Removes this object as a listener on the cache, if it is registered
+    * and a cache reference is held, and records that it is no longer
+    * registered.
+    *
+    * @throws Exception if the removal fails
+    */
+   private void removeAsCacheListener() throws Exception
+   {
+      if (!registeredAsListener || cache == null)
+      {
+         return;
+      }
+
+      cache.removeListener(this);
+      registeredAsListener = false;
+   }
+
+   /**
+    * Removes the SESSION-type cache entry of the given SSO, i.e. the entry
+    * that holds the SSO's member sessions (used for session.logout).
+    * <p>
+    * NOTE(review): only the SESSION-type key is removed here; the
+    * CREDENTIAL-type entry written by putCredentialsInCache() is left in
+    * place. Confirm that it is cleaned up elsewhere.
+    *
+    * @param ssoId the id of the shared single sign-on session
+    * @throws Exception if the cache operation fails
+    */
+   private void removeSSOFromCache(String ssoId) throws Exception
+   {
+      cache.remove(new CacheKey(ssoId, CacheKey.Type.SESSION));
+   }
+
+   /**
+    * Removes a single member session from an SSO. If no session map is
+    * found for the SSO, nothing happens.
+    *
+    * @param ssoId the id of the shared single sign-on session
+    * @param fullyQualifiedSessionId the member session to remove
+    * @throws Exception if the cache operation fails
+    */
+   private void removeSessionFromCache(String ssoId, FullyQualifiedSessionId fullyQualifiedSessionId) throws Exception
+   {
+      CacheKey sessionsKey = new CacheKey(ssoId, CacheKey.Type.SESSION);
+      // Third argument 'false': do not ask for the map to be created.
+      AtomicMap sessions = AtomicMapLookup.getAtomicMap(cache, sessionsKey, false);
+      if (sessions == null)
+      {
+         return;
+      }
+
+      sessions.remove(fullyQualifiedSessionId);
+   }
+
+   /**
+    * Wraps the given authentication data in an SSOCredentials object and
+    * writes it to the clustered cache. A failure of the cache write is
+    * logged and not propagated to the caller.
+    *
+    * @param ssoId    the id of the SSO session
+    * @param authType the type of authenticator (BASIC, CLIENT-CERT, DIGEST
+    *                 or FORM) used to authenticate the SSO.
+    * @param username the username (if any) used for the authentication
+    * @param password the password (if any) used for the authentication
+    */
+   private void storeCredentials(String ssoId, String authType, String username,
+      String password)
+   {
+      SSOCredentials creds = new SSOCredentials(authType, username, password);
+
+      // Mark this SSO as an addition in progress on this node for the
+      // duration of the write (presumably so the cache listener can
+      // ignore the resulting event -- the listener is not in this view).
+      beingLocallyAdded.set(ssoId);
+      try
+      {
+         putCredentialsInCache(ssoId, creds);
+      }
+      catch (Exception e)
+      {
+         log.error("Exception attempting to add TreeCache nodes for SSO " +
+            ssoId, e);
+      }
+      finally
+      {
+         // Always clear the marker, whether or not the write succeeded.
+         beingLocallyAdded.set(null);
+      }
+   }
+
+   /**
+    * Looks up the ThreadPool named by <code>threadPoolName</code> through
+    * the MBeanServer (attribute "Instance") and stores it in
+    * <code>threadPool</code>. If no name is set, no MBeanServer is
+    * available, or the lookup fails, <code>threadPool</code> is left
+    * untouched and individual threads will be used for cleanup work.
+    */
+   private void initThreadPool()
+   {
+      // Only ask for the server when a pool name is actually configured.
+      MBeanServer mbeanServer = (threadPoolName == null) ? null : getMBeanServer();
+      if (mbeanServer == null)
+      {
+         log.debug("No ThreadPool configured -- will use individual threads for cleanup work");
+         return;
+      }
+
+      try
+      {
+         ObjectName on = new ObjectName(threadPoolName);
+         threadPool = (ThreadPool) mbeanServer.getAttribute(on, "Instance");
+         log.debug("Using ThreadPool at " + threadPoolName + " to clean dead members");
+      }
+      catch (Exception e)
+      {
+         log.info("Unable to access ThreadPool at " + threadPoolName +
+                  " -- will use individual threads for cleanup work");
+         // Hand the exception itself to the logger so the stack trace and
+         // cause chain are available at debug level, not just toString().
+         log.debug("Failure to access ThreadPool due to: " + e, e);
+      }
+   }
+
+   /**
+    * @return <code>true</code> once the missing-cache condition has been
+    *         logged at error level (see logMissingCacheError())
+    */
+   private boolean isMissingCacheErrorLogged()
+   {
+      return this.missingCacheErrorLogged;
+   }
+
+   /**
+    * Sets the flag that records whether the missing-cache condition has
+    * already been logged at error level.
+    *
+    * @param missingCacheErrorLogged the new value of the flag
+    */
+   private void setMissingCacheErrorLogged(boolean missingCacheErrorLogged)
+   {
+      this.missingCacheErrorLogged = missingCacheErrorLogged;
+   }
+
+   private void logMissingCacheError()
+   {
+      StringBuffer msg = new StringBuffer("Cannot find Cache using ");
+      msg.append(getCacheName());
+      msg.append(" -- Cache must be started before SSOClusterManagerImpl ");
+      msg.append("can handle requests");
+
+      if (isMissingCacheErrorLogged())
+      {
+         // Just log it as a warning
+         log.warn(msg);
+      }
+      else
+      {
+         log.error(msg);
+         // Set a flag so we don't relog this error over and over
+         setMissingCacheErrorLogged(true);
+      }
+   }
+
+   // ---------------------------------------------------------  Inner Classes
+
+   /**
+    * Runnable that's run when the removal of a node from the cluster has been detected.
+    * For every SSO id returned by getSSOIds() it removes from the SSO's session set
+    * each session whose host name is no longer contained in currentView. When
+    * constructed with <code>checkForEmpty</code> set, an SSO that has no sessions at
+    * all is reported to the valve via notifySSOEmpty().
+    * <p>
+    * The intent is for each node to clean its own SSOs locally; note however the TODO
+    * in cleanSSO(): the removal currently goes through removeSessionFromCache() and is
+    * not yet restricted to the local cache.
+    * <p>
+    * Runs are serialized on cleanupMutex.
+    */
+   private class SSOCleanerTask implements Runnable
+   {
+      /** Whether SSOs without any sessions should be reported to the valve. */
+      private final boolean checkForEmpty;
+
+      private SSOCleanerTask(boolean checkForEmpty)
+      {
+         this.checkForEmpty = checkForEmpty;
+      }
+
+      /**
+       * Cleans every known SSO. Any exception escaping the loop is logged
+       * and ends this run.
+       */
+      public void run()
+      {
+         synchronized (cleanupMutex)
+         {
+            try
+            {
+               // Ensure we have a TransactionManager
+               if (tm == null)
+                  configureFromCache();
+
+               Set<String> ids = getSSOIds();
+               for (String sso : ids)
+               {
+                  cleanSSO(sso);
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Caught exception cleaning sessions from dead cluster members from SSOs ", e);
+            }
+         }
+      }
+
+      /**
+       * Removes the sessions of dead members from one SSO, inside a
+       * transaction. A transaction is started (and later ended) here only
+       * if the calling thread does not already have one.
+       *
+       * @param ssoId the id of the SSO to clean
+       */
+      private void cleanSSO(String ssoId)
+      {
+         // True only when this method has successfully begun a transaction
+         // of its own and is therefore responsible for ending it.
+         boolean doTx = false;
+         try
+         {
+            // Don't start tx if there is already one associated with this thread.
+            if (tm.getTransaction() == null)
+            {
+               tm.begin();
+               // Set the flag only after begin() has succeeded. If begin()
+               // throws there is no transaction of ours to roll back or end;
+               // calling endTransaction() anyway could raise a
+               // RuntimeException that escapes this method and aborts the
+               // loop over the remaining SSOs in run().
+               doTx = true;
+            }
+
+            Set<FullyQualifiedSessionId> peers = getSSOSessions(ssoId);
+            if (peers != null && peers.size() > 0)
+            {
+               for (FullyQualifiedSessionId peer : peers)
+               {
+                  boolean alive = true;
+                  synchronized (currentView)
+                  {
+                     alive = currentView.contains(peer.getHostName());
+                  }
+                  if (!alive)
+                  {
+                     if (log.isTraceEnabled())
+                     {
+                        log.trace("Removing peer " + peer + " from SSO " + ssoId);
+                     }
+
+                     // Remove the peer node, but local-only
+                     // Each cache is responsible for cleaning itself
+                     // TODO:  either make the following local only or only
+                     // do the remove on the master cluster node.
+                     removeSessionFromCache(ssoId, peer);
+                  }
+               }
+            }
+            else if (checkForEmpty)
+            {
+               // SSO has no peers; notify our valve so we can expire it
+               ssoValve.notifySSOEmpty(ssoId);
+            }
+         }
+         catch (Exception e)
+         {
+            try
+            {
+               // Only mark for rollback a transaction we started ourselves.
+               if (doTx)
+                  tm.setRollbackOnly();
+            }
+            catch (Exception ignored)
+            {
+               // Best effort: the original failure is logged below.
+            }
+            log.error("caught exception cleaning dead members from SSO " + ssoId, e);
+         }
+         finally
+         {
+            if (doTx)
+               endTransaction();
+         }
+      }
+   }
+
+}


Property changes on: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/sso/SSOClusterManagerImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the jboss-cvs-commits mailing list