[jboss-cvs] JBossAS SVN: r80313 - in projects/ejb3/trunk/core/src: test/java/org/jboss/ejb3/core/test/ejbthree1549 and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 31 08:45:14 EDT 2008
Author: wolfc
Date: 2008-10-31 08:45:14 -0400 (Fri, 31 Oct 2008)
New Revision: 80313
Modified:
projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java
projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java
projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java
Log:
EJBTHREE-1549: passivating off a queue
Modified: projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java
===================================================================
--- projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java 2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/main/java/org/jboss/ejb3/cache/simple/SimpleStatefulCache.java 2008-10-31 12:45:14 UTC (rev 80313)
@@ -25,6 +25,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.ejb.EJBException;
import javax.ejb.NoSuchEJBException;
@@ -62,8 +65,10 @@
private int createCount = 0;
private int passivatedCount = 0;
private int removeCount = 0;
+
+ private Queue<StatefulBeanContext> passivationQueue = new LinkedBlockingQueue<StatefulBeanContext>();
- protected class CacheMap extends LinkedHashMap
+ protected class CacheMap extends LinkedHashMap<Object, StatefulBeanContext>
{
private static final long serialVersionUID = 4514182777643616159L;
@@ -72,12 +77,13 @@
super(maxSize, 0.75F, true);
}
- public CacheMap(Map original)
+ public CacheMap(Map<? extends Object, ? extends StatefulBeanContext> original)
{
super(original);
}
- public boolean removeEldestEntry(Map.Entry entry)
+ @Override
+ public boolean removeEldestEntry(Entry<Object, StatefulBeanContext> entry)
{
boolean removeIt = size() > maxSize;
if (removeIt)
@@ -136,11 +142,11 @@
{
if (!running) return;
- Iterator it = cacheMap.entrySet().iterator();
+ Iterator<Entry<Object, StatefulBeanContext>> it = cacheMap.entrySet().iterator();
while (it.hasNext())
{
- Map.Entry entry = (Map.Entry) it.next();
- StatefulBeanContext centry = (StatefulBeanContext) entry.getValue();
+ Entry<Object, StatefulBeanContext> entry = it.next();
+ StatefulBeanContext centry = entry.getValue();
if (now - centry.lastUsed >= removalTimeout * 1000)
{
synchronized (centry)
@@ -184,12 +190,20 @@
}
/**
- * Just provides a hook
+ * I'm done passivating.
*/
- public void passivationCompleted()
+ protected void passivationCompleted()
{
}
+
+ /**
+ * I'm done selecting candidates for passivation.
+ */
+ protected void prePassivationCompleted()
+ {
+
+ }
public void run()
{
@@ -206,95 +220,65 @@
}
try
{
-
/*
* EJBTHREE-1549
*
* Passivation is potentially a long-running
* operation, so copy the contents quickly and
- * perform passivation on the unpublished
- * local stack variable copy
+ * perform passivation off a queue.
*/
-
- // Initialize
- CacheMap newMap = null;
-
- // Copy the contents of the internal map
synchronized (cacheMap)
{
- newMap = new CacheMap(cacheMap);
- }
-
- /*
- * End EJBTHREE-1549
- */
-
- if (!running)
- return;
-
- boolean trace = log.isTraceEnabled();
- Iterator it = newMap.entrySet().iterator();
- long now = System.currentTimeMillis();
- while (it.hasNext())
- {
- Map.Entry entry = (Map.Entry) it.next();
- StatefulBeanContext centry = (StatefulBeanContext) entry.getValue();
- if (now - centry.lastUsed >= sessionTimeout * 1000)
+ if (!running) return;
+
+ boolean trace = log.isTraceEnabled();
+ Iterator<Entry<Object, StatefulBeanContext>> it = cacheMap.entrySet().iterator();
+ long now = System.currentTimeMillis();
+ while (it.hasNext())
{
- synchronized (centry)
+ Entry<Object, StatefulBeanContext> entry = it.next();
+ StatefulBeanContext centry = entry.getValue();
+ if (now - centry.lastUsed >= sessionTimeout * 1000)
{
- if (centry.getCanPassivate())
- {
- if (!centry.getCanRemoveFromCache())
+ synchronized (centry)
+ {
+ if (centry.getCanPassivate())
{
- passivate(centry);
+ if (!centry.getCanRemoveFromCache())
+ {
+ passivationQueue.add(centry);
+ }
+ else if (trace)
+ {
+ log.trace("Removing " + entry.getKey() + " from cache");
+ }
}
- else if (trace)
+ else
{
- log.trace("Removing " + entry.getKey() + " from cache");
+ centry.markedForPassivation = true;
+ assert centry.isInUse() : centry + " is not in use, and thus will never be passivated";
}
+ // its ok to evict because it will be passivated
+ // or we determined above that we can remove it
+ it.remove();
}
- else
- {
- centry.markedForPassivation = true;
- assert centry.isInUse() : centry + " is not in use, and thus will never be passivated";
- }
-
- // its ok to evict because it will be passivated
- // or we determined above that we can remove it
-
- // Remove from the copy
- it.remove();
-
- /*
- * EJBTHREE-1549
- */
-
- // Remove from the internal cacheMap
- Object removed = null;
- Object key = entry.getKey();
- synchronized (cacheMap)
- {
- removed = cacheMap.remove(key);
- }
-
- // Perform some assertions
- assert removed != null : "Could not remove key " + key
- + " from internal cacheMap as there was no corresponding entry";
- assert removed == centry : "Removed " + removed
- + " from internal cacheMap did not match the object we were expecting: " + centry;
-
- /*
- * End EJBTHREE-1549
- */
}
- }
- else if (trace)
- {
- log.trace("Not passivating; id=" + centry.getId() + " only inactive "
- + Math.max(0, now - centry.lastUsed) + " ms");
- }
+ else if (trace)
+ {
+ log.trace("Not passivating; id=" + centry.getId() +
+ " only inactive " + Math.max(0, now - centry.lastUsed) + " ms");
+ }
+ }
}
+
+ prePassivationCompleted();
+
+ StatefulBeanContext ctx;
+ while((ctx = passivationQueue.poll()) != null)
+ {
+ passivate(ctx);
+ }
+
// Make internal callback that we're done
this.passivationCompleted();
}
@@ -372,7 +356,10 @@
try
{
Thread.currentThread().setContextClassLoader(((EJBContainer) ctx.getContainer()).getClassloader());
- pm.passivateSession(ctx);
+ synchronized(pm)
+ {
+ pm.passivateSession(ctx);
+ }
++passivatedCount;
}
finally
@@ -399,9 +386,9 @@
synchronized (cacheMap)
{
cacheMap.put(ctx.getId(), ctx);
+ ctx.setInUse(true);
+ ctx.lastUsed = System.currentTimeMillis();
}
- ctx.setInUse(true);
- ctx.lastUsed = System.currentTimeMillis();
++createCount;
}
catch (EJBException e)
@@ -427,28 +414,64 @@
StatefulBeanContext entry = null;
synchronized (cacheMap)
{
- entry = (StatefulBeanContext) cacheMap.get(key);
+ entry = cacheMap.get(key);
}
- if (entry == null)
+ if(entry == null)
{
- entry = (StatefulBeanContext) pm.activateSession(key);
- if (entry == null)
+ // TODO: optimize
+ synchronized (cacheMap)
{
- throw new NoSuchEJBException("Could not find stateful bean: " + key);
+ entry = cacheMap.get(key);
+ if(entry == null)
+ {
+ Iterator<StatefulBeanContext> i = passivationQueue.iterator();
+ while(i.hasNext())
+ {
+ StatefulBeanContext ctx = i.next();
+ if(ctx.getId().equals(key))
+ {
+ boolean passivationCanceled = passivationQueue.remove(ctx);
+ if(passivationCanceled)
+ {
+ entry = ctx;
+ cacheMap.put(key, entry);
+ }
+ break;
+ }
+ }
+ }
}
- --passivatedCount;
-
- // We cache the entry even if we will throw an exception below
- // as we may still need it for its children and XPC references
- if (log.isTraceEnabled())
+ }
+ if (entry == null)
+ {
+ synchronized(pm)
{
- log.trace("Caching activated context " + entry.getId() + " of type " + entry.getClass());
+ synchronized (cacheMap)
+ {
+ entry = cacheMap.get(key);
+ }
+ if(entry == null)
+ {
+ entry = pm.activateSession(key);
+ if (entry == null)
+ {
+ throw new NoSuchEJBException("Could not find stateful bean: " + key);
+ }
+ --passivatedCount;
+
+ // We cache the entry even if we will throw an exception below
+ // as we may still need it for its children and XPC references
+ if (log.isTraceEnabled())
+ {
+ log.trace("Caching activated context " + entry.getId() + " of type " + entry.getClass());
+ }
+
+ synchronized (cacheMap)
+ {
+ cacheMap.put(key, entry);
+ }
+ }
}
-
- synchronized (cacheMap)
- {
- cacheMap.put(key, entry);
- }
}
// Now we know entry isn't null
@@ -494,7 +517,7 @@
StatefulBeanContext ctx = null;
synchronized (cacheMap)
{
- ctx = (StatefulBeanContext) cacheMap.get(key);
+ ctx = cacheMap.get(key);
}
if(ctx == null)
throw new NoSuchEJBException("Could not find Stateful bean: " + key);
Modified: projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java
===================================================================
--- projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java 2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/ForcePassivationCache.java 2008-10-31 12:45:14 UTC (rev 80313)
@@ -24,6 +24,8 @@
import java.io.Serializable;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.jboss.ejb3.cache.simple.SimpleStatefulCache;
import org.jboss.ejb3.stateful.StatefulBeanContext;
@@ -57,6 +59,8 @@
*/
private static final Object START_PASSIVATION_LOCK = new Object();
private static volatile boolean passivationForced = false;
+
+ public static final CyclicBarrier PRE_PASSIVATE_BARRIER = new CyclicBarrier(2);
// --------------------------------------------------------------------------------||
// Functional Methods -------------------------------------------------------------||
@@ -186,7 +190,7 @@
}
@Override
- public void passivationCompleted()
+ protected void passivationCompleted()
{
// Call super
super.passivationCompleted();
@@ -212,6 +216,29 @@
POST_PASSIVATE_BARRIER.reset();
}
}
+
+ @Override
+ protected void prePassivationCompleted()
+ {
+ super.prePassivationCompleted();
+
+ try
+ {
+ PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException("PRE_PASSIVATE_BARRIER prematurely broken", e);
+ }
+ catch(InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch(TimeoutException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
}
\ No newline at end of file
Modified: projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java
===================================================================
--- projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java 2008-10-31 12:00:10 UTC (rev 80312)
+++ projects/ejb3/trunk/core/src/test/java/org/jboss/ejb3/core/test/ejbthree1549/unit/PassivationDoesNotPreventNewActivityUnitTestCase.java 2008-10-31 12:45:14 UTC (rev 80313)
@@ -114,9 +114,11 @@
ForcePassivationCache.forcePassivation();
log.info("Passivation forced, carrying out test");
+ ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+
// Block until the PM is ready to passivate
log.info("Waiting on common barrier for PM to run...");
- BlockingPersistenceManager.BARRIER.await();
+ BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
Callable<Integer> task = new Callable<Integer>() {
@@ -152,6 +154,87 @@
assertEquals("the postPassivation counter should be 1 higher than the previous (during passivation)", duringPassivation + 1, postPassivation);
}
+ @Test
+ public void testInvokeSameSessionDuringPrePassivation() throws Throwable
+ {
+ final MyStatefulLocal bean = lookup(MyStatefulLocal.JNDI_NAME, MyStatefulLocal.class);
+
+ // Get our bean's Session ID
+ StatefulLocalProxyInvocationHandler handler = (StatefulLocalProxyInvocationHandler) Proxy.getInvocationHandler(bean);
+ Serializable sessionId = handler.getSessionId();
+
+ // Invoke upon our bean
+ int next = bean.getNextCounter();
+ log.info("Got counter from " + sessionId + ": " + next);
+ TestCase.assertEquals("SFSB did not return expected next counter", 0, next);
+
+ // Get the Cache
+ ForcePassivationCache cache = (ForcePassivationCache) container.getCache();
+
+ // Get the lock to block the PM, now
+ boolean gotLock = BlockingPersistenceManager.PASSIVATION_LOCK.tryLock();
+
+ Future<Integer> result;
+ // Once PM lock is acquired, everything is in "try" so we release in "finally"
+ try
+ {
+ // Ensure we got the PM lock, else fail the test
+ TestCase.assertTrue("Test was not able to immediately get the lock to block the PersistenceManager", gotLock);
+ log.info("Locked " + BlockingPersistenceManager.class.getSimpleName());
+
+ // Mark
+ cache.makeSessionEligibleForPassivation(sessionId);
+
+ /*
+ * Passivate
+ */
+
+ // Trigger Passivation
+ ForcePassivationCache.forcePassivation();
+ log.info("Passivation forced, carrying out test");
+
+ Callable<Integer> task = new Callable<Integer>() {
+ public Integer call() throws Exception
+ {
+ return bean.getNextCounter();
+ }
+ };
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ result = executor.submit(task);
+
+ // TODO: there is no way to know where we are in StatefulInstanceInterceptor
+ Thread.sleep(5000);
+
+ ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+
+ // Block until the PM is ready to passivate
+ /* we're not passivating, we yanked it out
+ log.info("Waiting on common barrier for PM to run...");
+ BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
+ log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
+ */
+ }
+ finally
+ {
+ // Allow the Persistence Manager to finish up
+ log.info("Letting the PM perform passivation...");
+ BlockingPersistenceManager.PASSIVATION_LOCK.unlock();
+ }
+
+ // We need to allow time to let the Cache finish passivation, so block until it's done
+ log.info("Waiting on Cache to tell us passivation is completed...");
+ ForcePassivationCache.POST_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+ log.info("Test sees Cache reports passivation completed.");
+
+ int duringPassivation = result.get(5, TimeUnit.SECONDS);
+ log.info("Got counter from " + sessionId + ": " + duringPassivation);
+
+ int postPassivation = bean.getNextCounter();
+ log.info("Got counter from " + sessionId + ": " + postPassivation);
+
+ assertEquals("the postPassivation counter should be 1 higher than the previous (during passivation)", duringPassivation + 1, postPassivation);
+ }
+
/**
* Tests that a new session may be created while another is being passivated
*/
@@ -313,9 +396,11 @@
ForcePassivationCache.forcePassivation();
log.info("Passivation forced, carrying out test");
+ ForcePassivationCache.PRE_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
+
// Block until the PM is ready to passivate
log.info("Waiting on common barrier for PM to run...");
- BlockingPersistenceManager.BARRIER.await();
+ BlockingPersistenceManager.BARRIER.await(5, TimeUnit.SECONDS);
log.info("PM and test have met barrier, passivation running (but will be blocked to complete by test)");
/*
@@ -365,7 +450,7 @@
// We need to allow time to let the Cache finish passivation, so block until it's done
log.info("Waiting on Cache to tell us passivation is completed...");
- ForcePassivationCache.POST_PASSIVATE_BARRIER.await();
+ ForcePassivationCache.POST_PASSIVATE_BARRIER.await(5, TimeUnit.SECONDS);
log.info("Test sees Cache reports passivation completed.");
/*
@@ -442,6 +527,7 @@
public void after()
{
ForcePassivationCache.POST_PASSIVATE_BARRIER.reset();
+ ForcePassivationCache.PRE_PASSIVATE_BARRIER.reset();
ForcePassivationCache cache = (ForcePassivationCache) container.getCache();
cache.clear();
}
More information about the jboss-cvs-commits mailing list