[jboss-cvs] JBossAS SVN: r80313 - in projects/ejb3/trunk/core/src: test/java/org/jboss/ejb3/core/test/ejbthree1549 and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 31 08:45:14 EDT 2008


Author: wolfc
Date: 2008-10-31 08:45:14 -0400 (Fri, 31 Oct 2008)
New Revision: 80313

Modified:
   projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java
   projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java
   projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java
Log:
EJBTHREE-1549: passivating off a queue

Modified: projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java
===================================================================
--- projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java	2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java	2008-10-31 12:45:14 UTC (rev 80313)
@@ -25,6 +25,9 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.ejb.EJBException;
 import javax.ejb.NoSuchEJBException;
@@ -62,8 +65,10 @@
    private int createCount = 0;
    private int passivatedCount = 0;
    private int removeCount = 0;
+   
+   private Queue<StatefulBeanContext> passivationQueue = new LinkedBlockingQueue<StatefulBeanContext>();
 
-   protected class CacheMap extends LinkedHashMap
+   protected class CacheMap extends LinkedHashMap<Object, StatefulBeanContext>
    {
       private static final long serialVersionUID = 4514182777643616159L;
 
@@ -72,12 +77,13 @@
          super(maxSize, 0.75F, true);
       }
       
-      public CacheMap(Map original)
+      public CacheMap(Map<? extends Object, ? extends StatefulBeanContext> original)
       {
          super(original);
       }
 
-      public boolean removeEldestEntry(Map.Entry entry)
+      @Override
+      public boolean removeEldestEntry(Entry<Object, StatefulBeanContext> entry)
       {
          boolean removeIt = size() > maxSize;
          if (removeIt)
@@ -136,11 +142,11 @@
                {
                   if (!running) return;
                    
-                  Iterator it = cacheMap.entrySet().iterator();
+                  Iterator<Entry<Object, StatefulBeanContext>> it = cacheMap.entrySet().iterator();
                   while (it.hasNext())
                   {
-                     Map.Entry entry = (Map.Entry) it.next();
-                     StatefulBeanContext centry = (StatefulBeanContext) entry.getValue();
+                     Entry<Object, StatefulBeanContext> entry = it.next();
+                     StatefulBeanContext centry = entry.getValue();
                      if (now - centry.lastUsed >= removalTimeout * 1000)
                      {
                         synchronized (centry)
@@ -184,12 +190,20 @@
       }
       
       /**
-       * Just provides a hook
+       * I'm done passivating.
        */
-      public void passivationCompleted()
+      protected void passivationCompleted()
       {
          
       }
+      
+      /**
+       * I'm done selecting candidates for passivation.
+       */
+      protected void prePassivationCompleted()
+      {
+         
+      }
 
       public void run()
       {
@@ -206,95 +220,65 @@
             }
             try
             {
-
                /*
                 * EJBTHREE-1549
                 * 
                 * Passivation is potentially a long-running
                 * operation, so copy the contents quickly and 
-                * perform passivation on the unpublished 
-                * local stack variable copy
+                * perform passivation off a queue.
                 */
-
-               // Initialize
-               CacheMap newMap = null;
-
-               // Copy the contents of the internal map
                synchronized (cacheMap)
                {
-                  newMap = new CacheMap(cacheMap);
-               }
-
-               /*
-                * End EJBTHREE-1549
-                */
-
-               if (!running)
-                  return;
-
-               boolean trace = log.isTraceEnabled();
-               Iterator it = newMap.entrySet().iterator();
-               long now = System.currentTimeMillis();
-               while (it.hasNext())
-               {
-                  Map.Entry entry = (Map.Entry) it.next();
-                  StatefulBeanContext centry = (StatefulBeanContext) entry.getValue();
-                  if (now - centry.lastUsed >= sessionTimeout * 1000)
+                  if (!running) return;
+                  
+                  boolean trace = log.isTraceEnabled();
+                  Iterator<Entry<Object, StatefulBeanContext>> it = cacheMap.entrySet().iterator();
+                  long now = System.currentTimeMillis();
+                  while (it.hasNext())
                   {
-                     synchronized (centry)
+                     Entry<Object, StatefulBeanContext> entry = it.next();
+                     StatefulBeanContext centry = entry.getValue();
+                     if (now - centry.lastUsed >= sessionTimeout * 1000)
                      {
-                        if (centry.getCanPassivate())
-                        {
-                           if (!centry.getCanRemoveFromCache())
+                        synchronized (centry)
+                        {                     
+                           if (centry.getCanPassivate())
                            {
-                              passivate(centry);
+                              if (!centry.getCanRemoveFromCache())
+                              {
+                                 passivationQueue.add(centry);
+                              }
+                              else if (trace)
+                              {
+                                 log.trace("Removing " + entry.getKey() + " from cache");
+                              }
                            }
-                           else if (trace)
+                           else
                            {
-                              log.trace("Removing " + entry.getKey() + " from cache");
+                              centry.markedForPassivation = true;                              
+                              assert centry.isInUse() : centry + " is not in use, and thus will never be passivated";
                            }
+                           // its ok to evict because it will be passivated
+                           // or we determined above that we can remove it
+                           it.remove();
                         }
-                        else
-                        {
-                           centry.markedForPassivation = true;
-                           assert centry.isInUse() : centry + " is not in use, and thus will never be passivated";
-                        }
-
-                        // its ok to evict because it will be passivated
-                        // or we determined above that we can remove it
-
-                        // Remove from the copy
-                        it.remove();
-
-                        /*
-                         * EJBTHREE-1549
-                         */
-
-                        // Remove from the internal cacheMap
-                        Object removed = null;
-                        Object key = entry.getKey();
-                        synchronized (cacheMap)
-                        {
-                           removed = cacheMap.remove(key);
-                        }
-
-                        // Perform some assertions
-                        assert removed != null : "Could not remove key " + key
-                              + " from internal cacheMap as there was no corresponding entry";
-                        assert removed == centry : "Removed " + removed
-                              + " from internal cacheMap did not match the object we were expecting: " + centry;
-
-                        /*
-                         * End EJBTHREE-1549
-                         */
                      }
-                  }
-                  else if (trace)
-                  {
-                     log.trace("Not passivating; id=" + centry.getId() + " only inactive "
-                           + Math.max(0, now - centry.lastUsed) + " ms");
-                  }
+                     else if (trace)
+                     {
+                        log.trace("Not passivating; id=" + centry.getId() +
+                              " only inactive " + Math.max(0, now - centry.lastUsed) + " ms");
+                     }
+                  }                  
                }
+               
+               prePassivationCompleted();
+               
+               StatefulBeanContext ctx;
+               while((ctx = passivationQueue.poll()) != null)
+               {
+                  passivate(ctx);
+               }
+               
                // Make internal callback that we're done
                this.passivationCompleted();
             }
@@ -372,7 +356,10 @@
       try
       {
          Thread.currentThread().setContextClassLoader(((EJBContainer) ctx.getContainer()).getClassloader());
-         pm.passivateSession(ctx);
+         synchronized(pm)
+         {
+            pm.passivateSession(ctx);
+         }
          ++passivatedCount;
       }
       finally
@@ -399,9 +386,9 @@
          synchronized (cacheMap)
          {
             cacheMap.put(ctx.getId(), ctx);
+            ctx.setInUse(true);
+            ctx.lastUsed = System.currentTimeMillis();
          }
-         ctx.setInUse(true);
-         ctx.lastUsed = System.currentTimeMillis();
          ++createCount;
       }
       catch (EJBException e)
@@ -427,28 +414,64 @@
       StatefulBeanContext entry = null;
       synchronized (cacheMap)
       {
-         entry = (StatefulBeanContext) cacheMap.get(key);
+         entry = cacheMap.get(key);
       }
-      if (entry == null)
+      if(entry == null)
       {
-         entry = (StatefulBeanContext) pm.activateSession(key);
-         if (entry == null)
+         // TODO: optimize
+         synchronized (cacheMap)
          {
-            throw new NoSuchEJBException("Could not find stateful bean: " + key);
+            entry = cacheMap.get(key);
+            if(entry == null)
+            {
+               Iterator<StatefulBeanContext> i = passivationQueue.iterator();
+               while(i.hasNext())
+               {
+                  StatefulBeanContext ctx = i.next();
+                  if(ctx.getId().equals(key))
+                  {
+                     boolean passivationCanceled = passivationQueue.remove(ctx);
+                     if(passivationCanceled)
+                     {
+                        entry = ctx;
+                        cacheMap.put(key, entry);
+                     }
+                     break;
+                  }
+               }
+            }
          }
-         --passivatedCount;
-         
-         // We cache the entry even if we will throw an exception below
-         // as we may still need it for its children and XPC references
-         if (log.isTraceEnabled())
+      }
+      if (entry == null)
+      {
+         synchronized(pm)
          {
-            log.trace("Caching activated context " + entry.getId() + " of type " + entry.getClass());
+            synchronized (cacheMap)
+            {
+               entry = cacheMap.get(key);
+            }
+            if(entry == null)
+            {
+               entry = pm.activateSession(key);
+               if (entry == null)
+               {
+                  throw new NoSuchEJBException("Could not find stateful bean: " + key);
+               }
+               --passivatedCount;
+               
+               // We cache the entry even if we will throw an exception below
+               // as we may still need it for its children and XPC references
+               if (log.isTraceEnabled())
+               {
+                  log.trace("Caching activated context " + entry.getId() + " of type " + entry.getClass());
+               }
+               
+               synchronized (cacheMap)
+               {
+                  cacheMap.put(key, entry);
+               }
+            }
          }
-         
-         synchronized (cacheMap)
-         {
-            cacheMap.put(key, entry);
-         }
       }
       
       // Now we know entry isn't null
@@ -494,7 +517,7 @@
       StatefulBeanContext ctx = null;
       synchronized (cacheMap)
       {
-         ctx = (StatefulBeanContext) cacheMap.get(key);
+         ctx = cacheMap.get(key);
       }
       if(ctx == null)
          throw new NoSuchEJBException("Could not find Stateful bean: " + key);

Modified: projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java
===================================================================
--- projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java	2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java	2008-10-31 12:45:14 UTC (rev 80313)
@@ -24,6 +24,8 @@
 import java.io.Serializable;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.jboss.ejb3.cache.simple.SimpleStatefulCache;
 import org.jboss.ejb3.stateful.StatefulBeanContext;
@@ -57,6 +59,8 @@
     */
    private static final Object START_PASSIVATION_LOCK = new Object();
    private static volatile boolean passivationForced = false;
+   
+   public static final CyclicBarrier PRE_PASSIVATE_BARRIER = new CyclicBarrier(2);
 
    // --------------------------------------------------------------------------------||
    // Functional Methods -------------------------------------------------------------||
@@ -186,7 +190,7 @@
       }
 
       @Override
-      public void passivationCompleted()
+      protected void passivationCompleted()
       {
          // Call super
          super.passivationCompleted();
@@ -212,6 +216,29 @@
             POST_PASSIVATE_BARRIER.reset();
          }
       }
+      
+      @Override
+      protected void prePassivationCompleted()
+      {
+         super.prePassivationCompleted();
+         
+         try
+         {
+            PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+         }
+         catch (BrokenBarrierException e)
+         {
+            throw new RuntimeException("PRE_PASSIVATE_BARRIER prematurely broken", e);
+         }
+         catch(InterruptedException e)
+         {
+            throw new RuntimeException(e);
+         }
+         catch(TimeoutException e)
+         {
+            throw new RuntimeException(e);
+         }
+      }
    }
 
 }
\ No newline at end of file

Modified: projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java
===================================================================
--- projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java	2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java	2008-10-31 12:45:14 UTC (rev 80313)
@@ -114,9 +114,11 @@
          ForcePassivationCache.forcePassivation();
          log.info("Passivation forced, carrying out test");
 
+         ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+         
          // Block until the PM is ready to passivate
          log.info("Waiting on common barrier for PM to run...");
-         BlockingPersistenceManager.BARRIER.await();
+         BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
          log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
          
          Callable<Integer> task = new Callable<Integer>() {
@@ -152,6 +154,87 @@
       assertEquals("the postPassivation counter should be 1 higher than the previous (during passivation)", duringPassivation + 1, postPassivation);
    }
    
+   @Test
+   public void testInvokeSameSessionDuringPrePassivation() throws Throwable
+   {
+      final MyStatefulLocal bean = lookup(MyStatefulLocal.JNDI_NAME, MyStatefulLocal.class);
+      
+      // Get our bean's Session ID
+      StatefulLocalProxyInvocationHandler handler = (StatefulLocalProxyInvocationHandler) Proxy.getInvocationHandler(bean);
+      Serializable sessionId = handler.getSessionId();
+
+      // Invoke upon our bean
+      int next = bean.getNextCounter();
+      log.info("Got counter from " + sessionId + ": " + next);
+      TestCase.assertEquals("SFSB did not return expected next counter", 0, next);
+
+      // Get the Cache
+      ForcePassivationCache cache = (ForcePassivationCache) container.getCache();
+
+      // Get the lock to block the PM, now
+      boolean gotLock = BlockingPersistenceManager.PASSIVATION_LOCK.tryLock();
+
+      Future<Integer> result;
+      // Once PM lock is acquired, everything is in "try" so we release in "finally"
+      try
+      {
+         // Ensure we got the PM lock, else fail the test
+         TestCase.assertTrue("Test was not able to immediately get the lock to block the PersistenceManager", gotLock);
+         log.info("Locked " + BlockingPersistenceManager.class.getSimpleName());
+         
+         // Mark
+         cache.makeSessionEligibleForPassivation(sessionId);
+
+         /*
+          * Passivate
+          */
+
+         // Trigger Passivation
+         ForcePassivationCache.forcePassivation();
+         log.info("Passivation forced, carrying out test");
+
+         Callable<Integer> task = new Callable<Integer>() {
+            public Integer call() throws Exception
+            {
+               return bean.getNextCounter();
+            }
+         };
+         ExecutorService executor = Executors.newFixedThreadPool(1);
+         result = executor.submit(task);
+         
+         // TODO: there is no way to know where we are in StatefulInstanceInterceptor
+         Thread.sleep(5000);
+         
+         ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+         
+         // Block until the PM is ready to passivate
+         /* we're not passivating, we yanked it out
+         log.info("Waiting on common barrier for PM to run...");
+         BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
+         log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
+         */
+      }
+      finally
+      {
+         // Allow the Persistence Manager to finish up
+         log.info("Letting the PM perform passivation...");
+         BlockingPersistenceManager.PASSIVATION_LOCK.unlock();
+      }
+      
+      // We need to allow time to let the Cache finish passivation, so block until it's done
+      log.info("Waiting on Cache to tell us passivation is completed...");
+      ForcePassivationCache.POST_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+      log.info("Test sees Cache reports passivation completed.");
+      
+      int duringPassivation = result.get(5, TimeUnit.SECONDS);
+      log.info("Got counter from " + sessionId + ": " + duringPassivation);
+      
+      int postPassivation = bean.getNextCounter();
+      log.info("Got counter from " + sessionId + ": " + postPassivation);
+      
+      assertEquals("the postPassivation counter should be 1 higher than the previous (during passivation)", duringPassivation + 1, postPassivation);
+   }
+   
    /**
     * Tests that a new session may be created while another is being passivated
     */
@@ -313,9 +396,11 @@
          ForcePassivationCache.forcePassivation();
          log.info("Passivation forced, carrying out test");
 
+         ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+         
          // Block until the PM is ready to passivate
          log.info("Waiting on common barrier for PM to run...");
-         BlockingPersistenceManager.BARRIER.await();
+         BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
          log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
 
          /*
@@ -365,7 +450,7 @@
 
       // We need to allow time to let the Cache finish passivation, so block until it's done
       log.info("Waiting on Cache to tell us passivation is completed...");
-      ForcePassivationCache.POST_PASSIVATE_BARRIER.await();
+      ForcePassivationCache.POST_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
       log.info("Test sees Cache reports passivation completed.");
 
       /*
@@ -442,6 +527,7 @@
    public void after()
    {
       ForcePassivationCache.POST_PASSIVATE_BARRIER.reset();
+      ForcePassivationCache.PRE_PASSIVATE_BARRIER.reset();
       ForcePassivationCache cache = (ForcePassivationCache) container.getCache();
       cache.clear();
    }




More information about the jboss-cvs-commits mailing list