[Jboss-cvs] JBossAS SVN: r56542 - trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 4 00:15:05 EDT 2006
Author: bstansberry at jboss.com
Date: 2006-09-04 00:15:00 -0400 (Mon, 04 Sep 2006)
New Revision: 56542
Removed:
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/LocalSessionActivity.java
Modified:
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/AttributeBasedClusteredSession.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/CacheListener.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSession.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSessionValve.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/FieldBasedClusteredSession.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/InstantSnapshotManager.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/IntervalSnapshotManager.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheClusteredSession.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheManager.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheService.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionBasedClusteredSession.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionReplicationContext.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SnapshotManager.java
trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/Util.java
Log:
Merge from Branch_4_0
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/AttributeBasedClusteredSession.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/AttributeBasedClusteredSession.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/AttributeBasedClusteredSession.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
/**
* Implementation of a clustered session for the JBossCacheManager. The replication granularity
@@ -74,14 +75,6 @@
{
super(manager);
}
-
- /**
- * Used only for externalization.
- */
- private AttributeBasedClusteredSession()
- {
- super();
- }
// ----------------------------------------------- Overridden Public Methods
@@ -136,23 +129,31 @@
if (getSessionAttributesDirty())
{
// Go thru the modified attr list first
- Set set = attrModifiedMap_.keySet();
- Iterator it = set.iterator();
- while (it.hasNext())
+ int modCount = attrModifiedMap_.size();
+ if (modCount == 1)
{
- Object key = it.next();
- proxy_.putAttribute(realId, (String) key, attrModifiedMap_.get(key));
+ for (Iterator it = attrModifiedMap_.entrySet().iterator(); it.hasNext(); )
+ {
+ Map.Entry entry = (Entry) it.next();
+ proxy_.putAttribute(realId, (String) entry.getKey(), entry.getValue());
+ }
}
-
- // Go thru the remove attr list
- set = attrRemovedMap_.keySet();
- it = set.iterator();
- while (it.hasNext())
+ else if (modCount > 0)
{
- Object key = it.next();
- proxy_.removeAttribute(realId, (String) key);
+ // It's more efficient to write a map than 2 method calls,
+ // plus it reduces the number of CacheListener notifications
+ proxy_.putAttribute(realId, attrModifiedMap_);
}
-
+
+ // Go thru the remove attr list
+ if (attrRemovedMap_.size() > 0)
+ {
+ for (Iterator it = attrRemovedMap_.keySet().iterator(); it.hasNext(); )
+ {
+ proxy_.removeAttribute(realId, (String) it.next());
+ }
+ }
+
clearAttrChangedMaps();
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/CacheListener.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/CacheListener.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/CacheListener.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -24,6 +24,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.AbstractTreeCacheListener;
+import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.logging.Logger;
import org.jboss.metadata.WebMetaData;
import org.jgroups.View;
@@ -31,6 +32,12 @@
public class CacheListener extends AbstractTreeCacheListener
{
+ // Element within an FQN that is JSESSION
+ private static final int JSESSION_FQN_INDEX = 0;
+ // Element within an FQN that is the hostname
+ private static final int HOSTNAME_FQN_INDEX = 1;
+ // Element within an FQN that is the webapp name
+ private static final int WEBAPP_FQN_INDEX = 2;
// Element within an FQN that is the session id
private static final int SESSION_ID_FQN_INDEX = 3;
// Size of an Fqn that points to the root of a session
@@ -41,35 +48,32 @@
private static final int POJO_KEY_FQN_INDEX = POJO_ATTRIBUTE_FQN_INDEX + 1;
// Size of an Fqn that points to the root of a session
private static final int POJO_KEY_FQN_SIZE = POJO_KEY_FQN_INDEX + 1;
- // The root of the buddy backup subtree
- private static final String BUDDY_BACKUP = "_BUDDY_BACKUP_";
+ // The index of the root of a buddy backup subtree
+ private static final int BUDDY_BACKUP_ROOT_OWNER_INDEX = BuddyManager.BUDDY_BACKUP_SUBTREE_FQN.size();
+ // The size of the root of a buddy backup subtree (including owner)
+ private static final int BUDDY_BACKUP_ROOT_OWNER_SIZE = BUDDY_BACKUP_ROOT_OWNER_INDEX + 1;
- protected static Logger log_ = Logger.getLogger(CacheListener.class);
- protected JBossCacheWrapper cacheWrapper_;
- protected JBossCacheManager manager_;
- protected Fqn subtreeRoot_;
- protected boolean fieldBased_;
+ private static Logger log_ = Logger.getLogger(CacheListener.class);
+ private JBossCacheWrapper cacheWrapper_;
+ private JBossCacheManager manager_;
+ private String webapp_;
+ private String hostname_;
+ private boolean fieldBased_;
+ // When trying to ignore unwanted notifications, do we check for local activity first?
+ private boolean disdainLocalActivity_;
- CacheListener(JBossCacheWrapper wrapper, JBossCacheManager manager, Fqn subtreeRoot)
+ CacheListener(JBossCacheWrapper wrapper, JBossCacheManager manager, String hostname, String webapp)
{
cacheWrapper_ = wrapper;
manager_ = manager;
- subtreeRoot_ = subtreeRoot;
- fieldBased_ = manager_.getReplicationGranularity() == WebMetaData.REPLICATION_GRANULARITY_FIELD;
+ hostname_ = hostname;
+ webapp_ = webapp;
+ int granularity = manager_.getReplicationGranularity();
+ fieldBased_ = (granularity == WebMetaData.REPLICATION_GRANULARITY_FIELD);
+ // TODO decide if disdaining local activity is always good for REPL_ASYNC
+ disdainLocalActivity_ = (granularity == WebMetaData.REPLICATION_GRANULARITY_SESSION);; // for now
}
- /**
- * If this event is emitted by myself or is not for the web app
- * I'm managing, then we can skip.
- * @param fqn
- * @return
- */
- protected boolean needToHandle(Fqn fqn)
- {
- return (LocalSessionActivity.isLocallyActive(getIdFromFqn(fqn)) == false
- && fqn.isChildOf(subtreeRoot_));
- }
-
// --------------- TreeCacheListener methods ------------------------------------
public void nodeCreated(Fqn fqn)
@@ -79,31 +83,27 @@
public void nodeRemoved(Fqn fqn)
{
- ParsedBuddyFqn pfqn = new ParsedBuddyFqn(fqn);
- fqn = pfqn.noBuddy;
- if (fqn == null)
+ // Ignore our own activity
+ if (SessionReplicationContext.isLocallyActive())
return;
- if (fqn.size() == POJO_KEY_FQN_SIZE)
+ boolean isBuddy = isBuddyFqn(fqn);
+
+ // Potential removal of a Pojo where we need to unregister as an Observer.
+ if (fieldBased_
+ && isFqnPojoKeySized(fqn, isBuddy)
+ && isFqnForOurWebapp(fqn, isBuddy))
{
- // Potential removal of a Pojo where we need to unregister
- // as an Observer.
- if (fieldBased_
- && needToHandle(fqn)
- && JBossCacheService.ATTRIBUTE.equals(fqn.get(POJO_ATTRIBUTE_FQN_INDEX))
- && fqn.size() > POJO_KEY_FQN_INDEX)
- {
- String sessId = getIdFromFqn(fqn);
- String attrKey = (String) fqn.get(POJO_KEY_FQN_INDEX);
-
- manager_.processRemoteAttributeRemoval(sessId, attrKey);
- }
+ String sessId = getIdFromFqn(fqn, isBuddy);
+ String attrKey = getPojoKeyFromFqn(fqn, isBuddy);
+ manager_.processRemoteAttributeRemoval(sessId, attrKey);
}
- else if(fqn.size() == SESSION_FQN_SIZE && needToHandle(fqn))
+ else if(isFqnSessionRootSized(fqn, isBuddy)
+ && isFqnForOurWebapp(fqn, isBuddy))
{
// A session has been invalidated from another node;
// need to inform manager
- String sessId = getIdFromFqn(fqn);
+ String sessId = getIdFromFqn(fqn, isBuddy);
manager_.processRemoteInvalidation(sessId);
}
}
@@ -121,32 +121,45 @@
nodeDirty(fqn);
}
- protected void nodeDirty(Fqn fqn)
+ private void nodeDirty(Fqn fqn)
{
- // Parse the Fqn so we if it has a buddy backup region in it
- // we can just deal with the part below that
- ParsedBuddyFqn pfqn = new ParsedBuddyFqn(fqn);
- Fqn noBuddy = pfqn.noBuddy;
+ // If checking for local activity has a higher likelihood of
+ // catching unwanted notifications than checking fqn size,
+ // do it first
+ if (disdainLocalActivity_)
+ {
+ if (SessionReplicationContext.isLocallyActive())
+ return;
+ }
- // Check if we need to handle this event. If this is from myself or not for
- // my webapp, then I should skip it.
- // We only deal with events for the root node of a session,
- // so skip all others
- if(noBuddy == null || noBuddy.size() != SESSION_FQN_SIZE || !needToHandle(noBuddy)) return;
+ boolean isBuddy = isBuddyFqn(fqn);
+ // We only care if there is a chance this is for a session root
+ if (!isFqnSessionRootSized(fqn, isBuddy))
+ return;
+
+ if (!disdainLocalActivity_)
+ {
+ if (SessionReplicationContext.isLocallyActive())
+ return;
+ }
+
+ // We only care if this is for our webapp
+ if (!isFqnForOurWebapp(fqn, isBuddy))
+ return;
- // Query if we have version value. Use the full Fqn, not just the
- // "no buddy" part.
- // If we have a version value, we compare the version. Invalidate if necessary.
+ // Query if we have version value in the distributed cache.
+ // If we have a version value, compare the version and invalidate if necessary.
Integer version = (Integer)cacheWrapper_.get(fqn, JBossCacheService.VERSION_KEY);
if(version != null)
{
- String realId = getIdFromFqn(noBuddy);
-
+ String realId = getIdFromFqn(fqn, isBuddy);
+
ClusteredSession session = manager_.findLocalSession(realId);
if (session == null)
{
+ String owner = isBuddy ? getBuddyOwner(fqn) : null;
// Notify the manager that an unloaded session has been updated
- manager_.unloadedSessionChanged(realId, pfqn.owner);
+ manager_.unloadedSessionChanged(realId, owner);
}
else if (session.isNewData(version.intValue()))
{
@@ -159,11 +172,18 @@
version.intValue());
}
}
- else
+ else if (!isBuddy)
{
log_.warn("Possible concurrency problem: Replicated version id " +
version + " matches in-memory version for session " + realId);
}
+ /*else
+ {
+ We have a local session but got a modification for the buddy tree.
+ This means another node is in the process of taking over the session;
+ we don't worry about it
+ }
+ */
}
else
{
@@ -203,61 +223,87 @@
// loading the session from the distrubted store.
}
+ // FIXME why would there be a notification of passivation on another node?
public void nodePassivate(Fqn fqn, boolean pre)
{
- // Parse the Fqn so if it has a buddy backup region in it
- // we can just deal with the part below that
- ParsedBuddyFqn pfqn = new ParsedBuddyFqn(fqn);
- Fqn noBuddy = pfqn.noBuddy;
+ if (!pre || SessionReplicationContext.isLocallyActive())
+ return;
- // Check if we need to handle this event. If this is from myself or not for
- // my webapp, then I should skip it.
+ boolean isBuddy = isBuddyFqn(fqn);
+
// We only deal with events for the root node of a session,
// so skip all others
- if(noBuddy == null || noBuddy.size() != SESSION_FQN_SIZE || !needToHandle(noBuddy)) return;
-
- if(pre)
+ if(isFqnSessionRootSized(fqn, isBuddy)
+ && isFqnForOurWebapp(fqn, isBuddy))
{
// A session has been passivated on another node;
// need to inform the manager
- String realId = getIdFromFqn(noBuddy);
- // process the session passivation
- manager_.processSessionPassivation(realId, pfqn.owner);
+ String realId = getIdFromFqn(fqn, isBuddy);
+ String owner = isBuddy ? getBuddyOwner(fqn) : null;
+ manager_.processSessionPassivation(realId, owner);
}
}
+
+ private boolean isFqnForOurWebapp(Fqn fqn, boolean isBuddy)
+ {
+ try
+ {
+ if (webapp_.equals(fqn.get(isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + WEBAPP_FQN_INDEX : WEBAPP_FQN_INDEX))
+ && hostname_.equals(fqn.get(isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + HOSTNAME_FQN_INDEX : HOSTNAME_FQN_INDEX))
+ && JBossCacheService.SESSION.equals(fqn.get(isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + JSESSION_FQN_INDEX : JSESSION_FQN_INDEX)))
+ return true;
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // can't be ours; too small; just fall through
+ }
- protected String getIdFromFqn(Fqn fqn)
+ return false;
+ }
+
+ private static boolean isFqnSessionRootSized(Fqn fqn, boolean isBuddy)
{
- return (String)fqn.get(SESSION_ID_FQN_INDEX);
+ return fqn.size() == (isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + SESSION_FQN_SIZE : SESSION_FQN_SIZE);
}
- private class ParsedBuddyFqn
+ private static boolean isFqnPojoKeySized(Fqn fqn, boolean isBuddy)
{
- Fqn raw;
- Fqn noBuddy;
- String owner;
-
- ParsedBuddyFqn(Fqn raw)
+ return fqn.size() == (isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + POJO_KEY_FQN_SIZE : POJO_KEY_FQN_SIZE);
+ }
+
+ private static String getIdFromFqn(Fqn fqn, boolean isBuddy)
+ {
+ return (String)fqn.get(isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + SESSION_ID_FQN_INDEX : SESSION_ID_FQN_INDEX);
+ }
+
+ private static String getPojoKeyFromFqn(Fqn fqn, boolean isBuddy)
+ {
+ return (String) fqn.get(isBuddy ? BUDDY_BACKUP_ROOT_OWNER_SIZE + POJO_KEY_FQN_INDEX: POJO_KEY_FQN_INDEX);
+ }
+
+ private static boolean isBuddyFqn(Fqn fqn)
+ {
+ try
{
- this.raw = raw;
- if (raw != null)
- {
- if (BUDDY_BACKUP.equals(raw.get(0)))
- {
- if (raw.size() > 2)
- {
- owner = (String) raw.get(1);
- noBuddy = raw.getFqnChild(2, raw.size());
- if (log_.isTraceEnabled())
- log_.trace(raw + " parsed to " + noBuddy + " with owner " + owner);
- }
- }
- else
- {
- noBuddy = raw;
- }
- }
+ return BuddyManager.BUDDY_BACKUP_SUBTREE.equals(fqn.get(0));
}
+ catch (IndexOutOfBoundsException e)
+ {
+ // Can only happen if fqn is ROOT, and we shouldn't get
+ // notifications for ROOT.
+ // If it does, just means it's not a buddy
+ return false;
+ }
}
+
+ /**
+ * Extracts the owner portion of a buddy subtree Fqn.
+ *
+ * @param fqn An Fqn that is a child of the buddy backup root node.
+ */
+ private static String getBuddyOwner(Fqn fqn)
+ {
+ return (String) fqn.get(BUDDY_BACKUP_ROOT_OWNER_INDEX);
+ }
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSession.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSession.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSession.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -72,7 +72,7 @@
* Descriptive information describing this Session implementation.
*/
protected static final String info = "ClusteredSession/1.0";
-
+
/**
* Set of attribute names which are not allowed to be replicated/persisted.
*/
@@ -202,14 +202,6 @@
this.firstAccess = true;
calcMaxUnreplicatedInterval();
}
-
- /**
- * Used only for externalization.
- */
- protected ClusteredSession()
- {
- super(null);
- }
/**
* Check to see if the session data is still valid. Outdated here means
@@ -494,7 +486,7 @@
// Tomcat marks the session as non new, but that's not really
// accurate per SRV.7.2, as the second request hasn't come in yet
// So, we fix that
- isNew = false;
+ isNew = true;
}
}
@@ -1067,7 +1059,13 @@
public void setNew(boolean isNew)
{
super.setNew(isNew);
- sessionMetadataDirty();
+ // Don't replicate metadata just 'cause it's the second request
+ // The only effect of this is if someone besides a request
+ // deserializes metadata from the distributed cache, this
+ // field may be out of date.
+ // If a request accesses the session, the access() call will
+ // set isNew=false, so the request will see the correct value
+ // sessionMetadataDirty();
}
public void setValid(boolean isValid)
@@ -1395,7 +1393,7 @@
}
}
-
+
/**
* Exists in this class solely to act as an API-compatible bridge to the
* deprecated {@link #removeJBossInternalAttribute(String)}.
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSessionValve.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSessionValve.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/ClusteredSessionValve.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -22,6 +22,9 @@
package org.jboss.web.tomcat.tc6.session;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
import javax.servlet.ServletException;
import org.apache.catalina.*;
@@ -75,30 +78,48 @@
*/
public void invoke(Request request, Response response) throws IOException, ServletException
{
- // Store the request and response object for the clustering code that has no direct access to
- // this objects
- SessionReplicationContext.initContext(request, response);
-
+ // Initialize the context and store the request and response objects
+ // for any clustering code that has no direct access to these objects
+ SessionReplicationContext.enterWebapp(request, response, true);
try
{
// let the servlet invocation go through
getNext().invoke(request, response);
-
+ }
+ finally // We replicate no matter what
+ {
// --> We are now after the servlet invocation
-
- SessionReplicationContext[] sessions = SessionReplicationContext.getActiveSessions();
-
- for (int i = 0; i < sessions.length; i++)
+ try
{
- sessions[i].getSnapshot().snapshot(sessions[i].getSessionId());
+ SessionReplicationContext ctx = SessionReplicationContext.exitWebapp();
+
+ if (ctx.getSoleSnapshotManager() != null)
+ {
+ ctx.getSoleSnapshotManager().snapshot(ctx.getSoleSession());
+ }
+ else
+ {
+ // Cross-context request touched multiple sessions;
+ // need to replicate them all
+ Map sessions = ctx.getCrossContextSessions();
+ if (sessions != null && sessions.size() > 0)
+ {
+ for (Iterator iter = sessions.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ ((SnapshotManager) entry.getValue()).snapshot((ClusteredSession) entry.getKey());
+ }
+ }
+ }
}
+ finally
+ {
+ SessionReplicationContext.finishCacheActivity();
+ }
+
}
- finally
- {
- SessionReplicationContext.clearContext();
- }
}
// Lifecylce-interface
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/FieldBasedClusteredSession.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/FieldBasedClusteredSession.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/FieldBasedClusteredSession.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -24,11 +24,11 @@
import org.jboss.aspects.patterns.observable.Observer;
import org.jboss.aspects.patterns.observable.Subject;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -85,14 +85,6 @@
{
super(manager);
}
-
- /**
- * Used only for externalization.
- */
- private FieldBasedClusteredSession()
- {
- super();
- }
// ----------------------------------------------- Overridden Public Methods
@@ -333,16 +325,16 @@
if(value != null)
{
// Special case for Collection classes.
- if( value instanceof Map || value instanceof List || value instanceof Set)
+ if( value instanceof Map || value instanceof Collection)
{
// We need to obtain the proxy first.
value = proxy_.getPojo(realId, key);
}
-
+
// Need to use obj since it can return as a proxy.
proxy_.addObserver(this, value);
}
-
+
// Only mark session dirty if we can replicate the attribute
sessionAttributesDirty();
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/InstantSnapshotManager.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/InstantSnapshotManager.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/InstantSnapshotManager.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -21,7 +21,6 @@
*/
package org.jboss.web.tomcat.tc6.session;
-import org.apache.catalina.Session;
/**
* A concrete implementation of the snapshot manager interface
@@ -41,20 +40,19 @@
/**
* Instant replication of the modified session
*/
- public void snapshot(String id)
+ public void snapshot(ClusteredSession session)
{
- try
+ if (session != null)
{
- // find the session that has been modified
- AbstractJBossManager mgr = getManager();
- Session session = mgr.findSession(id);
- if (session != null)
- mgr.storeSession(session);
+ try
+ {
+ getManager().storeSession(session);
+ }
+ catch (Exception e)
+ {
+ getLog().warn("Failed to replicate session " + session.getIdInternal(), e);
+ }
}
- catch (Exception e)
- {
- getLog().warn("Failed to replicate sessionID:" + id, e);
- }
}
public void start()
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/IntervalSnapshotManager.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/IntervalSnapshotManager.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/IntervalSnapshotManager.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -24,7 +24,6 @@
import java.util.LinkedHashSet;
import java.util.Set;
-import org.apache.catalina.Session;
import org.jboss.logging.Logger;
/**
@@ -48,6 +47,9 @@
// the distribute thread
private Thread thread = null;
+ // Is session processing allowed?
+ private boolean processingAllowed = false;
+
// has the thread finished?
private boolean threadDone = false;
@@ -65,18 +67,19 @@
/**
* Store the modified session in a hashmap for the distributor thread
*/
- public void snapshot(String id)
+ public void snapshot(ClusteredSession session)
{
try
- {
+ {
+ // Don't hold a ref to the session for a long time
synchronized (sessions)
{
- sessions.add(id);
+ sessions.add(session);
}
}
catch (Exception e)
{
- log.warn("Failed to replicate sessionID:" + id, e);
+ log.error("Failed to queue session " + session + " for replication", e);
}
}
@@ -85,26 +88,28 @@
*/
protected void processSessions()
{
- String[] ids = null;
+ ClusteredSession[] toProcess = null;
synchronized (sessions)
{
- ids = new String[sessions.size()];
- ids = (String[]) sessions.toArray(ids);
+ toProcess = new ClusteredSession[sessions.size()];
+ toProcess = (ClusteredSession[]) sessions.toArray(toProcess);
sessions.clear();
}
AbstractJBossManager mgr = getManager();
- for (int i = 0; i < ids.length; i++)
+ for (int i = 0; i < toProcess.length; i++)
{
+ // Confirm we haven't been stopped
+ if (!processingAllowed)
+ break;
+
try
{
- Session session = mgr.findSession(ids[i]);
- if (session != null)
- mgr.storeSession(session);
+ mgr.storeSession(toProcess[i]);
}
catch (Exception e)
{
- getLog().error("Caught exception processing session " + ids[i], e);
+ getLog().error("Caught exception processing session " + toProcess[i].getRealId(), e);
}
}
}
@@ -114,6 +119,7 @@
*/
public void start()
{
+ processingAllowed = true;
startThread();
}
@@ -122,6 +128,7 @@
*/
public void stop()
{
+ processingAllowed = false;
stopThread();
synchronized (sessions)
{
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheClusteredSession.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheClusteredSession.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheClusteredSession.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -48,14 +48,6 @@
super(manager, manager.getUseJK());
establishProxy();
}
-
- /**
- * Used only for externalization.
- */
- protected JBossCacheClusteredSession()
- {
- super();
- }
/**
* Initialize fields marked as transient after loading this session
@@ -121,9 +113,9 @@
public synchronized void processSessionRepl()
{
// Replicate the session.
- if (log.isDebugEnabled())
+ if (log.isTraceEnabled())
{
- log.debug("processSessionRepl(): session is dirty. Will increment " +
+ log.trace("processSessionRepl(): session is dirty. Will increment " +
"version from: " + getVersion() + " and replicate.");
}
this.incrementVersion();
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheManager.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheManager.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheManager.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -465,8 +465,7 @@
throw new IllegalStateException("Manager not started");
}
- if (log_.isDebugEnabled())
- log_.debug("Stopping");
+ log_.debug("Stopping");
resetStats();
@@ -644,9 +643,7 @@
activeCounter_++;
// Add this session to the set of those potentially needing replication
- // We use the realId here
- String realId = getRealId(sessionId);
- SessionReplicationContext.addActiveSession(realId, snapshotManager_);
+ SessionReplicationContext.bindSession(session, snapshotManager_);
return session;
}
@@ -654,15 +651,15 @@
public boolean storeSession(Session baseSession)
{
boolean stored = false;
- if(baseSession != null)
+ if(baseSession != null && started_)
{
ClusteredSession session = (ClusteredSession) baseSession;
synchronized (session)
{
- if (log_.isDebugEnabled())
+ if (log_.isTraceEnabled())
{
- log_.debug("check to see if needs to store and replicate " +
+ log_.trace("check to see if needs to store and replicate " +
"session with id " + session.getIdInternal());
}
@@ -756,10 +753,7 @@
private ClusteredSession createEmptyClusteredSession()
{
- if (log_.isDebugEnabled())
- {
- log_.debug("Creating an empty ClusteredSession");
- }
+ log_.debug("Creating an empty ClusteredSession");
ClusteredSession session = null;
switch (replicationGranularity_)
@@ -807,8 +801,11 @@
// session from the other nodes to be gravitated, thus resuscitating
// the session.
if (session == null
- && !SessionReplicationContext.isSessionActive(realId, snapshotManager_))
+ && !SessionReplicationContext.isSessionBoundAndExpired(realId, snapshotManager_))
{
+ if (log_.isTraceEnabled())
+ log_.trace("Checking for session " + realId + " in the distributed cache");
+
session = loadSession(realId);
if (session != null)
{
@@ -819,14 +816,17 @@
}
else if (session != null && session.isOutdated())
{
+ if (log_.isTraceEnabled())
+ log_.trace("Updating session " + realId + " from the distributed cache");
+
// Need to update it from the cache
loadSession(realId);
}
-
+
if (session != null)
{
// Add this session to the set of those potentially needing replication
- SessionReplicationContext.addActiveSession(realId, snapshotManager_);
+ SessionReplicationContext.bindSession(session, snapshotManager_);
}
return session;
@@ -896,8 +896,7 @@
*/
public ClusteredSession findLocalSession(String realId)
{
- ClusteredSession session = (ClusteredSession) sessions_.get(realId);
- return session;
+ return (ClusteredSession) sessions_.get(realId);
}
/**
@@ -921,17 +920,21 @@
try {
// Ignore any cache notifications that our own work generates
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
clusterSess.removeMyself();
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
+
+ // We don't want to replicate this session at the end
+ // of the request; the removal process took care of that
+ SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_);
+
+ sessions_.remove(realId);
+ stats_.removeStats(realId);
+ activeCounter_--;
}
-
- sessions_.remove(realId);
- stats_.removeStats(realId);
}
- activeCounter_--;
}
/**
@@ -952,16 +955,30 @@
{
log_.debug("Removing session from local store with id: " + realId);
}
- clusterSess.removeMyselfLocal();
- sessions_.remove(realId);
- stats_.removeStats(realId);
+
+ try {
+ // Ignore any cache notifications that our own work generates
+ SessionReplicationContext.startCacheActivity();
+ clusterSess.removeMyselfLocal();
+ }
+ finally
+ {
+ SessionReplicationContext.finishCacheActivity();
+
+ // We don't want to replicate this session at the end
+ // of the request; the removal process took care of that
+ SessionReplicationContext.sessionExpired(clusterSess, realId, snapshotManager_);
+
+ sessions_.remove(realId);
+ stats_.removeStats(realId);
+
+ // Update counters.
+ // It's a bit ad-hoc to do it here. But since we currently call
+ // this when session expires ...
+ expiredCounter_++;
+ activeCounter_--;
+ }
}
-
- // Update counters.
- // It's a bit ad-hoc to do it here. But since we currently call
- // this when session expires ...
- expiredCounter_++;
- activeCounter_--;
}
/**
@@ -1040,7 +1057,7 @@
// Ignore cache notifications we may generate for this
// session if data gravitation occurs.
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
session = proxy_.loadSession(realId, session);
}
@@ -1070,7 +1087,7 @@
endTransaction(realId);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
@@ -1110,7 +1127,6 @@
// for a single put
boolean notSession = (replicationGranularity_ != WebMetaData.REPLICATION_GRANULARITY_SESSION);
boolean doTx = false;
- String realId = session.getRealId();
try
{
// We need transaction so all the replication are sent in batch.
@@ -1127,7 +1143,7 @@
// at this level because we don't want to resume handling
// notifications until any compensating changes resulting
// from a tx rollback are done.
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
session.processSessionRepl();
}
@@ -1161,7 +1177,7 @@
endTransaction(session.getId());
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
}
@@ -1217,11 +1233,11 @@
return;
}
- if (log_.isDebugEnabled())
+ if (log_.isTraceEnabled())
{
- log_.debug("processExpires():max active sessions = " + maxActive_);
- log_.debug("processExpires(): passivation mode = " + isPassivationEnabled());
- log_.debug("processExpires(): Looking for sessions that have expired ...");
+ log_.trace("processExpires():max active sessions = " + maxActive_);
+ log_.trace("processExpires(): passivation mode = " + isPassivationEnabled());
+ log_.trace("processExpires(): Looking for sessions that have expired ...");
}
try
{
@@ -1382,7 +1398,11 @@
{
// We weren't managing the session anyway. But remove it
// from the list of cached sessions we haven't loaded
- unloadedSessions_.remove(realId);
+ if (unloadedSessions_.remove(realId) != null)
+ {
+ if (log_.isTraceEnabled())
+ log_.trace("Removed entry for session " + realId + " from unloaded session map");
+ }
}
else
{
@@ -1438,12 +1458,12 @@
try {
// Tell the proxy to ignore cache notifications we are about
// to generate for this session.
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
session.passivate();
proxy_.evictSession(realId);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
Object obj = unloadedSessions_.put(realId,
@@ -1546,10 +1566,7 @@
// Add SnapshotValve and, if needed, JvmRouteValve and batch repl valve
installValves();
- if (log_.isDebugEnabled())
- {
- log_.debug("start(): JBossCacheService started");
- }
+ log_.debug("start(): JBossCacheService started");
}
catch (Exception e)
{
@@ -1662,10 +1679,7 @@
// Notify our interested LifecycleListeners
lifecycle_.fireLifecycleEvent(AFTER_START_EVENT, this);
- if (log_.isDebugEnabled())
- {
- log_.debug("start(): JBossCacheService started");
- }
+ log_.debug("start(): JBossCacheService started");
}
catch (Exception e)
{
@@ -1859,8 +1873,7 @@
// when it's made. So we catch the exception and fall back
// to adding the valve directly.
// TODO consider skipping adding via JMX and just do it directly
- if (log_.isDebugEnabled())
- log_.debug("Caught exception installing valve to Context", e);
+ log_.debug("Caught exception installing valve to Context", e);
}
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheService.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheService.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/JBossCacheService.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -21,14 +21,13 @@
*/
package org.jboss.web.tomcat.tc6.session;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -36,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Stack;
import java.util.StringTokenizer;
import java.util.WeakHashMap;
@@ -48,10 +46,12 @@
import org.jboss.aspects.patterns.observable.Subject;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
-import org.jboss.cache.aop.PojoCache;
import org.jboss.cache.aop.PojoCacheMBean;
+import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.transaction.BatchModeTransactionManager;
import org.jboss.invocation.MarshalledValue;
+import org.jboss.invocation.MarshalledValueInputStream;
+import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.logging.Logger;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.serial.io.MarshalledObject;
@@ -67,33 +67,12 @@
* </ul>
*/
public class JBossCacheService
-{
- // Types that are considered "primitive".
- private static final Set immediates =
- new HashSet(Arrays.asList(new Object[]{
- String.class,
- Boolean.class,
- Double.class,
- Float.class,
- Integer.class,
- Long.class,
- Short.class,
- Character.class,
- Boolean.TYPE,
- Double.TYPE,
- Float.TYPE,
- Integer.TYPE,
- Long.TYPE,
- Short.TYPE,
- Character.TYPE,
- Class.class}));
-
+{
protected static Logger log_ = Logger.getLogger(JBossCacheService.class);
- public static final String BUDDY_BACKUP = "_BUDDY_BACKUP_";
- public static final Fqn BUDDY_BACKUP_FQN = new Fqn("_BUDDY_BACKUP_");
+ public static final String BUDDY_BACKUP = BuddyManager.BUDDY_BACKUP_SUBTREE;
+ public static final Fqn BUDDY_BACKUP_FQN = BuddyManager.BUDDY_BACKUP_SUBTREE_FQN;
public static final String SESSION = "JSESSION";
public static final String ATTRIBUTE = "ATTRIBUTE";
- public static final String KEY = "ATRR_KEY";
// Needed for cache invalidation
static final String VERSION_KEY = "VERSION";
static final String FQN_DELIMITER = "/";
@@ -128,21 +107,29 @@
// Create Proxy-Object for this service
proxy_ = (PojoCacheMBean) MBeanProxyExt.create(PojoCacheMBean.class,
cacheServiceName_);
- if (proxy_ == null)
- {
- throw new RuntimeException("JBossCacheService: locate null TomcatCacheMbean");
- }
+ }
+ catch (Throwable t)
+ {
- cacheWrapper_ = new JBossCacheWrapper(proxy_);
-
- useTreeCacheMarshalling_ = proxy_.getUseRegionBasedMarshalling();
+ String str = "Could not access TreeCache service " +
+ (cacheServiceName_ == null ? "<null>" : cacheServiceName_.toString()) +
+ " for Tomcat clustering";
+ log_.debug(str);
+ throw new ClusteringNotSupportedException(str, t);
}
- catch (Throwable e)
+
+ if (proxy_ == null)
{
- String str = cacheServiceName_ + " service to Tomcat clustering not found";
- log_.error(str);
+ String str = "Could not access TreeCache service " +
+ (cacheServiceName_ == null ? "<null>" : cacheServiceName_.toString()) +
+ " for Tomcat clustering";
+ log_.debug(str);
throw new ClusteringNotSupportedException(str);
}
+
+ cacheWrapper_ = new JBossCacheWrapper(proxy_);
+
+ useTreeCacheMarshalling_ = proxy_.getUseRegionBasedMarshalling();
}
public void start(ClassLoader tcl, JBossCacheManager manager)
@@ -170,14 +157,13 @@
log_.debug("Old and new virtual host name are: " + host + ", " + hostName_);
- // Construct the fqn
- Object[] objs = new Object[]{SESSION, hostName_, webAppPath_};
- Fqn pathFqn = new Fqn( objs );
-
- cacheListener_ = new CacheListener(cacheWrapper_, manager_, pathFqn);
+ // Listen for cache changes
+ cacheListener_ = new CacheListener(cacheWrapper_, manager_, hostName_, webAppPath_);
proxy_.addTreeCacheListener(cacheListener_);
// register the tcl and bring over the state for the webapp
+ Object[] objs = new Object[]{SESSION, hostName_, webAppPath_};
+ Fqn pathFqn = new Fqn( objs );
String fqnStr = pathFqn.toString();
try {
if(useTreeCacheMarshalling_)
@@ -185,7 +171,7 @@
log_.debug("UseMarshalling is true. We will register the fqn: " +
fqnStr + " with class loader" +tcl +
" and activate the webapp's Region");
- proxy_.registerClassLoader(fqnStr, tcl);
+ proxy_.registerClassLoader(fqnStr, tcl);
proxy_.activateRegion(fqnStr);
}
} catch (Exception ex)
@@ -203,11 +189,11 @@
if(isCachePassivationEnabled())
{
- log_.info("JBossCacheService.start(): JBossCache passivation is enabled");
+ log_.debug("JBossCacheService.start(): JBossCache passivation is enabled");
}
else
{
- log_.info("JBossCacheService.start(): JBossCache passivation is disabled");
+ log_.debug("JBossCacheService.start(): JBossCache passivation is disabled");
}
}
@@ -395,7 +381,7 @@
Fqn fqn = getSessionFqn(realId);
if(log_.isDebugEnabled())
{
- log_.debug("evictSession(): evicting session from my distributed store");
+ log_.debug("evictSession(): evicting session from my distributed store. Fqn: " + fqn);
}
cacheWrapper_.evictSubtree(fqn);
@@ -551,14 +537,6 @@
*/
public Object setPojo(String realId, String key, Object pojo)
{
- // TODO This actually redundant here -- its already in
- // FieldBasedClusteredSession
- if( !Util.checkPojoType(pojo) )
- {
- throw new RuntimeException("setPojo: pojo is not an instance of Advised. Need to declare it in aop: "
- +pojo.toString());
- }
-
if(log_.isTraceEnabled())
{
log_.trace("setPojo(): session id: " + realId + " key: " + key +
@@ -568,13 +546,13 @@
Fqn fqn = getFieldFqn(realId, key);
try {
// Ignore any cache notifications that our own work generates
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
return proxy_.putObject(fqn, pojo);
} catch (CacheException e) {
throw new RuntimeException("JBossCacheService: exception occurred in cache setPojo ... ", e);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
@@ -594,13 +572,13 @@
Fqn fqn = getFieldFqn(realId, key);
try {
// Ignore any cache notifications that our own work generates
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
return proxy_.removeObject(fqn);
} catch (CacheException e) {
throw new RuntimeException("JBossCacheService: exception occurred in cache removePojo ... ", e);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
@@ -620,11 +598,11 @@
Fqn fqn = getAttributeFqn(realId);
try {
// Ignore any cache notifications that our own work generates
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
cacheWrapper_.removeLocalSubtree(fqn);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
@@ -644,11 +622,11 @@
Fqn fqn = getFieldFqn(realId, key);
try {
// Ignore any cache notifications that our own work generates
- LocalSessionActivity.startLocalActivity(realId);
+ SessionReplicationContext.startCacheActivity();
cacheWrapper_.removeLocalSubtree(fqn);
}
finally {
- LocalSessionActivity.finishLocalActivity();
+ SessionReplicationContext.finishCacheActivity();
}
}
@@ -705,39 +683,35 @@
*/
public void addObserver(Observer session, Object pojo)
{
- addObserver(session, pojo, new Stack());
+ addObserver(session, pojo, new HashSet());
}
- private void addObserver(Observer session, Object pojo, Stack stack)
+ private void addObserver(Observer session, Object pojo, Set processed)
{
if ( pojo instanceof Collection )
{
Collection col = (Collection)pojo;
for (Iterator i = col.iterator(); i.hasNext();) {
- Object obj = i.next();
// If not a managed pojo, will return anyway
- addObserver(session, obj, stack);
+ addObserver(session, i.next(), processed);
}
return;
}
else if (pojo instanceof Map)
{
- Map map = (Map)pojo;
- for (Iterator i = map.keySet().iterator(); i.hasNext();) {
- Object key = i.next();
- Object value = map.get(key);
+ for (Iterator i = ((Map)pojo).entrySet().iterator(); i.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) i.next();
// Walk thru key and value
- addObserver(session, key, stack);
- addObserver(session, value, stack);
+ addObserver(session, entry.getKey(), processed);
+ addObserver(session, entry.getValue(), processed);
}
return;
}
- // BRIAN 3/14 changed this from checking Advised to checking Subject
- // since that is what we cast to below
if(! (pojo instanceof Subject) )
{
return; // No need to add observer since it is primitive.
@@ -761,12 +735,15 @@
Set complexFields = (Set) typeMap.get(type);
if (complexFields == null)
{
- complexFields = parseComplexFields(type);
+ complexFields = Util.parseComplexFields(type);
typeMap.put(type, complexFields);
}
+
+ if (complexFields.size() == 0)
+ return;
- // Store a ref to the pojo on the stack to avoid cyclic additions
- stack.push(pojo);
+ // Store a ref to the pojo to avoid cyclic additions
+ processed.add(pojo);
for (Iterator iter = complexFields.iterator(); iter.hasNext();)
{
@@ -785,8 +762,8 @@
value=field.get(pojo);
// Continue recursively unless we've already handled this value
- if (value != null && !stack.contains(value))
- addObserver(session, value, stack);
+ if (value != null && !processed.contains(value))
+ addObserver(session, value, processed);
break;
}
catch(IllegalAccessException e)
@@ -808,8 +785,6 @@
}
}
}
-
- stack.pop();
}
/**
@@ -823,10 +798,10 @@
*/
public void removeObserver(Observer session, Object pojo)
{
- removeObserver(session, pojo, new Stack());
+ removeObserver(session, pojo, new HashSet());
}
- private void removeObserver(Observer session, Object pojo, Stack stack)
+ private void removeObserver(Observer session, Object pojo, Set stack)
{
if ( pojo instanceof Collection )
{
@@ -877,13 +852,16 @@
Set complexFields = (Set) typeMap.get(type);
if (complexFields == null)
{
- complexFields = parseComplexFields(type);
+ complexFields = Util.parseComplexFields(type);
typeMap.put(type, complexFields);
}
- // Store a ref to the pojo on the stack to avoid cyclic additions
- stack.push(pojo);
+ if (complexFields.size() == 0)
+ return;
+ // Store a ref to the pojo to avoid cyclic removals
+ stack.add(pojo);
+
for (Iterator iter = complexFields.iterator(); iter.hasNext();)
{
String fieldName = (String) iter.next();
@@ -924,8 +902,6 @@
}
}
}
-
- stack.pop();
}
public boolean isCachePassivationEnabled()
@@ -945,7 +921,7 @@
{
// /SESSION/id/ATTR/key
// Guard against string with delimiter.
- List list = new ArrayList();
+ List list = new ArrayList(6);
list.add(SESSION);
list.add(hostName_);
list.add(webAppPath_);
@@ -1133,48 +1109,7 @@
log_.error("externalizeSession(): exception occurred externalizing session " + session, e);
return null;
}
-
- }
-
- private Set parseComplexFields(Class clazz)
- {
- Set result = new HashSet();
- while (clazz != null)
- {
- Field[] fields = clazz.getDeclaredFields();
- for (int i = 0; i < fields.length; i++)
- {
- if (!immediates.contains(fields[i].getType())
- && isReplicatable(fields[i]))
- {
- result.add(fields[i].getName());
- }
- }
-
- clazz = clazz.getSuperclass();
- }
- return result;
}
- /**
- * Returns false if the given field is static, transient or final.
- *
- * @param f the field
- * @return
- */
- private static boolean isReplicatable(Field f) {
- int mods = f.getModifiers();
- /**
- * The following modifiers are ignored in the cache, i.e., they will not be stored in the cache.
- * Whenever, user trying to access these fields, it will be accessed from the in-memory version.
- */
- if (Modifier.isStatic(mods)
- || Modifier.isTransient(mods)
- || Modifier.isFinal(mods)) {
- return false;
- }
- return true;
- }
-
}
Deleted: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/LocalSessionActivity.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/LocalSessionActivity.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/LocalSessionActivity.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -1,87 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.web.tomcat.tc6.session;
-
-/**
- * Binds the session id of a session that is currently being manipulated
- * locally to a ThreadLocal so CacheListener can ignore events generated by the
- * activity.
- *
- * @author Brian Stansberry
- * @version $Id$
- */
-class LocalSessionActivity
-{
- private static ThreadLocal activeSessions = new ThreadLocal();
-
- private String sessionId;
- private int level = 0;
-
- /**
- * Marks the current thread as actively processing the given session.
- * If the thread has already been so marked, increases a counter
- * so a subsequent call to finishLocalActivity does not remove
- * the association (allowing nested calls).
- *
- * @param sessionId. Can be <code>null</code>.
- */
- public static void startLocalActivity(String sessionId)
- {
- LocalSessionActivity current = (LocalSessionActivity) activeSessions.get();
- if (current == null || !(current.sessionId.equals(sessionId)))
- {
- current = new LocalSessionActivity(sessionId);
- activeSessions.set(current);
- }
- current.level++;
- }
-
- /**
- * Marks the completion of activity on a given session. Should be called
- * once for each invocation of {@link #startLocalActivity(String)}.
- *
- */
- public static void finishLocalActivity()
- {
- LocalSessionActivity current = (LocalSessionActivity) activeSessions.get();
- if (current != null)
- {
- if (--current.level == 0)
- activeSessions.set(null);
- }
- }
-
- public static boolean isLocallyActive(String sessionId)
- {
- boolean result = false;
- LocalSessionActivity active = (LocalSessionActivity) activeSessions.get();
- if (active != null)
- result = active.sessionId.equals(sessionId);
-
- return result;
- }
-
- private LocalSessionActivity(String sessionId)
- {
- this.sessionId = sessionId;
- }
-}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionBasedClusteredSession.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionBasedClusteredSession.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionBasedClusteredSession.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -62,14 +62,6 @@
{
super(manager);
}
-
- /**
- * Used only for externalization.
- */
- private SessionBasedClusteredSession()
- {
- super();
- }
// ---------------------------------------------- Overridden Public Methods
@@ -178,19 +170,16 @@
// Let superclass write out everything but the attribute map
super.writeExternal(out);
- synchronized (attributes)
- {
- // Don't replicate any excluded attributes
- Map excluded = removeExcludedAttributes(attributes);
-
- out.writeObject(attributes);
-
- // Restore any excluded attributes
- if (excluded != null)
- attributes.putAll(excluded);
- }
- }
-
+ // Don't replicate any excluded attributes
+ Map excluded = removeExcludedAttributes(attributes);
+
+ out.writeObject(attributes);
+
+ // Restore any excluded attributes
+ if (excluded != null)
+ attributes.putAll(excluded);
+ }
+
}
protected void update(ClusteredSession replicated)
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionReplicationContext.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionReplicationContext.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SessionReplicationContext.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -1,155 +1,280 @@
package org.jboss.web.tomcat.tc6.session;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.Stack;
+import java.util.HashMap;
+import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
public final class SessionReplicationContext
{
- private String sessionId;
- private SnapshotManager snapshot;
+ private static final ThreadLocal replicationContext = new ThreadLocal();
- private static final ThreadLocal contextStack = new ThreadLocal();
- private static final ThreadLocal requestThreadLocal = new ThreadLocal();
- private static final ThreadLocal responseThreadLocal = new ThreadLocal();
+ private static final SessionReplicationContext EMPTY = new SessionReplicationContext();
- public static void initContext(Request request, Response response)
+ private int webappCount;
+ private int activityCount;
+ private SnapshotManager soleManager;
+ private ClusteredSession soleSession;
+ private Map crossCtxSessions;
+ private Map expiredSessions;
+ private Request outerRequest;
+ private Response outerResponse;
+
+ /**
+ * Associate a SessionReplicationContext with the current thread, if
+ * there isn't one already. If there isn't one, associate the
+ * given request and response with the context.
+ * <p/>
+ * <strong>NOTE:</strong> Nested calls to this method and {@link #exitWebapp()}
+ * are supported; once a context is established the number of calls to this
+ * method and <code>exitWebapp()</code> are tracked.
+ *
+ * @param request
+ * @param response
+ */
+ public static void enterWebapp(Request request, Response response, boolean startCacheActivity)
{
- Stack stack = (Stack) contextStack.get();
- if (stack == null)
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx == null)
{
- stack = new Stack();
- contextStack.set(stack);
+ ctx = new SessionReplicationContext(request, response);
+ replicationContext.set(ctx);
}
- stack.push(new LinkedHashSet());
- // We only store the outermost request if there are nested calls
- // to this method in the same request
- if (stack.size() == 1)
- {
- requestThreadLocal.set(request);
- responseThreadLocal.set(response);
- }
+ ctx.webappCount++;
+ if (startCacheActivity)
+ ctx.activityCount++;
}
- public static SessionReplicationContext[] getActiveSessions()
+ /**
+ * Signals that the webapp is finished handling the request (and
+ * therefore replication can begin.)
+ *
+ * @return a SessionReplicationContext, from which information
+ * about any sessions needing replication can be obtained.
+ * Will not return <code>null</code>.
+ */
+ public static SessionReplicationContext exitWebapp()
{
- Stack stack = (Stack) contextStack.get();
- Set set = (Set) stack.peek();
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx != null)
+ {
+ ctx.webappCount--;
+ if (ctx.webappCount < 1)
+ {
+ // We've unwound any nested webapp calls, so we'll clean up and
+ // return the context to allow replication. If all cache activity
+ // is done as well, clear the ThreadLocal
+
+ ctx.outerRequest = null;
+ ctx.outerResponse = null;
+
+ if (ctx.activityCount < 1)
+ replicationContext.set(null);
+
+ return ctx;
+ }
+ }
- SessionReplicationContext[] result = new SessionReplicationContext[set.size()];
- return (SessionReplicationContext[]) set.toArray(result);
+ // A nested valve called us. Just return an empty context
+ return EMPTY;
+
}
- public static void clearContext()
+ public static void bindSession(ClusteredSession session, SnapshotManager manager)
{
- Stack stack = (Stack) contextStack.get();
- stack.pop();
-
- // If this is the last in the stack, clear the request and response
- if (stack.size() == 0)
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx != null && ctx.webappCount > 0)
{
- requestThreadLocal.set(null);
- responseThreadLocal.set(null);
- // TODO consider leaving the stack around
- contextStack.set(null);
+ ctx.addReplicatableSession(session, manager);
}
+ /*else {
+ We are past the part of the request cycle where we
+ track sessions for replication
+ }*/
}
- public static void addActiveSession(String realId, SnapshotManager manager)
+ public static void sessionExpired(ClusteredSession session, String realId, SnapshotManager manager)
{
- Stack stack = (Stack) contextStack.get();
- if (stack != null)
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx != null && ctx.webappCount > 0)
{
- Set set = (Set) stack.peek();
- if (set != null)
- set.add(new SessionReplicationContext(realId, manager));
- }
+ ctx.addExpiredSession(session, manager);
+ }
}
- public static boolean isSessionActive(String realId, SnapshotManager manager)
+ public static boolean isSessionBoundAndExpired(String realId, SnapshotManager manager)
{
boolean result = false;
- Stack stack = (Stack) contextStack.get();
- if (stack != null)
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx != null)
{
- Set set = (Set) stack.peek();
- result = set.contains(new SessionReplicationContext(realId, manager));
+ result = ctx.isSessionExpired(realId, manager);
}
return result;
}
+ /**
+ * Marks the current thread as actively performing cache activity.
+ * If the thread has already been so marked, increases a counter
+ * so a subsequent call to {@link #finishCacheActivity()} does not remove
+ * the association (allowing nested calls).
+ */
+ public static void startCacheActivity()
+ {
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx == null)
+ {
+ ctx = new SessionReplicationContext();
+ replicationContext.set(ctx);
+ }
+
+ ctx.activityCount++;
+ }
+
+ /**
+ * Marks the completion of cache activity on the current thread. Should be called
+ * once for each invocation of {@link #startCacheActivity()}.
+ */
+ public static void finishCacheActivity()
+ {
+ SessionReplicationContext ctx = getCurrentContext();
+ if (ctx != null)
+ {
+ ctx.activityCount--;
+ if (ctx.activityCount < 1 && ctx.webappCount < 1)
+ {
+ replicationContext.set(null);
+ }
+ }
+ }
+
+ public static boolean isLocallyActive()
+ {
+ return getCurrentContext() != null;
+ }
+
public static Request getOriginalRequest()
{
- return (Request) requestThreadLocal.get();
+ SessionReplicationContext ctx = getCurrentContext();
+ return (ctx == null ? null : ctx.outerRequest);
}
public static Response getOriginalResponse()
{
- return (Response) responseThreadLocal.get();
+ SessionReplicationContext ctx = getCurrentContext();
+ return (ctx == null ? null : ctx.outerResponse);
}
- // Prevent external instantiation
- private SessionReplicationContext(String sessionId, SnapshotManager manager)
+ private static SessionReplicationContext getCurrentContext()
{
- this.sessionId = sessionId;
- this.snapshot = manager;
+ return (SessionReplicationContext) replicationContext.get();
}
-
- public SnapshotManager getSnapshot()
+
+ private SessionReplicationContext(Request request, Response response)
{
- return snapshot;
+ this.outerRequest = request;
+ this.outerResponse = response;
}
+
+ private SessionReplicationContext() {}
- public String getSessionId()
+ /**
+ * Gets a Map<ClusteredSession, SnapshotManager> of sessions that were accessed
+ * during the course of a request. Will only be non-null if
+ * {@link #bindSession(ClusteredSession, SnapshotManager)} was called
+ * with more than one SnapshotManager (i.e the request crossed session
+ * contexts.)
+ */
+ public Map getCrossContextSessions()
{
- return sessionId;
+ return crossCtxSessions;
}
- public boolean equals(Object obj)
+ /**
+ * Gets the SnapshotManager that was passed to
+ * {@link #bindSession(ClusteredSession, SnapshotManager)} if and only
+ * if only one such SnapshotManager was passed. Returns <code>null</code>
+ * otherwise, in which case a cross-context request is a possibility,
+ * and {@link #getCrossContextSessions()} should be checked.
+ */
+ public SnapshotManager getSoleSnapshotManager()
{
- if (this == obj)
- return true;
-
- if (obj instanceof SessionReplicationContext)
- {
- SessionReplicationContext other = (SessionReplicationContext) obj;
- // Test the snapshot first as it is more likely to be unique
- return (this.snapshot.equals(other.snapshot)
- && this.sessionId.equals(other.sessionId));
- }
-
- return false;
+ return soleManager;
}
/**
- * Returns the {@link #getSnapshot snapshot}'s hashcode, as it should be sufficient
- * to minimize bucket collisions; for the same request there shouldn't be more
- * than one session per snapshot.
+ * Gets the ClusteredSession that was passed to
+ * {@link #bindSession(ClusteredSession, SnapshotManager)} if and only
+ * if only one SnapshotManager was passed. Returns <code>null</code>
+ * otherwise, in which case a cross-context request is a possibility,
+ * and {@link #getCrossContextSessions()} should be checked.
*/
- public int hashCode()
+ public ClusteredSession getSoleSession()
{
- return snapshot.hashCode();
+ return soleSession;
}
-
- public String toString()
+
+ private void addReplicatableSession(ClusteredSession session, SnapshotManager mgr)
{
- StringBuffer sb = new StringBuffer(getClass().getName());
- sb.append("{sessionid=");
- sb.append(sessionId);
- sb.append("snapshot=");
- sb.append(snapshot);
- sb.append("}");
- return sb.toString();
+ if (crossCtxSessions != null)
+ {
+ crossCtxSessions.put(session, mgr);
+ }
+ else if (soleManager == null)
+ {
+ // First one bound
+ soleManager = mgr;
+ soleSession = session;
+ }
+ else if (!mgr.equals(soleManager))
+ {
+ // We have a cross-context call; need a Map for the sessions
+ crossCtxSessions = new HashMap();
+ crossCtxSessions.put(soleSession, soleManager);
+ crossCtxSessions.put(session, mgr);
+ soleManager = null;
+ soleSession = null;
+ }
+ else
+ {
+ soleSession = session;
+ }
}
+ private void addExpiredSession(ClusteredSession session, SnapshotManager manager)
+ {
+ boolean store = manager.equals(soleManager);
+ if (store)
+ {
+ soleManager = null;
+ soleSession = null;
+ }
+ else if (crossCtxSessions != null)
+ {
+ // Only store the session if it was previously in our map
+ store = (crossCtxSessions.remove(session) != null);
+ }
+
+ if (store)
+ {
+ if (this.expiredSessions == null)
+ {
+ expiredSessions = new HashMap();
+ }
+ expiredSessions.put(manager, session.getRealId());
+ }
+ }
+ private boolean isSessionExpired(String realId, SnapshotManager manager)
+ {
+ boolean result = false;
+ if (expiredSessions != null)
+ {
+ result = realId.equals(expiredSessions.get(manager));
+ }
+ return result;
+ }
-
-
}
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SnapshotManager.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SnapshotManager.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/SnapshotManager.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -21,7 +21,6 @@
*/
package org.jboss.web.tomcat.tc6.session;
-import org.apache.catalina.Context;
import org.jboss.logging.Logger;
/**
@@ -55,7 +54,7 @@
* Tell the snapshot manager which session was modified and
* must be replicated
*/
- public abstract void snapshot(String id);
+ public abstract void snapshot(ClusteredSession session);
/**
* Start the snapshot manager
Modified: trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/Util.java
===================================================================
--- trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/Util.java 2006-09-04 03:42:05 UTC (rev 56541)
+++ trunk/tomcat/src/main/org/jboss/web/tomcat/tc6/session/Util.java 2006-09-04 04:15:00 UTC (rev 56542)
@@ -23,11 +23,15 @@
package org.jboss.web.tomcat.tc6.session;
import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.jboss.aop.Advised;
-import org.jboss.cache.aop.CachedType;
/**
* Utility methods related to JBoss distributed sessions.
@@ -35,8 +39,27 @@
* @author Brian Stansberry
* @version $Revision$
*/
-public abstract class Util
+public class Util
{
+ // Types that are considered "primitive".
+ private static final Set immediates =
+ new HashSet(Arrays.asList(new Object[]{
+ String.class,
+ Boolean.class,
+ Double.class,
+ Float.class,
+ Integer.class,
+ Long.class,
+ Short.class,
+ Character.class,
+ Boolean.TYPE,
+ Double.TYPE,
+ Float.TYPE,
+ Integer.TYPE,
+ Long.TYPE,
+ Short.TYPE,
+ Character.TYPE,
+ Class.class}));
/**
* Returns a session id with any trailing jvmRoute removed.
@@ -73,10 +96,51 @@
|| (pojo instanceof Collection)
|| (pojo instanceof Map)
|| (pojo instanceof Advised)
- || (CachedType.isImmediate(pojo.getClass())));
+ || (immediates.contains(pojo.getClass())));
}
+
+ public static Set parseComplexFields(Class clazz)
+ {
+ Set result = new HashSet();
+
+ while (clazz != null)
+ {
+ Field[] fields = clazz.getDeclaredFields();
+ for (int i = 0; i < fields.length; i++)
+ {
+ if (!immediates.contains(fields[i].getType())
+ && isReplicatable(fields[i]))
+ {
+ result.add(fields[i].getName());
+ }
+ }
+
+ clazz = clazz.getSuperclass();
+ }
+ return result;
+ }
/**
+ * Returns false if the given field is static, transient or final.
+ *
+ * @param f the field
+ * @return <code>false</code> if the field is static, transient or final; <code>true</code> otherwise
+ */
+ public static boolean isReplicatable(Field f) {
+ int mods = f.getModifiers();
+ /**
+ * Fields with the following modifiers are ignored by the cache, i.e., they will not be stored in the cache.
+ * Whenever a user tries to access these fields, they will be accessed from the in-memory version.
+ */
+ if (Modifier.isStatic(mods)
+ || Modifier.isTransient(mods)
+ || Modifier.isFinal(mods)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Prevent instantiation.
*/
private Util() {}
More information about the jboss-cvs-commits
mailing list