[jboss-cvs] JBossAS SVN: r60617 - trunk/ejb3/src/main/org/jboss/ejb3/cache/tree.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Feb 18 22:52:25 EST 2007
Author: bstansberry at jboss.com
Date: 2007-02-18 22:52:25 -0500 (Sun, 18 Feb 2007)
New Revision: 60617
Modified:
trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java
Log:
[EJBTHREE-867] Loosen coupling of lifecycle of nested SFSBs
[EJBTHREE-849] Properly handle passivation/activation callbacks for nested SFSBs
[EJBTHREE-851] Invoke passivation/activation callbacks around replication
[EJBTHREE-846] Transfer passivated sessions on startup
[EJBTHREE-856] Add gravitation call needed for buddy replication
Modified: trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java 2007-02-19 03:48:42 UTC (rev 60616)
+++ trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java 2007-02-19 03:52:25 UTC (rev 60617)
@@ -21,6 +21,7 @@
*/
package org.jboss.ejb3.cache.tree;
+import java.lang.ref.WeakReference;
import java.util.Map;
import javax.ejb.EJBException;
@@ -28,30 +29,30 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.jboss.annotation.ejb.cache.tree.CacheConfig;
import org.jboss.aop.Advisor;
-import org.jboss.cache.eviction.EvictionPolicyConfig;
-import org.jboss.cache.eviction.LRUConfiguration;
-import org.jboss.cache.jmx.CacheJmxWrapperMBean;
import org.jboss.cache.AbstractCacheListener;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.Region;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.eviction.EvictionPolicyConfig;
+import org.jboss.cache.eviction.LRUConfiguration;
+import org.jboss.cache.jmx.CacheJmxWrapperMBean;
import org.jboss.ejb3.Container;
import org.jboss.ejb3.EJBContainer;
import org.jboss.ejb3.Pool;
import org.jboss.ejb3.cache.ClusteredStatefulCache;
+import org.jboss.ejb3.stateful.NestedStatefulBeanContext;
import org.jboss.ejb3.stateful.ProxiedStatefulBeanContext;
import org.jboss.ejb3.stateful.StatefulBeanContext;
+import org.jboss.logging.Logger;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.mx.util.MBeanServerLocator;
-import org.jboss.logging.Logger;
-import org.jboss.annotation.ejb.cache.tree.CacheConfig;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.config.Option;
-
/**
* Comment
*
@@ -60,23 +61,27 @@
*/
public class StatefulTreeCache implements ClusteredStatefulCache
{
- protected static Logger log = Logger.getLogger(StatefulTreeCache.class);
- static int FQN_SIZE = 2; // depth of fqn that we store the session in.
-
+ private static final int FQN_SIZE = 2; // depth of fqn that we store the session in.
+
private static Option BYPASS_OPTION = new Option();
private static Option LOCAL_ONLY_OPTION = new Option();
+ private static Option GRAVITATE_OPTION = new Option();
static
{
BYPASS_OPTION.setBypassInterceptorChain(true);
LOCAL_ONLY_OPTION.setCacheModeLocal(true);
+ GRAVITATE_OPTION.setForceDataGravitation(true);
}
+ private Logger log = Logger.getLogger(StatefulTreeCache.class);
private Pool pool;
+ private WeakReference<ClassLoader> classloader;
private Cache cache;
private Fqn cacheNode;
private Region region;
private ClusteredStatefulCacheListener listener;
+
public static long MarkInUseWaitTime = 15000;
public StatefulBeanContext create()
@@ -85,8 +90,12 @@
try
{
ctx = (StatefulBeanContext) pool.get();
- cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
- ctx.inUse = true;
+ if (log.isTraceEnabled())
+ {
+ log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
+ }
+ putInCache(ctx);
+ ctx.setInUse(true);
ctx.lastUsed = System.currentTimeMillis();
}
catch (EJBException e)
@@ -106,9 +115,12 @@
try
{
ctx = (StatefulBeanContext) pool.get(initTypes, initValues);
- Fqn id = new Fqn(cacheNode, ctx.getId().toString());
- cache.put(id, "bean", ctx);
- ctx.inUse = true;
+ if (log.isTraceEnabled())
+ {
+ log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
+ }
+ putInCache(ctx);
+ ctx.setInUse(true);
ctx.lastUsed = System.currentTimeMillis();
}
catch (EJBException e)
@@ -124,28 +136,48 @@
public StatefulBeanContext get(Object key) throws EJBException
{
+ return get(key, true);
+ }
+
+ public StatefulBeanContext get(Object key, boolean markInUse) throws EJBException
+ {
StatefulBeanContext entry = null;
Fqn id = new Fqn(cacheNode, key.toString());
try
{
- Object obj = cache.get(id, "bean");
- entry = (StatefulBeanContext) obj;
+ // If need be, gravitate
+ InvocationContext ictx = cache.getInvocationContext();
+ ictx.setOptionOverrides(getGravitateOption());
+ entry = (StatefulBeanContext) cache.get(id, "bean");
}
catch (CacheException e)
{
RuntimeException re = convertToRuntimeException(e);
throw re;
}
+
if (entry == null)
{
- throw new NoSuchEJBException("Could not find Stateful bean: " + key);
+ throw new NoSuchEJBException("Could not find stateful bean: " + key);
}
- entry.inUse = true;
- // Mark it to eviction thread that don't passivate it yet.
- // Note the Fqn we use is relative to the region!
- region.markNodeCurrentlyInUse(new Fqn(key.toString()), MarkInUseWaitTime);
- entry.lastUsed = System.currentTimeMillis();
+ else if (markInUse && entry.isRemoved())
+ {
+ throw new NoSuchEJBException("Could not find stateful bean: " + key +
+ " (bean was marked as removed)");
+ }
+ entry.postReplicate();
+
+ if (markInUse)
+ {
+ entry.setInUse(true);
+
+ // Mark the Fqn telling the eviction thread not to passivate it yet.
+ // Note the Fqn we use is relative to the region!
+ region.markNodeCurrentlyInUse(new Fqn(key.toString()), MarkInUseWaitTime);
+ entry.lastUsed = System.currentTimeMillis();
+ }
+
if(log.isTraceEnabled())
{
log.trace("get: retrieved bean with cache id " +id.toString());
@@ -164,10 +196,15 @@
log.trace("remove: cache id " +id.toString());
}
StatefulBeanContext ctx = (StatefulBeanContext) cache.get(id, "bean");
+
+ if (ctx != null)
+ {
+ if (!ctx.isRemoved())
+ pool.remove(ctx);
- cache.removeNode(id);
-
- if (ctx != null) pool.remove(ctx);
+ if (ctx.getCanRemoveFromCache())
+ cache.removeNode(id);
+ }
}
catch (CacheException e)
{
@@ -180,7 +217,7 @@
{
synchronized (ctx)
{
- ctx.inUse = false;
+ ctx.setInUse(false);
ctx.lastUsed = System.currentTimeMillis();
// OK, it is free to passivate now.
// Note the Fqn we use is relative to the region!
@@ -190,21 +227,17 @@
public void replicate(StatefulBeanContext ctx)
{
+ // StatefulReplicationInterceptor should only pass us the ultimate
+ // parent context for a tree of nested beans, which should always be
+ // a standard StatefulBeanContext
+ if (ctx instanceof NestedStatefulBeanContext)
+ {
+ throw new IllegalArgumentException("Received unexpected replicate call for nested context " + ctx.getId());
+ }
+
try
{
- // Don't replicate proxies, as they are in the cache
- // as elements of their containing bean context
- // TODO -- if this is a ProxiedStatefulBeanContext
- // should we replicate the parent context, since that's where
- // the data is?
- if (!(ctx instanceof ProxiedStatefulBeanContext))
- {
- cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
- }
- else if (log.isTraceEnabled())
- {
- log.trace("replicate(): ignoring replicate call for proxy to " + ctx.getId());
- }
+ putInCache(ctx);
}
catch (CacheException e)
{
@@ -215,8 +248,13 @@
public void initialize(Container container) throws Exception
{
+ log = Logger.getLogger(getClass().getName() + "." + container.getEjbName());
+
+ this.pool = container.getPool();
+ ClassLoader cl = ((EJBContainer) container).getClassloader();
+ this.classloader = new WeakReference(cl);
+
Advisor advisor = (Advisor) container;
- this.pool = container.getPool();
CacheConfig config = (CacheConfig) advisor.resolveAnnotation(CacheConfig.class);
MBeanServer server = MBeanServerLocator.locateJBoss();
ObjectName cacheON = new ObjectName(config.name());
@@ -230,15 +268,19 @@
EvictionPolicyConfig epc = getEvictionPolicyConfig((int) config.idleTimeoutSeconds(),
config.maxSize());
region.setEvictionPolicy(epc);
- // BES 11/16/2006 uncomment if we switch to per-region marshalling
-// region.registerContextClassLoader(Thread.currentThread().getContextClassLoader());
-// region.activate();
- log.debug("initialize(): created eviction region: " +region + " for ejb: " +container.getEjbName());
+
+ // Transfer over the state for the region
+ region.registerContextClassLoader(cl);
+ region.activate();
+
+ log.debug("initialize(): created region: " +region + " for ejb: " +container.getEjbName());
}
protected EvictionPolicyConfig getEvictionPolicyConfig(int timeToLiveSeconds, int maxNodes)
{
LRUConfiguration epc = new LRUConfiguration();
+ // Override the standard policy class
+ epc.setEvictionPolicyClass(AbortableLRUPolicy.class.getName());
epc.setTimeToLiveSeconds(timeToLiveSeconds);
epc.setMaxNodes(maxNodes);
return epc;
@@ -260,22 +302,9 @@
// Remove the listener
cache.removeCacheListener(listener);
- // BES 11/16/2006 uncomment if we switch to per-region marshalling
- // If we do, we don't need to remove the node below; deactivate() does it
-// if (region != null)
-// {
-// region.deactivate();
-// region.unregisterContextClassLoader();
-// }
-
- // FIXME this method needs to be in Cache
- ((CacheSPI) cache).getRegionManager().removeRegion(region.getFqn());
- // Clear any queues
- region.resetEvictionQueues();
- region = null;
-
try {
- // remove locally.
+ // Remove locally. We do this to clean up the persistent store,
+ // which is not affected by the region.deactivate call below.
InvocationContext ctx = cache.getInvocationContext();
ctx.setOptionOverrides(getLocalOnlyOption());
cache.removeNode(cacheNode);
@@ -284,10 +313,31 @@
{
log.error("stop(): can't remove bean from the underlying distributed cache");
}
-
+
+ if (region != null)
+ {
+ region.deactivate();
+ region.unregisterContextClassLoader();
+
+ // FIXME this method needs to be in Cache
+ ((CacheSPI) cache).getRegionManager().removeRegion(region.getFqn());
+ // Clear any queues
+ region.resetEvictionQueues();
+ region = null;
+ }
+
+ classloader = null;
+
log.debug("stop(): StatefulTreeCache stopped successfully for " +cacheNode);
}
+ private void putInCache(StatefulBeanContext ctx)
+ {
+ ctx.preReplicate();
+ cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
+ ctx.markedForReplication = false;
+ }
+
/**
* Creates a RuntimeException, but doesn't pass CacheException as the cause
* as it is a type that likely doesn't exist on a client.
@@ -307,39 +357,8 @@
*/
public class ClusteredStatefulCacheListener extends AbstractCacheListener
{
- protected Logger log = Logger.getLogger(ClusteredStatefulCacheListener.class);
-
- // BES 11/18/2006 this was causing stack overflow; switched to nodeLoaded,
- // which gives direct access to the data
-// @Override
-// public void nodeActivated(Fqn fqn, boolean pre) {
-// if(pre) return; // we are not interested in preActivate event
-// if(fqn.size() != FQN_SIZE) return;
-// if(!fqn.isChildOrEquals(cacheNode)) return; // don't care about fqn that doesn't belong to me.
-//
-// StatefulBeanContext bean = null;
-// try {
-// // TODO Can this cause deadlock in the cache level? Should be ok but need review.
-// bean = (StatefulBeanContext) cache.get(fqn, "bean");
-// } catch (CacheException e) {
-// log.error("nodeActivate(): can't retrieve bean instance from: " +fqn + " with exception: " +e);
-// return;
-// }
-//
-// if(bean == null)
-// {
-// throw new IllegalStateException("nodeActivate(): null bean instance.");
-// }
-//
-//// log.debug("nodeActivate(): send postActivate event on fqn: " +fqn);
-// if(log.isTraceEnabled())
-// {
-// log.trace("nodeActivate(): send postActivate event on fqn: " +fqn);
-// }
-//
-// bean.postActivate();
-// }
-
+ // BES 11/18/2006 using nodeActivated callback was causing stack overflow;
+ // switched to nodeLoaded, which gives direct access to the data
@Override
public void nodeLoaded(Fqn fqn, boolean pre, Map nodeData)
{
@@ -360,9 +379,23 @@
{
log.trace("nodeLoaded(): send postActivate event to bean at fqn: " +fqn);
}
-
- bean.postActivate();
+ ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ ClassLoader cl = classloader.get();
+ if (cl != null)
+ {
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+
+ bean.activateAfterReplication();
+ }
+ finally
+ {
+ Thread.currentThread().setContextClassLoader(oldCl);
+ }
+
}
@Override
@@ -381,19 +414,26 @@
ctx.setOptionOverrides(getBypassOption());
bean = (StatefulBeanContext) cache.get(fqn, "bean");
if (bean != null)
- {
- if (bean.inUse)
+ {
+ ClassLoader cl = classloader.get();
+ if (cl != null)
{
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+
+ if (!bean.getCanPassivate())
+ {
// Abort the eviction
- throw new IllegalStateException("Cannot passivate bean " + fqn + " -- it is currently in use");
+ throw new ContextInUseException("Cannot passivate bean " + fqn +
+ " -- it or one if its children is currently in use");
}
if(log.isTraceEnabled())
{
log.trace("nodePassivated(): send prePassivate event to bean at fqn: " +fqn);
}
- Thread.currentThread().setContextClassLoader(((EJBContainer) bean.getContainer()).getClassloader());
- bean.prePassivate();
+
+ bean.passivateAfterReplication();
}
}
catch (CacheException e)
@@ -403,6 +443,8 @@
}
catch (NoSuchEJBException e)
{
+ // TODO is this still necessary? Don't think we
+ // should have orphaned proxies any more
if (bean instanceof ProxiedStatefulBeanContext)
{
// This is probably an orphaned proxy; double check and remove it
@@ -438,7 +480,7 @@
}
}
- private Option getBypassOption()
+ private static Option getBypassOption()
{
try
{
@@ -450,7 +492,7 @@
}
}
- private Option getLocalOnlyOption()
+ private static Option getLocalOnlyOption()
{
try
{
@@ -461,4 +503,16 @@
throw new RuntimeException(e);
}
}
+
+ private static Option getGravitateOption()
+ {
+ try
+ {
+ return GRAVITATE_OPTION.clone();
+ }
+ catch (CloneNotSupportedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
More information about the jboss-cvs-commits
mailing list