[jboss-cvs] JBossAS SVN: r60617 - trunk/ejb3/src/main/org/jboss/ejb3/cache/tree.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Feb 18 22:52:25 EST 2007


Author: bstansberry at jboss.com
Date: 2007-02-18 22:52:25 -0500 (Sun, 18 Feb 2007)
New Revision: 60617

Modified:
   trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java
Log:
[EJBTHREE-867] Loosen coupling of lifecycle of nested SFSBs
[EJBTHREE-849] Properly handle passivation/activation callbacks for nested SFSBs
[EJBTHREE-851] Invoke passivation/activation callbacks around replication
[EJBTHREE-846] Transfer passivated sessions on startup
[EJBTHREE-856] Add gravitation call needed for buddy replication

Modified: trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java	2007-02-19 03:48:42 UTC (rev 60616)
+++ trunk/ejb3/src/main/org/jboss/ejb3/cache/tree/StatefulTreeCache.java	2007-02-19 03:52:25 UTC (rev 60617)
@@ -21,6 +21,7 @@
   */
 package org.jboss.ejb3.cache.tree;
 
+import java.lang.ref.WeakReference;
 import java.util.Map;
 
 import javax.ejb.EJBException;
@@ -28,30 +29,30 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.jboss.annotation.ejb.cache.tree.CacheConfig;
 import org.jboss.aop.Advisor;
-import org.jboss.cache.eviction.EvictionPolicyConfig;
-import org.jboss.cache.eviction.LRUConfiguration;
-import org.jboss.cache.jmx.CacheJmxWrapperMBean;
 import org.jboss.cache.AbstractCacheListener;
 import org.jboss.cache.Cache;
 import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.Region;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.eviction.EvictionPolicyConfig;
+import org.jboss.cache.eviction.LRUConfiguration;
+import org.jboss.cache.jmx.CacheJmxWrapperMBean;
 import org.jboss.ejb3.Container;
 import org.jboss.ejb3.EJBContainer;
 import org.jboss.ejb3.Pool;
 import org.jboss.ejb3.cache.ClusteredStatefulCache;
+import org.jboss.ejb3.stateful.NestedStatefulBeanContext;
 import org.jboss.ejb3.stateful.ProxiedStatefulBeanContext;
 import org.jboss.ejb3.stateful.StatefulBeanContext;
+import org.jboss.logging.Logger;
 import org.jboss.mx.util.MBeanProxyExt;
 import org.jboss.mx.util.MBeanServerLocator;
-import org.jboss.logging.Logger;
-import org.jboss.annotation.ejb.cache.tree.CacheConfig;
 
-import org.jboss.cache.Fqn;
-import org.jboss.cache.config.Option;
-
 /**
  * Comment
  *
@@ -60,23 +61,27 @@
  */
 public class StatefulTreeCache implements ClusteredStatefulCache
 {
-   protected static Logger log = Logger.getLogger(StatefulTreeCache.class);
-   static int FQN_SIZE = 2; // depth of fqn that we store the session in.
-
+   private static final int FQN_SIZE = 2; // depth of fqn that we store the session in.
+   
    private static Option BYPASS_OPTION = new Option();
    private static Option LOCAL_ONLY_OPTION = new Option();
+   private static Option GRAVITATE_OPTION = new Option();
    static
    {
       BYPASS_OPTION.setBypassInterceptorChain(true);
       LOCAL_ONLY_OPTION.setCacheModeLocal(true);
+      GRAVITATE_OPTION.setForceDataGravitation(true);
    }
    
+   private Logger log = Logger.getLogger(StatefulTreeCache.class);
    private Pool pool;
+   private WeakReference<ClassLoader> classloader;
    private Cache cache;
    private Fqn cacheNode;
    private Region region;
    private ClusteredStatefulCacheListener listener;
    
+   
    public static long MarkInUseWaitTime = 15000;
 
    public StatefulBeanContext create()
@@ -85,8 +90,12 @@
       try
       {
          ctx = (StatefulBeanContext) pool.get();
-         cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
-         ctx.inUse = true;
+         if (log.isTraceEnabled())
+         {
+            log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
+         }
+         putInCache(ctx);
+         ctx.setInUse(true);
          ctx.lastUsed = System.currentTimeMillis();
       }
       catch (EJBException e)
@@ -106,9 +115,12 @@
       try
       {
          ctx = (StatefulBeanContext) pool.get(initTypes, initValues);
-         Fqn id = new Fqn(cacheNode, ctx.getId().toString());
-         cache.put(id, "bean", ctx);
-         ctx.inUse = true;
+         if (log.isTraceEnabled())
+         {
+            log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
+         }
+         putInCache(ctx);
+         ctx.setInUse(true);
          ctx.lastUsed = System.currentTimeMillis();
       }
       catch (EJBException e)
@@ -124,28 +136,48 @@
 
    public StatefulBeanContext get(Object key) throws EJBException
    {
+      return get(key, true);
+   }
+   
+   public StatefulBeanContext get(Object key, boolean markInUse) throws EJBException
+   {
       StatefulBeanContext entry = null;
       Fqn id = new Fqn(cacheNode, key.toString());
       try
       {
-         Object obj = cache.get(id, "bean");
-         entry = (StatefulBeanContext) obj;
+         // If need be, gravitate
+         InvocationContext ictx = cache.getInvocationContext();
+         ictx.setOptionOverrides(getGravitateOption());
+         entry = (StatefulBeanContext) cache.get(id, "bean");
       }
       catch (CacheException e)
       {
          RuntimeException re = convertToRuntimeException(e);
          throw re;
       }
+      
       if (entry == null)
       {
-         throw new NoSuchEJBException("Could not find Stateful bean: " + key);
+         throw new NoSuchEJBException("Could not find stateful bean: " + key);
       }
-      entry.inUse = true;
-      // Mark it to eviction thread that don't passivate it yet.
-      // Note the Fqn we use is relative to the region!
-      region.markNodeCurrentlyInUse(new Fqn(key.toString()), MarkInUseWaitTime);
-      entry.lastUsed = System.currentTimeMillis();
+      else if (markInUse && entry.isRemoved())
+      {
+         throw new NoSuchEJBException("Could not find stateful bean: " + key + 
+                                      " (bean was marked as removed)");
+      }
       
+      entry.postReplicate();
+      
+      if (markInUse)
+      {
+         entry.setInUse(true);
+         
+         // Mark the Fqn telling the eviction thread not to passivate it yet.
+         // Note the Fqn we use is relative to the region!
+         region.markNodeCurrentlyInUse(new Fqn(key.toString()), MarkInUseWaitTime);
+         entry.lastUsed = System.currentTimeMillis();
+      }
+      
       if(log.isTraceEnabled())
       {
          log.trace("get: retrieved bean with cache id " +id.toString());
@@ -164,10 +196,15 @@
             log.trace("remove: cache id " +id.toString());
          }
          StatefulBeanContext ctx = (StatefulBeanContext) cache.get(id, "bean"); 
+         
+         if (ctx != null)
+         {
+            if (!ctx.isRemoved())
+               pool.remove(ctx);
             
-         cache.removeNode(id);
-         
-         if (ctx != null) pool.remove(ctx);
+            if (ctx.getCanRemoveFromCache())
+               cache.removeNode(id);
+         }
       }
       catch (CacheException e)
       {
@@ -180,7 +217,7 @@
    {
       synchronized (ctx)
       {
-         ctx.inUse = false;
+         ctx.setInUse(false);
          ctx.lastUsed = System.currentTimeMillis();
          // OK, it is free to passivate now.
          // Note the Fqn we use is relative to the region!
@@ -190,21 +227,17 @@
 
    public void replicate(StatefulBeanContext ctx)
    {
+      // StatefulReplicationInterceptor should only pass us the ultimate
+      // parent context for a tree of nested beans, which should always be
+      // a standard StatefulBeanContext
+      if (ctx instanceof NestedStatefulBeanContext)
+      {
+         throw new IllegalArgumentException("Received unexpected replicate call for nested context " + ctx.getId());
+      }
+      
       try
       {
-         // Don't replicate proxies, as they are in the cache
-         // as elements of their containing bean context
-         // TODO -- if this is a ProxiedStatefulBeanContext
-         // should we replicate the parent context, since that's where
-         // the data is? 
-         if (!(ctx instanceof ProxiedStatefulBeanContext))
-         {
-            cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
-         }
-         else if (log.isTraceEnabled())
-         {
-            log.trace("replicate(): ignoring replicate call for proxy to " + ctx.getId());
-         }
+         putInCache(ctx);
       }
       catch (CacheException e)
       {
@@ -215,8 +248,13 @@
 
    public void initialize(Container container) throws Exception
    {
+      log = Logger.getLogger(getClass().getName() + "." + container.getEjbName());
+      
+      this.pool = container.getPool();
+      ClassLoader cl = ((EJBContainer) container).getClassloader();
+      this.classloader = new WeakReference(cl);
+      
       Advisor advisor = (Advisor) container;
-      this.pool = container.getPool();
       CacheConfig config = (CacheConfig) advisor.resolveAnnotation(CacheConfig.class);
       MBeanServer server = MBeanServerLocator.locateJBoss();
       ObjectName cacheON = new ObjectName(config.name());
@@ -230,15 +268,19 @@
       EvictionPolicyConfig epc = getEvictionPolicyConfig((int) config.idleTimeoutSeconds(),
             config.maxSize());
       region.setEvictionPolicy(epc);
-      // BES 11/16/2006 uncomment if we switch to per-region marshalling
-//      region.registerContextClassLoader(Thread.currentThread().getContextClassLoader());
-//      region.activate();
-      log.debug("initialize(): created eviction region: " +region + " for ejb: " +container.getEjbName());
+      
+      // Transfer over the state for the region
+      region.registerContextClassLoader(cl);
+      region.activate();
+      
+      log.debug("initialize(): created region: " +region + " for ejb: " +container.getEjbName());
    }
    
    protected EvictionPolicyConfig getEvictionPolicyConfig(int timeToLiveSeconds, int maxNodes)
    {
       LRUConfiguration epc = new LRUConfiguration();
+      // Override the standard policy class
+      epc.setEvictionPolicyClass(AbortableLRUPolicy.class.getName());
       epc.setTimeToLiveSeconds(timeToLiveSeconds);
       epc.setMaxNodes(maxNodes);
       return epc;
@@ -260,22 +302,9 @@
       // Remove the listener
       cache.removeCacheListener(listener);
       
-      // BES 11/16/2006 uncomment if we switch to per-region marshalling
-      // If we do, we don't need to remove the node below; deactivate() does it
-//      if (region != null)
-//      {
-//         region.deactivate();
-//         region.unregisterContextClassLoader();
-//      }
-
-      // FIXME this method needs to be in Cache
-      ((CacheSPI) cache).getRegionManager().removeRegion(region.getFqn());
-      // Clear any queues
-      region.resetEvictionQueues();
-      region = null;
-      
       try {
-         // remove locally.  
+         // Remove locally. We do this to clean up the persistent store,
+         // which is not affected by the region.deactivate call below.
          InvocationContext ctx = cache.getInvocationContext();
          ctx.setOptionOverrides(getLocalOnlyOption());
          cache.removeNode(cacheNode);
@@ -284,10 +313,31 @@
       {
          log.error("stop(): can't remove bean from the underlying distributed cache");
       }
-
+      
+      if (region != null)
+      {
+         region.deactivate();
+         region.unregisterContextClassLoader();      
+         
+         // FIXME this method needs to be in Cache
+         ((CacheSPI) cache).getRegionManager().removeRegion(region.getFqn());
+         // Clear any queues
+         region.resetEvictionQueues();
+         region = null;
+      }
+      
+      classloader = null;
+      
       log.debug("stop(): StatefulTreeCache stopped successfully for " +cacheNode);
    }
    
+   private void putInCache(StatefulBeanContext ctx)
+   {
+      ctx.preReplicate();
+      cache.put(new Fqn(cacheNode, ctx.getId().toString()), "bean", ctx);
+      ctx.markedForReplication = false;      
+   }
+   
    /** 
     * Creates a RuntimeException, but doesn't pass CacheException as the cause
     * as it is a type that likely doesn't exist on a client.
@@ -307,39 +357,8 @@
     */
    public class ClusteredStatefulCacheListener extends AbstractCacheListener
    {
-      protected Logger log = Logger.getLogger(ClusteredStatefulCacheListener.class);
-
-      // BES 11/18/2006 this was causing stack overflow; switched to nodeLoaded,
-      // which gives direct access to the data
-//      @Override
-//      public void nodeActivated(Fqn fqn, boolean pre) {
-//         if(pre) return;  // we are not interested in preActivate event
-//         if(fqn.size() != FQN_SIZE) return;
-//         if(!fqn.isChildOrEquals(cacheNode)) return;  // don't care about fqn that doesn't belong to me.
-//
-//         StatefulBeanContext bean = null;
-//         try {
-//            // TODO Can this cause deadlock in the cache level? Should be ok but need review.
-//            bean = (StatefulBeanContext) cache.get(fqn, "bean");
-//         } catch (CacheException e) {
-//            log.error("nodeActivate(): can't retrieve bean instance from: " +fqn + " with exception: " +e);
-//            return;
-//         }
-//         
-//         if(bean == null)
-//         {
-//            throw new IllegalStateException("nodeActivate(): null bean instance.");
-//         }
-//
-////         log.debug("nodeActivate(): send postActivate event on fqn: " +fqn);
-//         if(log.isTraceEnabled())
-//         {
-//            log.trace("nodeActivate(): send postActivate event on fqn: " +fqn);
-//         }
-//
-//         bean.postActivate();
-//      }
-      
+      // BES 11/18/2006 using nodeActivated callback was causing stack overflow; 
+      // switched to nodeLoaded, which gives direct access to the data      
       @Override
       public void nodeLoaded(Fqn fqn, boolean pre, Map nodeData)
       {
@@ -360,9 +379,23 @@
          {
             log.trace("nodeLoaded(): send postActivate event to bean at fqn: " +fqn);
          }
-
-         bean.postActivate();
          
+         ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
+         try
+         {  
+            ClassLoader cl = classloader.get();
+            if (cl != null)
+            {
+               Thread.currentThread().setContextClassLoader(cl);
+            }
+            
+            bean.activateAfterReplication();
+         }
+         finally
+         {
+            Thread.currentThread().setContextClassLoader(oldCl);
+         }
+         
       }
 
       @Override
@@ -381,19 +414,26 @@
             ctx.setOptionOverrides(getBypassOption());
             bean = (StatefulBeanContext) cache.get(fqn, "bean");
             if (bean != null)
-            {               
-               if (bean.inUse)
+            {
+               ClassLoader cl = classloader.get();
+               if (cl != null)
                {
+                  Thread.currentThread().setContextClassLoader(cl);
+               }
+               
+               if (!bean.getCanPassivate())
+               {
                   // Abort the eviction
-                  throw new IllegalStateException("Cannot passivate bean " + fqn + " -- it is currently in use");
+                  throw new ContextInUseException("Cannot passivate bean " + fqn + 
+                        " -- it or one if its children is currently in use");
                }
                
                if(log.isTraceEnabled())
                {
                   log.trace("nodePassivated(): send prePassivate event to bean at fqn: " +fqn);
                }
-               Thread.currentThread().setContextClassLoader(((EJBContainer) bean.getContainer()).getClassloader());
-               bean.prePassivate();
+               
+               bean.passivateAfterReplication();
             }
          } 
          catch (CacheException e) 
@@ -403,6 +443,8 @@
          }
          catch (NoSuchEJBException e)
          {
+            // TODO is this still necessary? Don't think we
+            // should have orphaned proxies any more
             if (bean instanceof ProxiedStatefulBeanContext)
             {
                // This is probably an orphaned proxy; double check and remove it
@@ -438,7 +480,7 @@
       }
    }
    
-   private Option getBypassOption()
+   private static Option getBypassOption()
    {
       try
       {
@@ -450,7 +492,7 @@
       }
    }
    
-   private Option getLocalOnlyOption()
+   private static Option getLocalOnlyOption()
    {
       try
       {
@@ -461,4 +503,16 @@
          throw new RuntimeException(e);
       }
    }
+   
+   private static Option getGravitateOption()
+   {
+      try
+      {
+         return GRAVITATE_OPTION.clone();
+      }
+      catch (CloneNotSupportedException e)
+      {
+         throw new RuntimeException(e);
+      }
+   }
 }




More information about the jboss-cvs-commits mailing list