[jboss-cvs] JBossAS SVN: r60622 - trunk/ejb3/src/main/org/jboss/ejb3/stateful.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Feb 18 22:57:43 EST 2007
Author: bstansberry at jboss.com
Date: 2007-02-18 22:57:43 -0500 (Sun, 18 Feb 2007)
New Revision: 60622
Modified:
trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java
trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java
Log:
[EJBTHREE-867] Loosen coupling of lifecycle of nested SFSBs
[EJBTHREE-849] Properly handle passivation/activation callbacks for nested SFSBs
[EJBTHREE-851] Invoke passivation/activation callbacks around replication
Modified: trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java 2007-02-19 03:56:26 UTC (rev 60621)
+++ trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java 2007-02-19 03:57:43 UTC (rev 60622)
@@ -35,9 +35,12 @@
import org.jboss.ejb3.interceptor.InterceptorInfo;
/**
- * Comment
+ * Proxy to a NestedStatefulBeanContext that can be independently passivated,
+ * activated and replicated without disturbing the object it is proxying.
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Brian Stansberry
+ *
* @version $Revision$
*/
public class ProxiedStatefulBeanContext extends StatefulBeanContext implements
@@ -59,6 +62,7 @@
this.delegate = delegate;
oid = delegate.getId();
containerId = delegate.getContainer().getObjectName().getCanonicalName();
+ parentRef = new StatefulBeanContextReference(delegate.getContainedIn());
}
public ProxiedStatefulBeanContext()
@@ -83,6 +87,11 @@
}
if (delegate == null)
throw new RuntimeException("Failed to read delegate");
+
+ // If we just read our delegate, it's possible there's been a
+ // failover and a remote node still has a ref to a stale delegate.
+ // So, we should be replicated to invalidate the remote node.
+ this.markedForReplication = true;
}
return delegate;
}
@@ -91,11 +100,6 @@
{
out.writeObject(oid);
out.writeUTF(containerId);
- if(parentRef == null)
- {
- parentRef = new StatefulBeanContextReference(getDelegate()
- .getContainedIn());
- }
out.writeObject(parentRef);
}
@@ -107,158 +111,317 @@
parentRef = (StatefulBeanContextReference) in.readObject();
}
- /**
- * Ignores the call, as passivation of this proxy context
- * does not affect the underlying bean (which is passivated
- * along with its parent context).
- */
- public void prePassivate()
- {
- }
-
- /**
- * Ignores the call, as activation of this proxy context
- * does not affect the underlying bean (which is activated
- * along with its parent context).
- */
- public void postActivate()
- {
- }
-
+ @Override
public List<StatefulBeanContext> getContains()
{
return getDelegate().getContains();
}
+ @Override
public EntityManager getExtendedPersistenceContext(String id)
{
return getDelegate().getExtendedPersistenceContext(id);
}
+ @Override
public void addExtendedPersistenceContext(String id, EntityManager pc)
{
getDelegate().addExtendedPersistenceContext(id, pc);
}
+ @Override
public Map<String, EntityManager> getExtendedPersistenceContexts()
{
return getDelegate().getExtendedPersistenceContexts();
}
+ @Override
+ public void removeExtendedPersistenceContext(String id)
+ {
+ getDelegate().removeExtendedPersistenceContext(id);
+ }
+
+ @Override
+ public boolean scanForExtendedPersistenceContext(String id, StatefulBeanContext ignore)
+ {
+ return getDelegate().scanForExtendedPersistenceContext(id, ignore);
+ }
+
+ @Override
public StatefulBeanContext getContainedIn()
{
return getDelegate().getContainedIn();
}
+ @Override
public void addContains(StatefulBeanContext ctx)
{
getDelegate().addContains(ctx);
}
+ @Override
+ public void removeContains(StatefulBeanContext ctx)
+ {
+ getDelegate().removeContains(ctx);
+ }
+
+ @Override
public StatefulBeanContext pushContainedIn()
{
return getDelegate().pushContainedIn();
}
+ @Override
public void popContainedIn()
{
getDelegate().popContainedIn();
}
+ @Override
+ public StatefulBeanContext getUltimateContainedIn()
+ {
+ return getDelegate().getUltimateContainedIn();
+ }
+
+ @Override
+ public boolean isInUse()
+ {
+ // Don't call delegate
+ return super.isInUse();
+ }
+
+ @Override
+ public void setInUse(boolean inUse)
+ {
+ super.setInUse(inUse);
+ // delegate needs to know this for getCanPassivate()
+ getDelegate().setInUse(inUse);
+
+ if (!inUse)
+ {
+ // drop ref to delegate, as the delegate can be passivated/activated
+ // without our knowledge, and if that happens we have a ref to a
+ // stale delegate.
+ delegate = null;
+ }
+ }
+
+ @Override
public boolean isDiscarded()
{
return getDelegate().isDiscarded();
}
+ @Override
public void setDiscarded(boolean discarded)
{
getDelegate().setDiscarded(discarded);
}
+ @Override
+ public boolean isRemoved()
+ {
+ return getDelegate().isRemoved();
+ }
+
+ @Override
public ReentrantLock getLock()
{
return getDelegate().getLock();
}
+ @Override
public boolean isInInvocation()
{
return getDelegate().isInInvocation();
}
+ @Override
public void setInInvocation(boolean inInvocation)
{
getDelegate().setInInvocation(inInvocation);
}
+ @Override
public Object getId()
{
return getDelegate().getId();
}
+ @Override
public void setId(Object id)
{
this.oid = id;
getDelegate().setId(id);
}
+ @Override
public boolean isTxSynchronized()
{
return getDelegate().isTxSynchronized();
}
+ @Override
public void setTxSynchronized(boolean txSynchronized)
{
getDelegate().setTxSynchronized(txSynchronized);
}
+ @Override
public void remove()
{
getDelegate().remove();
}
+ @Override
public void setContainer(Container container)
{
getDelegate().setContainer(container);
}
+ @Override
public Container getContainer()
{
return getDelegate().getContainer();
}
+ @Override
public Object getInstance()
{
return getDelegate().getInstance();
}
+ @Override
public SimpleMetaData getMetaData()
{
return getDelegate().getMetaData();
}
+ @Override
public Object[] getInterceptorInstances(InterceptorInfo[] interceptorInfos)
{
return getDelegate().getInterceptorInstances(interceptorInfos);
}
+ @Override
public void extractBeanAndInterceptors()
{
getDelegate().extractBeanAndInterceptors();
}
+ @Override
public void setInstance(Object instance)
{
getDelegate().setInstance(instance);
}
+ @Override
public void initialiseInterceptorInstances()
{
getDelegate().initialiseInterceptorInstances();
}
+ @Override
public EJBContext getEJBContext()
{
return getDelegate().getEJBContext();
}
+ /**
+ * Ignores the call, as passivation of this proxy context
+ * does not affect the underlying bean (which is passivated
+ * along with its parent context).
+ */
+ @Override
+ public void prePassivate()
+ {
+ }
+
+ /**
+ * Ignores the call, as activation of this proxy context
+ * does not affect the underlying bean (which is activated
+ * along with its parent context).
+ */
+ @Override
+ public void postActivate()
+ {
+ }
+
+ /**
+ * Ignores the call, as passivation of this proxy context
+ * does not affect the underlying bean (which is passivated
+ * along with its parent context).
+ */
+ @Override
+ public void passivateAfterReplication()
+ {
+ // ignore
+ }
+
+ /**
+ * Ignores the call, as activation of this proxy context
+ * does not affect the underlying bean (which is activated
+ * along with its parent context).
+ */
+ @Override
+ public void activateAfterReplication()
+ {
+ // ignore
+ }
+
+ @Override
+ public boolean getCanPassivate()
+ {
+ if (delegate == null)
+ {
+ // If we haven't deserialized our delegate, we're not in use
+ // on this node
+ return true;
+ }
+ // Just check if *we* are in use; whether any children are
+ // in use doesn't matter, since passivating this proxy
+ // doesn't affect children
+ return (isInUse() == false);
+ }
+
+ @Override
+ public boolean getCanRemoveFromCache()
+ {
+ return getDelegate().getCanRemoveFromCache();
+ }
+
+ @Override
+ public boolean getReplicationIsPassivation()
+ {
+ return getDelegate().getReplicationIsPassivation();
+ }
+
+ @Override
+ public void setReplicationIsPassivation(boolean replicationIsPassivation)
+ {
+ getDelegate().setReplicationIsPassivation(replicationIsPassivation);
+ }
+
+ /**
+ * Ignores the call, as replication of this proxy context
+ * does not affect the underlying bean (which is replicated
+ * along with its parent context).
+ */
+ @Override
+ public void preReplicate()
+ {
+ // ignore
+ }
+
+ /**
+ * Ignores the call, as replication of this proxy context
+ * does not affect the underlying bean (which is replicated
+ * along with its parent context).
+ */
+ @Override
+ public void postReplicate()
+ {
+ // ignore
+ }
+
}
Modified: trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java 2007-02-19 03:56:26 UTC (rev 60621)
+++ trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java 2007-02-19 03:57:43 UTC (rev 60622)
@@ -21,33 +21,38 @@
*/
package org.jboss.ejb3.stateful;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.persistence.EntityManager;
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+
import org.jboss.aop.metadata.SimpleMetaData;
import org.jboss.ejb3.BaseContext;
import org.jboss.ejb3.Container;
import org.jboss.ejb3.Ejb3Registry;
import org.jboss.ejb3.ThreadLocalStack;
+import org.jboss.ejb3.cache.StatefulCache;
import org.jboss.ejb3.interceptor.InterceptorInfo;
import org.jboss.ejb3.tx.TxUtil;
import org.jboss.serial.io.MarshalledObject;
import org.jboss.tm.TxUtils;
-import javax.persistence.EntityManager;
-import javax.transaction.Synchronization;
-import javax.transaction.Transaction;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
- * Comment
+ * BeanContext for a stateful session bean.
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Brian Stansberry
+ *
* @version $Revision$
*/
public class StatefulBeanContext extends BaseContext implements Externalizable
@@ -67,7 +72,7 @@
protected boolean discarded;
- // this two are needed for propagated extended persistence contexts when one
+ // these two are needed for propagated extended persistence contexts when one
// SFSB injects another.
public static ThreadLocalStack<StatefulBeanContext> propagatedContainedIn = new ThreadLocalStack<StatefulBeanContext>();
@@ -82,9 +87,12 @@
protected boolean removed;
protected String containerName;
+
+ protected boolean replicationIsPassivation = true;
+
+ protected transient boolean passivated = false;
public StatefulBeanContext()
-
{
}
@@ -95,6 +103,28 @@
extractBeanAndInterceptors();
return contains;
}
+
+ /**
+ * Makes a copy of the contains list so nested callback iterators
+ * can iterate over it without concern that another thread will
+ * remove the context.
+ *
+ * TODO replace contains list with a concurrent collection
+ */
+ private List<StatefulBeanContext> getThreadSafeContains()
+ {
+ // Call getContains() to ensure unmarshalling
+ List<StatefulBeanContext> orig = getContains();
+ List<StatefulBeanContext> copy = null;
+ if (orig != null)
+ {
+ synchronized (orig)
+ {
+ copy = new ArrayList<StatefulBeanContext>(orig);
+ }
+ }
+ return copy;
+ }
public EntityManager getExtendedPersistenceContext(String id)
{
@@ -122,13 +152,63 @@
}
extendedPCS.put(id, pc);
}
+
+ public boolean scanForExtendedPersistenceContext(String id, StatefulBeanContext ignore)
+ {
+ if (this.equals(ignore))
+ return false;
+
+ if (!removed)
+ {
+ Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
+ if (extendedPCS != null && extendedPCS.containsKey(id))
+ return true;
+ }
+
+ if (getContains() != null)
+ {
+ synchronized (contains)
+ {
+ for (StatefulBeanContext contained : contains)
+ {
+ if (!contained.equals(ignore))
+ {
+ if (contained.scanForExtendedPersistenceContext(id, ignore))
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public void removeExtendedPersistenceContext(String id)
+ {
+ Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
+ if (extendedPCS != null)
+ {
+ extendedPCS.remove(id);
+ }
+
+ if (getContains() != null)
+ {
+ synchronized (contains)
+ {
+ for (StatefulBeanContext contained: contains)
+ {
+ contained.removeExtendedPersistenceContext(id);
+ }
+ }
+ }
+ }
public Map<String, EntityManager> getExtendedPersistenceContexts()
{
if (persistenceContexts == null)
{
if (bean == null)
- getInstance(); // unmarshall
+ extractBeanAndInterceptors(); // unmarshall
}
return persistenceContexts;
}
@@ -138,13 +218,73 @@
return containedIn;
}
+ public StatefulBeanContext getUltimateContainedIn()
+ {
+ StatefulBeanContext child = this;
+ StatefulBeanContext parent = containedIn;
+
+ while (parent != null)
+ {
+ child = parent;
+ parent = parent.getContainedIn();
+ }
+
+ if (parent == null && this != child)
+ {
+ // Don't hand out a ref to our parent obtained by walking the
+ // tree. Rather, get it from its cache. This gives the cache
+ // a chance to activate it if it hasn't been. We don't want
+ // to mark the parent as in use though.
+ StatefulCache ultimateCache = ((StatefulContainer)child.getContainer()).getCache();
+ child = ultimateCache.get(child.getId(), false);
+ }
+
+ return child;
+ }
+
public void addContains(StatefulBeanContext ctx)
{
- if (contains == null)
+ if (getContains() == null)
contains = new ArrayList<StatefulBeanContext>();
- contains.add(ctx);
- ctx.containedIn = this;
+
+ synchronized (contains)
+ {
+ contains.add(ctx);
+ ctx.containedIn = this;
+ }
}
+
+ public void removeContains(StatefulBeanContext ctx)
+ {
+ if (getContains() != null) // call getContains() to ensure unmarshalling
+ {
+ // Need to be thread safe
+ synchronized (contains)
+ {
+ if (contains.remove(ctx))
+ {
+ ctx.containedIn = null;
+ }
+ }
+
+ if (removed)
+ {
+ // Close out any XPCs that are no longer referenced
+ cleanExtendedPCs();
+ }
+
+ if (getCanRemoveFromCache())
+ {
+ if (containedIn != null)
+ {
+ containedIn.removeContains(this);
+ }
+
+ // Notify our cache to remove us as we no longer have children
+ ((StatefulContainer) getContainer()).getCache().remove(getId());
+ }
+ }
+ }
public StatefulBeanContext pushContainedIn()
{
@@ -162,53 +302,216 @@
// cache. If placed in the cache, it could be independently passivated,
// activated and replicated, again breaking object references due to
// independent marshalling. Instead, we return a proxy to it that will
- // be stored in its container's cache
+ // be stored in its container's cache
containedIn = propagatedContainedIn.get();
NestedStatefulBeanContext nested = new NestedStatefulBeanContext();
nested.id = id;
nested.container = getContainer();
nested.containerName = containerName;
nested.bean = bean;
+ nested.replicationIsPassivation = replicationIsPassivation;
containedIn.addContains(nested);
thisPtr = new ProxiedStatefulBeanContext(nested);
}
propagatedContainedIn.push(thisPtr);
return thisPtr;
}
+
+ /**
+ * Checks whether this context or any of its children are in use.
+ */
+ public boolean getCanPassivate()
+ {
+ boolean canPassivate = (removed || !inUse);
+
+ if (canPassivate && getContains() != null)
+ {
+ synchronized (contains)
+ {
+ for (StatefulBeanContext contained : contains)
+ {
+ if (!contained.getCanPassivate())
+ {
+ canPassivate = false;
+ break;
+ }
+ }
+ }
+ }
+
+ return canPassivate;
+ }
+ /**
+ * Notification from a non-clustered StatefulCache to inform
+ * that we are about to be passivated.
+ */
public void prePassivate()
{
- getContainer().invokePrePassivate(this);
+ if (!removed && !passivated)
+ {
+ getContainer().invokePrePassivate(this);
+ passivated = true;
+ }
// Pass the call on to any nested children
- if (contains != null)
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
{
- for (StatefulBeanContext contained : contains)
+ for (StatefulBeanContext contained : children)
{
contained.prePassivate();
}
}
}
+ /**
+ * Notification from a non-clustered StatefulCache to inform
+ * that we have been activated.
+ */
public void postActivate()
{
- getContainer().invokePostActivate(this); // handled in getInstance()
+ if (!removed && passivated)
+ {
+ getContainer().invokePostActivate(this);
+ passivated = false;
+ }
// Pass the call on to any nested children
- if (contains != null)
- {
- for (StatefulBeanContext contained : contains)
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
+ {
+ for (StatefulBeanContext contained : children)
{
contained.postActivate();
}
}
}
+
+ /**
+ * Notification from a ClusteredStatefulCache to inform
+ * that a bean that is stored in the distributed cache is now
+ * being passivated as well. Something of a misnomer
+ * as it is possible the bean wasn't replicated (if it implements
+ * Optimized it may have been activated and then a reference left
+ * in the cache without the bean ever being replicated).
+ */
+ public void passivateAfterReplication()
+ {
+ if (!removed && !passivated)
+ {
+ getInstance(); // make sure we're unmarshalled
+ getContainer().invokePrePassivate(this);
+ passivated = true;
+ }
+
+ // Pass the call on to any nested children
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
+ {
+ for (StatefulBeanContext contained : children)
+ {
+ contained.passivateAfterReplication();
+ }
+ }
+ }
+
+ public void activateAfterReplication()
+ {
+ if (!removed && passivated)
+ {
+ getInstance(); // make sure we're unmarshalled
+ getContainer().invokePostActivate(this);
+ passivated = false;
+ }
+
+ // Pass the call on to any nested children
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
+ {
+ for (StatefulBeanContext contained : children)
+ {
+ contained.activateAfterReplication();
+ }
+ }
+ }
+ public boolean getReplicationIsPassivation()
+ {
+ return replicationIsPassivation;
+ }
+
+ public void setReplicationIsPassivation(boolean replicationIsPassivation)
+ {
+ this.replicationIsPassivation = replicationIsPassivation;
+ }
+
+ /**
+ * Notification from a ClusteredStatefulCache before a bean is
+ * replicated.
+ */
+ public void preReplicate()
+ {
+ if (!removed && replicationIsPassivation && !passivated)
+ {
+ getContainer().invokePrePassivate(this);
+ passivated = true;
+ }
+
+ // Pass the call on to any nested children
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
+ {
+ for (StatefulBeanContext contained : children)
+ {
+ contained.preReplicate();
+ }
+ }
+ }
+
+ /**
+ * Notification from a ClusteredStatefulCache after the bean
+ * is fetched from the distributed cache. Something of a misnomer
+ * as it is possible the bean wasn't replicated (if it implements
+ * Optimized it can be fetched from the cache twice without ever
+ * being replicated).
+ */
+ public void postReplicate()
+ {
+ // We may not have been replicated, so only invoke @PostActivate
+ // if we are marked as passivated
+ if (!removed && passivated)
+ {
+ getContainer().invokePostActivate(this);
+ passivated = false;
+ }
+
+ // Pass the call on to any nested children
+ List<StatefulBeanContext> children = getThreadSafeContains();
+ if (children != null)
+ {
+ for (StatefulBeanContext contained : children)
+ {
+ contained.postReplicate();
+ }
+ }
+ }
+
public void popContainedIn()
{
propagatedContainedIn.pop();
}
+ public boolean isInUse()
+ {
+ return inUse;
+ }
+
+ public void setInUse(boolean inUse)
+ {
+ this.inUse = inUse;
+ }
+
public boolean isDiscarded()
{
return discarded;
@@ -265,23 +568,61 @@
return;
removed = true;
RuntimeException exceptionThrown = null;
- if (contains != null)
+
+ // Close any XPCs that haven't been injected into live
+ // beans in our family
+ try
{
- for (StatefulBeanContext contained : contains)
+ cleanExtendedPCs();
+ }
+ catch (RuntimeException e)
+ {
+ // we still need to remove ourself from any parent, so save
+ // the thrown exception and rethrow it after we have cleaned up.
+ if (exceptionThrown == null)
+ exceptionThrown = e;
+ }
+
+ if (containedIn != null && getCanRemoveFromCache())
+ {
+ try
{
- try
- {
- ((StatefulContainer) contained.getContainer()).getCache().remove(
- contained.getId());
- }
- catch (RuntimeException e)
- {
- // we still need to remove every contained SFSB
- // save the thrown exception and rethrow it after we have cleaned up.
+ containedIn.removeContains(this);
+ }
+ catch (RuntimeException e)
+ {
+ // we still need to clean internal state, so save the
+ // thrown exception and rethrow it after we have cleaned up.
+ if (exceptionThrown == null)
exceptionThrown = e;
- }
}
}
+
+ // Clear out refs to our bean and interceptors, to reduce our footprint
+ // in case we are still cached for our refs to any XPCs
+ bean = null;
+ interceptorInstances = null;
+
+ if (exceptionThrown != null) throw new RuntimeException("exception thrown while removing SFSB", exceptionThrown);
+ }
+
+ public boolean getCanRemoveFromCache()
+ {
+ boolean canRemove = removed;
+
+ if (canRemove && getContains() != null) // call getContains() to ensure unmarshalling
+ {
+ synchronized (contains)
+ {
+ canRemove = (contains.size() == 0);
+ }
+ }
+
+ return canRemove;
+ }
+
+ private void cleanExtendedPCs()
+ {
try
{
Transaction tx = TxUtil.getTransactionManager().getTransaction();
@@ -292,7 +633,7 @@
public void beforeCompletion()
{
}
-
+
public void afterCompletion(int status)
{
closeExtendedPCs();
@@ -304,11 +645,14 @@
closeExtendedPCs();
}
}
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
catch (Exception e)
{
- throw new RuntimeException("Exception thrown while removing SFSB", e);
+ throw new RuntimeException("Error cleaning PersistenceContexts in SFSB removal", e);
}
- if (exceptionThrown != null) throw new RuntimeException("exception thrown while removing SFSB", exceptionThrown);
}
private void closeExtendedPCs()
@@ -317,18 +661,58 @@
if (extendedPCS != null)
{
RuntimeException exceptionThrown = null;
- for (EntityManager pc : extendedPCS.values())
+
+ List<String> closedXPCs = new ArrayList<String>();
+ StatefulBeanContext topCtx = getUltimateContainedIn();
+
+ for (Iterator<Map.Entry<String,EntityManager>> iter = extendedPCS.entrySet().iterator();
+ iter.hasNext();)
{
- try
+ Map.Entry<String,EntityManager> entry = iter.next();
+ String id = entry.getKey();
+ EntityManager xpc = entry.getValue();
+
+ // Only close the XPC if our live parent(s) or cousins
+ // don't also have a ref to it
+ boolean canClose = topCtx.scanForExtendedPersistenceContext(id, this);
+
+ if (canClose && getContains() != null)
{
- pc.close();
+ // Only close the XPC if our live children don't have a ref
+ synchronized (contains)
+ {
+ for (StatefulBeanContext contained : contains)
+ {
+ if (contained.scanForExtendedPersistenceContext(id, null))
+ {
+ canClose = false;
+ break;
+ }
+ }
+ }
}
- catch (RuntimeException e)
+
+ if (canClose)
{
- exceptionThrown = e;
+ try
+ {
+ xpc.close();
+ closedXPCs.add(id);
+ }
+ catch (RuntimeException e)
+ {
+ exceptionThrown = e;
+ }
}
}
- if (exceptionThrown != null) throw new RuntimeException("Error cleaning up PersistenceContexts in SFSB removal", exceptionThrown);
+
+ // Clean all refs to the closed XPCs from the tree
+ for (String id : closedXPCs)
+ {
+ topCtx.removeExtendedPersistenceContext(id);
+ }
+
+ if (exceptionThrown != null) throw new RuntimeException("Error closing PersistenceContexts in SFSB removal", exceptionThrown);
}
}
@@ -353,7 +737,6 @@
if (bean == null)
{
extractBeanAndInterceptors();
- // getContainer().invokePostActivate(this);
}
return bean;
}
@@ -366,9 +749,14 @@
// these are public for fast concurrent access/update
public volatile boolean markedForPassivation = false;
+
+ public volatile boolean markedForReplication = false;
+
+ // BES 2007/02/16 make private and use a getter/setter as
+ // ProxiedStatefulBeanContext needs to pass the value on
+ // to its NestedStatefulBeanContext
+ private volatile boolean inUse = false;
- public volatile boolean inUse = false;
-
public volatile long lastUsed = System.currentTimeMillis();
@Override
@@ -383,6 +771,9 @@
protected void extractBeanAndInterceptors()
{
+ if (beanMO == null)
+ return;
+
try
{
Object[] beanAndInterceptors = (Object[]) beanMO.get();
@@ -398,15 +789,19 @@
}
}
contains = (List<StatefulBeanContext>) beanAndInterceptors[3];
-/* We should let pm to handle this.
+ // Reestablish links to our children; if they serialize a link
+ // to us for some reason serialization blows up
if (contains != null)
{
- for (StatefulBeanContext ctx : contains)
+ for (StatefulBeanContext contained : contains)
{
- ctx.getContainer().invokePostActivate(ctx);
+ contained.containedIn = this;
}
}
-*/
+
+ // Don't hold onto the beanMO, as its contents are mutable
+ // and we don't want to serialize a stale version of them
+ beanMO = null;
}
catch (IOException e)
{
@@ -422,13 +817,10 @@
{
out.writeUTF(containerName);
out.writeObject(id);
- out.writeObject(metadata);
- if (bean == null)
+ out.writeObject(metadata);
+
+ if (beanMO == null)
{
- out.writeObject(beanMO);
- }
- else
- {
Object[] beanAndInterceptors = new Object[4];
beanAndInterceptors[0] = bean;
beanAndInterceptors[1] = persistenceContexts;
@@ -439,18 +831,27 @@
beanAndInterceptors[2] = list;
}
beanAndInterceptors[3] = contains;
-/* Since replication also uses this. We c'ant call this directly. Let pm handle this.
- if (contains != null)
- {
- for (StatefulBeanContext ctx : contains)
- {
- ctx.prePassivate();
- }
- }
-*/
- beanMO = new MarshalledObject(beanAndInterceptors);
+
+ // BES 2007/02/12 Previously we were trying to hold a ref to
+ // beanMO after we created it, but that exposes the risk of
+ // two different versions of the constituent state that
+ // can fall out of sync. So now we just write a local variable.
+
+ MarshalledObject mo = new MarshalledObject(beanAndInterceptors);
+ out.writeObject(mo);
+ }
+ else
+ {
+ // We've been deserialized and are now being re-serialized, but
+ // extractBeanAndInterceptors hasn't been called in between.
+ // This can happen if a passivated session is involved in a
+ // JBoss Cache state transfer to a newly deployed node.
out.writeObject(beanMO);
}
+
+
+ out.writeBoolean(removed);
+ out.writeBoolean(replicationIsPassivation);
}
public void readExternal(ObjectInput in) throws IOException,
@@ -460,6 +861,11 @@
id = in.readObject();
metadata = (SimpleMetaData) in.readObject();
beanMO = (MarshalledObject) in.readObject();
+ removed = in.readBoolean();
+ replicationIsPassivation = in.readBoolean();
+
+ // If we've just been deserialized, we are passivated
+ passivated = true;
}
public Object getInvokedMethodKey()
More information about the jboss-cvs-commits
mailing list