[jboss-cvs] JBossAS SVN: r91659 - branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jul 25 09:51:07 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-07-25 09:51:07 -0400 (Sat, 25 Jul 2009)
New Revision: 91659
Modified:
branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/JBossCacheManager.java
branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/SessionReplicationContext.java
Log:
[JBAS-7123] JBossCacheManager remains open to requests after shutdown
Modified: branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/JBossCacheManager.java
===================================================================
--- branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/JBossCacheManager.java 2009-07-25 05:04:40 UTC (rev 91658)
+++ branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/JBossCacheManager.java 2009-07-25 13:51:07 UTC (rev 91659)
@@ -159,6 +159,8 @@
private OutdatedSessionChecker outdatedSessionChecker;
+ private volatile boolean stopping;
+
// ---------------------------------------------------------- Constructors
public JBossCacheManager() throws ClusteringNotSupportedException
@@ -341,19 +343,75 @@
session.getClass().getName());
}
- add(uncheckedCastSession(session), false); // wait to replicate until req end
+ try
+ {
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
+ {
+ try
+ {
+ add(uncheckedCastSession(session), false); // wait to replicate until req end
+ }
+ finally
+ {
+ if (!inLockingValve)
+ {
+ this.valveLock.unlock();
+ }
+ }
+ }
+ else if (trace_)
+ {
+ log_.trace("add(): ignoring add -- Manager is not actively handling requests");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
// Satisfy the Manager interface. Internally we use
// createEmptyClusteredSession to avoid a cast
public Session createEmptySession()
{
- if (trace_)
+ Session session = null;
+ try
{
- log_.trace("Creating an empty ClusteredSession");
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
+ {
+ try
+ {
+ if (trace_)
+ {
+ log_.trace("Creating an empty ClusteredSession");
+ }
+ session = createEmptyClusteredSession();
+ }
+ finally
+ {
+ if (!inLockingValve)
+ {
+ this.valveLock.unlock();
+ }
+ }
+ }
+ else if (trace_)
+ {
+ log_.trace("createEmptySession(): Manager is not handling requests; returning null");
+ }
}
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
- return createEmptyClusteredSession();
+ return session;
}
/**
@@ -368,7 +426,42 @@
* {@inheritDoc}
*/
public Session createSession(String sessionId)
- {
+ {
+ Session session = null;
+ try
+ {
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
+ {
+ try
+ {
+ session = createSessionInternal(sessionId);
+ }
+ finally
+ {
+ if (!inLockingValve)
+ {
+ this.valveLock.unlock();
+ }
+ }
+ }
+ else if (trace_)
+ {
+ log_.trace("createEmptySession(): Manager is not handling requests; returning null");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ return session;
+ }
+
+ private Session createSessionInternal(String sessionId)
+ {
// First check if we've reached the max allowed sessions,
// then try to expire/passivate sessions to free memory
// maxActiveAllowed_ -1 is unlimited
@@ -401,55 +494,58 @@
ClusteredSession<? extends OutgoingDistributableSessionData> session = createEmptyClusteredSession();
- session.setNew(true);
- session.setCreationTime(System.currentTimeMillis());
- session.setMaxInactiveInterval(this.maxInactiveInterval_);
- session.setValid(true);
-
- String clearInvalidated = null; // see below
-
- if (sessionId == null)
+ if (session != null)
{
- sessionId = this.getNextId();
-
- // We are using mod_jk for load balancing. Append the JvmRoute.
- if (getUseJK())
- {
- if (trace_)
- {
- log_.trace("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute());
- }
- sessionId += "." + this.getJvmRoute();
- }
+ session.setNew(true);
+ session.setCreationTime(System.currentTimeMillis());
+ session.setMaxInactiveInterval(this.maxInactiveInterval_);
+ session.setValid(true);
+
+ String clearInvalidated = null; // see below
+
+ if (sessionId == null)
+ {
+ sessionId = this.getNextId();
+
+ // We are using mod_jk for load balancing. Append the JvmRoute.
+ if (getUseJK())
+ {
+ if (trace_)
+ {
+ log_.trace("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute());
+ }
+ sessionId += "." + this.getJvmRoute();
+ }
+ }
+ else
+ {
+ clearInvalidated = sessionId;
+ }
+
+ session.setId(sessionId); // Setting the id leads to a call to add()
+
+ getDistributedCacheManager().sessionCreated(session.getRealId());
+
+ session.tellNew(ClusteredSessionNotificationCause.CREATE);
+
+ if (trace_)
+ {
+ log_.trace("Created a ClusteredSession with id: " + sessionId);
+ }
+
+ createdCounter_.incrementAndGet(); // the call to add() handles the other counters
+
+ // Add this session to the set of those potentially needing replication
+ SessionReplicationContext.bindSession(session, snapshotManager_);
+
+ if (clearInvalidated != null)
+ {
+ // We no longer need to track any earlier session w/ same id
+ // invalidated by this thread
+ SessionInvalidationTracker.clearInvalidatedSession(clearInvalidated, this);
+ }
}
- else
- {
- clearInvalidated = sessionId;
- }
-
- session.setId(sessionId); // Setting the id leads to a call to add()
- getDistributedCacheManager().sessionCreated(session.getRealId());
-
- session.tellNew(ClusteredSessionNotificationCause.CREATE);
-
- if (trace_)
- {
- log_.trace("Created a ClusteredSession with id: " + sessionId);
- }
-
- createdCounter_.incrementAndGet(); // the call to add() handles the other counters
-
- // Add this session to the set of those potentially needing replication
- SessionReplicationContext.bindSession(session, snapshotManager_);
-
- if (clearInvalidated != null)
- {
- // We no longer need to track any earlier session w/ same id
- // invalidated by this thread
- SessionInvalidationTracker.clearInvalidatedSession(clearInvalidated, this);
- }
-
return session;
}
@@ -549,23 +645,49 @@
*/
public Session[] findSessions()
{
- // Need to load all the unloaded sessions
- if(unloadedSessions_.size() > 0)
+ Session[] sessions = null;
+ try
{
- // Make a thread-safe copy of the new id list to work with
- Set<String> ids = new HashSet<String>(unloadedSessions_.keySet());
-
- if(trace_) {
- log_.trace("findSessions: loading sessions from distributed cache: " + ids);
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
+ {
+ try
+ {
+ // Need to load all the unloaded sessions
+ if(unloadedSessions_.size() > 0)
+ {
+ // Make a thread-safe copy of the new id list to work with
+ Set<String> ids = new HashSet<String>(unloadedSessions_.keySet());
+
+ if(trace_) {
+ log_.trace("findSessions: loading sessions from distributed cache: " + ids);
+ }
+
+ for(String id : ids) {
+ loadSession(id);
+ }
+ }
+
+ // All sessions are now "local" so just return the local sessions
+ sessions = findLocalSessions();
+ }
+ finally
+ {
+ if (!inLockingValve)
+ {
+ this.valveLock.unlock();
+ }
+ }
}
-
- for(String id : ids) {
- loadSession(id);
- }
}
-
- // All sessions are now "local" so just return the local sessions
- return findLocalSessions();
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ return sessions;
}
/**
@@ -699,8 +821,27 @@
throw new IllegalStateException("Manager not started");
}
+ if (stopping)
+ {
+ return;
+ }
+
log_.debug("Stopping");
+ this.stopping = true;
+
+ // Disable background work, then block for any ongoing backgroundProcess.
+ // Do this before draining the semaphore so we know draining
+ // won't impact any on-going background processing
+ backgroundProcessAllowed.set(false);
+ synchronized (backgroundProcessAllowed)
+ {
+ if (trace_)
+ {
+ log_.trace("All background processing terminated");
+ }
+ }
+
// Handle re-entrance
if (this.semaphore.tryAcquire())
{
@@ -719,12 +860,6 @@
}
}
- // Block for any ongoing backgroundProcess, then disable
- synchronized (backgroundProcessAllowed)
- {
- backgroundProcessAllowed.set(false);
- }
-
// Let subclasses clean up
stopExtensions();
@@ -1538,6 +1673,11 @@
ClusteredSession<? extends OutgoingDistributableSessionData> sessions[] = findLocalSessions();
for (int i = 0; i < sessions.length; ++i)
{
+ if (!backgroundProcessAllowed.get())
+ {
+ return;
+ }
+
try
{
ClusteredSession<? extends OutgoingDistributableSessionData> session = sessions[i];
@@ -1589,6 +1729,11 @@
ex, ex);
}
}
+
+ if (!backgroundProcessAllowed.get())
+ {
+ return;
+ }
// Next, handle any unloaded sessions
@@ -1601,7 +1746,12 @@
Map<String, OwnedSessionUpdate> unloaded = getUnloadedSessions();
for (Map.Entry<String, OwnedSessionUpdate> entry : unloaded.entrySet())
- {
+ {
+ if (!backgroundProcessAllowed.get())
+ {
+ return;
+ }
+
String realId = entry.getKey();
OwnedSessionUpdate osu = entry.getValue();
@@ -1728,23 +1878,46 @@
private ClusteredSession<? extends OutgoingDistributableSessionData> createEmptyClusteredSession()
{
-
- ClusteredSession<? extends OutgoingDistributableSessionData> session = null;
- switch (replicationGranularity_)
+ ClusteredSession<? extends OutgoingDistributableSessionData> session = null;
+ try
{
- case ATTRIBUTE:
- ClusteredManager<OutgoingAttributeGranularitySessionData> amgr = uncheckedCastManager(this);
- session = new AttributeBasedClusteredSession(amgr);
- break;
- case FIELD:
- ClusteredManager<OutgoingDistributableSessionData> fmgr = uncheckedCastManager(this);
- session = new FieldBasedClusteredSession(fmgr);
- break;
- default:
- ClusteredManager<OutgoingSessionGranularitySessionData> smgr = uncheckedCastManager(this);
- session = new SessionBasedClusteredSession(smgr);
- break;
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
+ {
+ try
+ {
+ switch (replicationGranularity_)
+ {
+ case ATTRIBUTE :
+ ClusteredManager<OutgoingAttributeGranularitySessionData> amgr = uncheckedCastManager(this);
+ session = new AttributeBasedClusteredSession(amgr);
+ break;
+ case FIELD :
+ ClusteredManager<OutgoingDistributableSessionData> fmgr = uncheckedCastManager(this);
+ session = new FieldBasedClusteredSession(fmgr);
+ break;
+ default :
+ ClusteredManager<OutgoingSessionGranularitySessionData> smgr = uncheckedCastManager(this);
+ session = new SessionBasedClusteredSession(smgr);
+ break;
+ }
+ }
+ finally
+ {
+ if (!inLockingValve)
+ {
+ this.valveLock.unlock();
+ }
+ }
+ }
}
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
return session;
}
@@ -1810,126 +1983,150 @@
{
return null;
}
-
- long begin = System.currentTimeMillis();
- boolean mustAdd = false;
- boolean passivated = false;
+ ClusteredSession<? extends OutgoingDistributableSessionData> session = null;
- ClusteredSession<? extends OutgoingDistributableSessionData> session = sessions_.get(realId);
- boolean initialLoad = false;
- if (session == null)
- {
- initialLoad = true;
- // This is either the first time we've seen this session on this
- // server, or we previously expired it and have since gotten
- // a replication message from another server
- mustAdd = true;
- session = createEmptyClusteredSession();
-
- OwnedSessionUpdate osu = unloadedSessions_.get(realId);
- passivated = (osu != null && osu.isPassivated());
- }
-
- synchronized (session)
+ try
{
- ContextClassLoaderSwitcher.SwitchContext switcher = null;
- boolean doTx = false;
- try
+ // [JBAS-7123] Make sure we're either in the call stack where LockingValve has
+ // a lock, or that we acquire one ourselves
+ boolean inLockingValve = SessionReplicationContext.isLocallyActive();
+ if (inLockingValve || this.valveLock.tryLock(0, TimeUnit.SECONDS))
{
- // We need transaction so any data gravitation replication
- // is sent in batch.
- // Don't do anything if there is already transaction context
- // associated with this thread.
- if (batchingManager.isBatchInProgress() == false)
- {
- batchingManager.startBatch();
- doTx = true;
- }
-
- // Tomcat calls Manager.findSession before setting the tccl,
- // so we need to do it :(
- switcher = getContextClassLoaderSwitcher().getSwitchContext();
- switcher.setClassLoader(tcl_);
-
- IncomingDistributableSessionData data = proxy_.getSessionData(realId, initialLoad);
- if (data != null)
- {
- session.update(data);
- }
- else
- {
- // Clunky; we set the session variable to null to indicate
- // no data so move on
- session = null;
- }
-
- if (session != null)
- {
- ClusteredSessionNotificationCause cause = passivated ? ClusteredSessionNotificationCause.ACTIVATION
- : ClusteredSessionNotificationCause.FAILOVER;
- session.notifyDidActivate(cause);
- }
- }
- catch (Exception ex)
- {
try
{
-// if(doTx)
- // Let's set it no matter what.
- batchingManager.setBatchRollbackOnly();
- }
- catch (Exception exn)
- {
- log_.error("Caught exception rolling back transaction", exn);
- }
- // We will need to alert Tomcat of this exception.
- if (ex instanceof RuntimeException)
- throw (RuntimeException) ex;
-
- throw new RuntimeException("loadSession(): failed to load session " +
- realId, ex);
- }
- finally
- {
- try {
- if(doTx)
- {
- batchingManager.endBatch();
+ long begin = System.currentTimeMillis();
+ boolean mustAdd = false;
+ boolean passivated = false;
+
+ session = sessions_.get(realId);
+ boolean initialLoad = false;
+ if (session == null)
+ {
+ initialLoad = true;
+ // This is either the first time we've seen this session on this
+ // server, or we previously expired it and have since gotten
+ // a replication message from another server
+ mustAdd = true;
+ session = createEmptyClusteredSession();
+
+ OwnedSessionUpdate osu = unloadedSessions_.get(realId);
+ passivated = (osu != null && osu.isPassivated());
}
- }
- finally {
- if (switcher != null)
+
+ synchronized (session)
{
- switcher.reset();
+ ContextClassLoaderSwitcher.SwitchContext switcher = null;
+ boolean doTx = false;
+ try
+ {
+ // We need transaction so any data gravitation replication
+ // is sent in batch.
+ // Don't do anything if there is already transaction context
+ // associated with this thread.
+ if (batchingManager.isBatchInProgress() == false)
+ {
+ batchingManager.startBatch();
+ doTx = true;
+ }
+
+ // Tomcat calls Manager.findSession before setting the tccl,
+ // so we need to do it :(
+ switcher = getContextClassLoaderSwitcher().getSwitchContext();
+ switcher.setClassLoader(tcl_);
+
+ IncomingDistributableSessionData data = proxy_.getSessionData(realId, initialLoad);
+ if (data != null)
+ {
+ session.update(data);
+ }
+ else
+ {
+ // Clunky; we set the session variable to null to indicate
+ // no data so move on
+ session = null;
+ }
+
+ if (session != null)
+ {
+ ClusteredSessionNotificationCause cause = passivated ? ClusteredSessionNotificationCause.ACTIVATION
+ : ClusteredSessionNotificationCause.FAILOVER;
+ session.notifyDidActivate(cause);
+ }
+ }
+ catch (Exception ex)
+ {
+ try
+ {
+ // if(doTx)
+ // Let's set it no matter what.
+ batchingManager.setBatchRollbackOnly();
+ }
+ catch (Exception exn)
+ {
+ log_.error("Caught exception rolling back transaction", exn);
+ }
+ // We will need to alert Tomcat of this exception.
+ if (ex instanceof RuntimeException)
+ throw (RuntimeException) ex;
+
+ throw new RuntimeException("loadSession(): failed to load session " +
+ realId, ex);
+ }
+ finally
+ {
+ try {
+ if(doTx)
+ {
+ batchingManager.endBatch();
+ }
+ }
+ finally {
+ if (switcher != null)
+ {
+ switcher.reset();
+ }
+ }
+ }
+
+ if (session != null)
+ {
+ if (mustAdd)
+ {
+ add(session, false); // don't replicate
+ if (!passivated)
+ {
+ session.tellNew(ClusteredSessionNotificationCause.FAILOVER);
+ }
+ }
+ long elapsed = System.currentTimeMillis() - begin;
+ stats_.updateLoadStats(realId, elapsed);
+
+ if (trace_)
+ {
+ log_.trace("loadSession(): id= " + realId + ", session=" + session);
+ }
+ }
+ else if (trace_)
+ {
+ log_.trace("loadSession(): session " + realId +
+ " not found in distributed cache");
+ }
}
}
- }
-
- if (session != null)
- {
- if (mustAdd)
+ finally
{
- add(session, false); // don't replicate
- if (!passivated)
+ if (!inLockingValve)
{
- session.tellNew(ClusteredSessionNotificationCause.FAILOVER);
+ this.valveLock.unlock();
}
}
- long elapsed = System.currentTimeMillis() - begin;
- stats_.updateLoadStats(realId, elapsed);
-
- if (trace_)
- {
- log_.trace("loadSession(): id= " + realId + ", session=" + session);
- }
}
- else if (trace_)
- {
- log_.trace("loadSession(): session " + realId +
- " not found in distributed cache");
- }
}
-
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
return session;
}
Modified: branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/SessionReplicationContext.java
===================================================================
--- branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/SessionReplicationContext.java 2009-07-25 05:04:40 UTC (rev 91658)
+++ branches/Branch_5_x/tomcat/src/main/org/jboss/web/tomcat/service/session/SessionReplicationContext.java 2009-07-25 13:51:07 UTC (rev 91659)
@@ -162,10 +162,7 @@
* the current thread.
*
* @return <code>true</code> if there is a context associated with the thread
- *
- * @deprecated Will be removed in AS 6
*/
- @Deprecated
public static boolean isLocallyActive()
{
return getCurrentContext() != null;
More information about the jboss-cvs-commits mailing list