[jboss-cvs] JBossAS SVN: r60622 - trunk/ejb3/src/main/org/jboss/ejb3/stateful.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Feb 18 22:57:43 EST 2007


Author: bstansberry at jboss.com
Date: 2007-02-18 22:57:43 -0500 (Sun, 18 Feb 2007)
New Revision: 60622

Modified:
   trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java
   trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java
Log:
[EJBTHREE-867] Loosen coupling of lifecycle of nested SFSBs
[EJBTHREE-849] Properly handle passivation/activation callbacks for nested SFSBs
[EJBTHREE-851] Invoke passivation/activation callbacks around replication

Modified: trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java	2007-02-19 03:56:26 UTC (rev 60621)
+++ trunk/ejb3/src/main/org/jboss/ejb3/stateful/ProxiedStatefulBeanContext.java	2007-02-19 03:57:43 UTC (rev 60622)
@@ -35,9 +35,12 @@
 import org.jboss.ejb3.interceptor.InterceptorInfo;
 
 /**
- * Comment
+ * Proxy to a NestedStatefulBeanContext that can be independently passivated,
+ * activated and replicated without disturbing the object it is proxying.
  * 
  * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Brian Stansberry
+ * 
  * @version $Revision$
  */
 public class ProxiedStatefulBeanContext extends StatefulBeanContext implements
@@ -59,6 +62,7 @@
       this.delegate = delegate;
       oid = delegate.getId();
       containerId = delegate.getContainer().getObjectName().getCanonicalName();
+      parentRef = new StatefulBeanContextReference(delegate.getContainedIn());
    }
 
    public ProxiedStatefulBeanContext()
@@ -83,6 +87,11 @@
          }
          if (delegate == null)
             throw new RuntimeException("Failed to read delegate");
+         
+         // If we just read our delegate, it's possible there's been a 
+         // failover and a remote node still has a ref to a stale delegate.
+         // So, we should be replicated to invalidate the remote node.
+         this.markedForReplication = true;
       }
       return delegate;
    }
@@ -91,11 +100,6 @@
    {
       out.writeObject(oid);
       out.writeUTF(containerId);
-      if(parentRef == null)
-      {
-         parentRef = new StatefulBeanContextReference(getDelegate()
-            .getContainedIn());
-      }
       out.writeObject(parentRef);
    }
 
@@ -107,158 +111,317 @@
       parentRef = (StatefulBeanContextReference) in.readObject();
    }
 
-   /**
-    *  Ignores the call, as passivation of this proxy context
-    *  does not affect the underlying bean (which is passivated
-    *  along with its parent context).
-    */
-   public void prePassivate()
-   {
-   }
-
-   /**
-    *  Ignores the call, as activation of this proxy context
-    *  does not affect the underlying bean (which is activated
-    *  along with its parent context).
-    */
-   public void postActivate()
-   {
-   }
-
+   @Override
    public List<StatefulBeanContext> getContains()
    {
       return getDelegate().getContains();
    }
 
+   @Override
    public EntityManager getExtendedPersistenceContext(String id)
    {
       return getDelegate().getExtendedPersistenceContext(id);
    }
 
+   @Override
    public void addExtendedPersistenceContext(String id, EntityManager pc)
    {
       getDelegate().addExtendedPersistenceContext(id, pc);
    }
 
+   @Override
    public Map<String, EntityManager> getExtendedPersistenceContexts()
    {
       return getDelegate().getExtendedPersistenceContexts();
    }
 
+   @Override
+   public void removeExtendedPersistenceContext(String id)
+   {
+      getDelegate().removeExtendedPersistenceContext(id);
+   }
+
+   @Override
+   public boolean scanForExtendedPersistenceContext(String id, StatefulBeanContext ignore)
+   {
+      return getDelegate().scanForExtendedPersistenceContext(id, ignore);
+   }
+
+   @Override
    public StatefulBeanContext getContainedIn()
    {
       return getDelegate().getContainedIn();
    }
 
+   @Override
    public void addContains(StatefulBeanContext ctx)
    {
       getDelegate().addContains(ctx);
    }
 
+   @Override
+   public void removeContains(StatefulBeanContext ctx)
+   {
+      getDelegate().removeContains(ctx);
+   }
+
+   @Override
    public StatefulBeanContext pushContainedIn()
    {
       return getDelegate().pushContainedIn();
    }
 
+   @Override
    public void popContainedIn()
    {
       getDelegate().popContainedIn();
    }
 
+   @Override
+   public StatefulBeanContext getUltimateContainedIn()
+   {
+      return getDelegate().getUltimateContainedIn();
+   }
+
+   @Override
+   public boolean isInUse()
+   {
+      // Don't call delegate
+      return super.isInUse();
+   }
+
+   @Override
+   public void setInUse(boolean inUse)
+   {
+      super.setInUse(inUse);
+      // delegate needs to know this for getCanPassivate()
+      getDelegate().setInUse(inUse);
+      
+      if (!inUse)
+      {
+         // drop ref to delegate, as the delegate can be passivated/activated
+         // without our knowledge, and if that happens we have a ref to a
+         // stale delegate.
+         delegate = null;
+      }
+   }
+
+   @Override
    public boolean isDiscarded()
    {
       return getDelegate().isDiscarded();
    }
 
+   @Override
    public void setDiscarded(boolean discarded)
    {
       getDelegate().setDiscarded(discarded);
    }
 
+   @Override
+   public boolean isRemoved()
+   {
+      return getDelegate().isRemoved();
+   }
+
+   @Override
    public ReentrantLock getLock()
    {
       return getDelegate().getLock();
    }
 
+   @Override
    public boolean isInInvocation()
    {
       return getDelegate().isInInvocation();
    }
 
+   @Override
    public void setInInvocation(boolean inInvocation)
    {
       getDelegate().setInInvocation(inInvocation);
    }
 
+   @Override
    public Object getId()
    {
       return getDelegate().getId();
    }
 
+   @Override
    public void setId(Object id)
    {
       this.oid = id;
       getDelegate().setId(id);
    }
 
+   @Override
    public boolean isTxSynchronized()
    {
       return getDelegate().isTxSynchronized();
    }
 
+   @Override
    public void setTxSynchronized(boolean txSynchronized)
    {
       getDelegate().setTxSynchronized(txSynchronized);
    }
 
+   @Override
    public void remove()
    {
       getDelegate().remove();
    }
 
+   @Override
    public void setContainer(Container container)
    {
       getDelegate().setContainer(container);
    }
 
+   @Override
    public Container getContainer()
    {
       return getDelegate().getContainer();
    }
 
+   @Override
    public Object getInstance()
    {
       return getDelegate().getInstance();
    }
 
+   @Override
    public SimpleMetaData getMetaData()
    {
       return getDelegate().getMetaData();
    }
 
+   @Override
    public Object[] getInterceptorInstances(InterceptorInfo[] interceptorInfos)
    {
       return getDelegate().getInterceptorInstances(interceptorInfos);
    }
 
+   @Override
    public void extractBeanAndInterceptors()
    {
       getDelegate().extractBeanAndInterceptors();
    }
 
+   @Override
    public void setInstance(Object instance)
    {
       getDelegate().setInstance(instance);
    }
 
+   @Override
    public void initialiseInterceptorInstances()
    {
       getDelegate().initialiseInterceptorInstances();
    }
 
+   @Override
    public EJBContext getEJBContext()
    {
       return getDelegate().getEJBContext();
    }
 
+   /**
+    *  Ignores the call, as passivation of this proxy context
+    *  does not affect the underlying bean (which is passivated
+    *  along with its parent context).
+    */
+   @Override
+   public void prePassivate()
+   {
+   }
+
+   /**
+    *  Ignores the call, as activation of this proxy context
+    *  does not affect the underlying bean (which is activated
+    *  along with its parent context).
+    */
+   @Override
+   public void postActivate()
+   {
+   }
+
+   /**
+    *  Ignores the call, as passivation of this proxy context
+    *  does not affect the underlying bean (which is passivated
+    *  along with its parent context).
+    */
+   @Override
+   public void passivateAfterReplication()
+   {
+      // ignore
+   }
+
+   /**
+    *  Ignores the call, as activation of this proxy context
+    *  does not affect the underlying bean (which is activated
+    *  along with its parent context).
+    */
+   @Override
+   public void activateAfterReplication()
+   {
+      // ignore
+   }
+
+   @Override
+   public boolean getCanPassivate()
+   {
+      if (delegate == null)
+      {
+         // If we haven't deserialized our delegate, we're not in use
+         // on this node
+         return true;
+      }
+      // Just check if *we* are in use; whether any children are
+      // in use doesn't matter, since passivating this proxy
+      // doesn't affect children
+      return (isInUse() == false);
+   }
+
+   @Override
+   public boolean getCanRemoveFromCache()
+   {
+      return getDelegate().getCanRemoveFromCache();
+   }
+
+   @Override
+   public boolean getReplicationIsPassivation()
+   {
+      return getDelegate().getReplicationIsPassivation();
+   }
+
+   @Override
+   public void setReplicationIsPassivation(boolean replicationIsPassivation)
+   {
+      getDelegate().setReplicationIsPassivation(replicationIsPassivation);
+   }
+
+   /**
+    * Ignores the call, as replication of this proxy context
+    * does not affect the underlying bean (which is replicated
+    * along with its parent context).
+    */
+   @Override
+   public void preReplicate()
+   {
+      // ignore
+   }
+
+   /**
+    * Ignores the call, as replication of this proxy context
+    * does not affect the underlying bean (which is replicated
+    * along with its parent context).
+    */
+   @Override
+   public void postReplicate()
+   {
+      // ignore
+   }
+
 }

Modified: trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java
===================================================================
--- trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java	2007-02-19 03:56:26 UTC (rev 60621)
+++ trunk/ejb3/src/main/org/jboss/ejb3/stateful/StatefulBeanContext.java	2007-02-19 03:57:43 UTC (rev 60622)
@@ -21,33 +21,38 @@
  */
 package org.jboss.ejb3.stateful;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.persistence.EntityManager;
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+
 import org.jboss.aop.metadata.SimpleMetaData;
 import org.jboss.ejb3.BaseContext;
 import org.jboss.ejb3.Container;
 import org.jboss.ejb3.Ejb3Registry;
 import org.jboss.ejb3.ThreadLocalStack;
+import org.jboss.ejb3.cache.StatefulCache;
 import org.jboss.ejb3.interceptor.InterceptorInfo;
 import org.jboss.ejb3.tx.TxUtil;
 import org.jboss.serial.io.MarshalledObject;
 import org.jboss.tm.TxUtils;
 
-import javax.persistence.EntityManager;
-import javax.transaction.Synchronization;
-import javax.transaction.Transaction;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
- * Comment
+ * BeanContext for a stateful session bean.
  *
  * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Brian Stansberry
+ * 
  * @version $Revision$
  */
 public class StatefulBeanContext extends BaseContext implements Externalizable
@@ -67,7 +72,7 @@
 
    protected boolean discarded;
 
-   // this two are needed for propagated extended persistence contexts when one
+   // these two are needed for propagated extended persistence contexts when one
    // SFSB injects another.
    public static ThreadLocalStack<StatefulBeanContext> propagatedContainedIn = new ThreadLocalStack<StatefulBeanContext>();
 
@@ -82,9 +87,12 @@
    protected boolean removed;
 
    protected String containerName;
+   
+   protected boolean replicationIsPassivation = true;
+   
+   protected transient boolean passivated = false;
 
    public StatefulBeanContext()
-
    {
 
    }
@@ -95,6 +103,28 @@
          extractBeanAndInterceptors();
       return contains;
    }
+   
+   /**
+    * Makes a copy of the contains list so nested callback iterators
+    * can iterate over it without concern that another thread will
+    * remove the context.
+    * 
+    * TODO replace contains list with a concurrent collection
+    */
+   private List<StatefulBeanContext> getThreadSafeContains()
+   {
+      // Call getContains() to ensure unmarshalling
+      List<StatefulBeanContext> orig = getContains();
+      List<StatefulBeanContext> copy = null;
+      if (orig != null)
+      {
+         synchronized (orig)
+         {
+            copy = new ArrayList<StatefulBeanContext>(orig);
+         }
+      }
+      return copy;
+   }
 
    public EntityManager getExtendedPersistenceContext(String id)
    {
@@ -122,13 +152,63 @@
       }
       extendedPCS.put(id, pc);
    }
+   
+   public boolean scanForExtendedPersistenceContext(String id, StatefulBeanContext ignore)
+   {
+      if (this.equals(ignore))
+         return false;
+      
+      if (!removed)
+      {
+         Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
+         if (extendedPCS != null && extendedPCS.containsKey(id))
+            return true;
+      }
+      
+      if (getContains() != null)
+      {
+         synchronized (contains)
+         {
+            for (StatefulBeanContext contained : contains)
+            {
+               if (!contained.equals(ignore))
+               {
+                  if (contained.scanForExtendedPersistenceContext(id, ignore))
+                     return true;
+               }
+            }
+         }
+      }
+      
+      return false;
+   }
+   
+   public void removeExtendedPersistenceContext(String id)
+   {
+      Map<String, EntityManager> extendedPCS = getExtendedPersistenceContexts();
+      if (extendedPCS != null)
+      {
+         extendedPCS.remove(id);
+      }
+      
+      if (getContains() != null)
+      {
+         synchronized (contains)
+         {
+            for (StatefulBeanContext contained: contains)
+            {
+               contained.removeExtendedPersistenceContext(id);
+            }
+         }
+      }
+   }
 
    public Map<String, EntityManager> getExtendedPersistenceContexts()
    {
       if (persistenceContexts == null)
       {
          if (bean == null)
-            getInstance(); // unmarshall
+            extractBeanAndInterceptors(); // unmarshall
       }
       return persistenceContexts;
    }
@@ -138,13 +218,73 @@
       return containedIn;
    }
 
+   public StatefulBeanContext getUltimateContainedIn()
+   {
+      StatefulBeanContext child = this;
+      StatefulBeanContext parent = containedIn;
+      
+      while (parent != null)
+      {
+         child = parent;
+         parent = parent.getContainedIn();
+      }
+      
+      if (parent == null && this != child)
+      {
+         // Don't hand out a ref to our parent obtained by walking the
+         // tree. Rather, get it from its cache.  This gives the cache
+         // a chance to activate it if it hasn't been.  We don't want
+         // to mark the parent as in use though.
+         StatefulCache ultimateCache = ((StatefulContainer)child.getContainer()).getCache();
+         child = ultimateCache.get(child.getId(), false);
+      }
+      
+      return child;
+   }
+
    public void addContains(StatefulBeanContext ctx)
    {
-      if (contains == null)
+      if (getContains() == null)
          contains = new ArrayList<StatefulBeanContext>();
-      contains.add(ctx);
-      ctx.containedIn = this;
+      
+      synchronized (contains)
+      {
+         contains.add(ctx);
+         ctx.containedIn = this;
+      }      
    }
+   
+   public void removeContains(StatefulBeanContext ctx)
+   {
+      if (getContains() != null) // call getContains() to ensure unmarshalling
+      {
+         // Need to be thread safe
+         synchronized (contains)
+         {
+            if (contains.remove(ctx))
+            {             
+               ctx.containedIn = null;
+            }
+         }
+         
+         if (removed)
+         {
+            // Close out any XPCs that are no longer referenced
+            cleanExtendedPCs();
+         }
+         
+         if (getCanRemoveFromCache())
+         {  
+            if (containedIn != null)
+            {
+               containedIn.removeContains(this);               
+            }
+            
+            //  Notify our cache to remove us as we no longer have children            
+            ((StatefulContainer) getContainer()).getCache().remove(getId());
+         }
+      }
+   }
 
    public StatefulBeanContext pushContainedIn()
    {
@@ -162,53 +302,216 @@
          // cache. If placed in the cache, it could be independently passivated,
          // activated and replicated, again breaking object references due to
          // independent marshalling. Instead, we return a proxy to it that will 
-         // be stored in its container's cache
+         // be stored in its container's cache         
          containedIn = propagatedContainedIn.get();
          NestedStatefulBeanContext nested = new NestedStatefulBeanContext();
          nested.id = id;
          nested.container = getContainer();
          nested.containerName = containerName;
          nested.bean = bean;
+         nested.replicationIsPassivation = replicationIsPassivation;
          containedIn.addContains(nested);
          thisPtr = new ProxiedStatefulBeanContext(nested);
       }
       propagatedContainedIn.push(thisPtr);
       return thisPtr;
    }
+   
+   /**
+    * Checks whether this context or any of its children are in use.
+    */
+   public boolean getCanPassivate()
+   {
+      boolean canPassivate = (removed || !inUse);
+      
+      if (canPassivate && getContains() != null)
+      {
+         synchronized (contains)
+         {
+            for (StatefulBeanContext contained : contains)
+            {
+               if (!contained.getCanPassivate())
+               {
+                  canPassivate = false;
+                  break;
+               }
+            }
+         }
+      }
+      
+      return canPassivate;
+   }
 
+   /**
+    * Notification from a non-clustered StatefulCache to inform
+    * that we are about to be passivated.
+    */
    public void prePassivate()
    {
-      getContainer().invokePrePassivate(this);
+      if (!removed && !passivated)
+      {
+         getContainer().invokePrePassivate(this);
+         passivated = true;
+      }
       
       // Pass the call on to any nested children
-      if (contains != null)
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
       {
-         for (StatefulBeanContext contained : contains)
+         for (StatefulBeanContext contained : children)
          {
             contained.prePassivate();
          }
       }
    }
 
+   /**
+    * Notification from a non-clustered StatefulCache to inform
+    * that we have been activated.
+    */
    public void postActivate()
    {
-      getContainer().invokePostActivate(this); // handled in getInstance()
+      if (!removed && passivated)
+      {
+         getContainer().invokePostActivate(this);
+         passivated = false;
+      }
       
       // Pass the call on to any nested children
-      if (contains != null)
-      {
-         for (StatefulBeanContext contained : contains)
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
+      {  
+         for (StatefulBeanContext contained : children)
          {
             contained.postActivate();
          }
       }
    }
+   
+   /**
+    * Notification from a ClusteredStatefulCache to inform
+    * that a bean that is stored in the distributed cache is now
+    * being passivated as well. Something of a misnomer
+    * as it is possible the bean wasn't replicated (if it implements
+    * Optimized it may have been activated and then a reference left
+    * in the cache without the bean ever being replicated).
+    */
+   public void passivateAfterReplication()
+   {
+      if (!removed && !passivated)
+      {  
+         getInstance(); // make sure we're unmarshalled
+         getContainer().invokePrePassivate(this);
+         passivated = true;
+      }
+      
+      // Pass the call on to any nested children
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
+      {
+         for (StatefulBeanContext contained : children)
+         {
+            contained.passivateAfterReplication();
+         }
+      }
+   }
+   
+   public void activateAfterReplication()
+   {
+      if (!removed && passivated)
+      {         
+         getInstance(); // make sure we're unmarshalled
+         getContainer().invokePostActivate(this);
+         passivated = false;
+      }
+      
+      // Pass the call on to any nested children
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
+      {
+         for (StatefulBeanContext contained : children)
+         {
+            contained.activateAfterReplication();
+         }
+      }
+   }
 
+   public boolean getReplicationIsPassivation()
+   {
+      return replicationIsPassivation;
+   }
+
+   public void setReplicationIsPassivation(boolean replicationIsPassivation)
+   {
+      this.replicationIsPassivation = replicationIsPassivation;
+   }
+
+   /**
+    * Notification from a ClusteredStatefulCache before a bean is
+    * replicated.
+    */
+   public void preReplicate()
+   {
+      if (!removed && replicationIsPassivation && !passivated)
+      {
+         getContainer().invokePrePassivate(this);
+         passivated = true;
+      }
+      
+      // Pass the call on to any nested children
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
+      {
+         for (StatefulBeanContext contained : children)
+         {
+            contained.preReplicate();
+         }
+      }
+   }
+
+   /**
+    * Notification from a ClusteredStatefulCache after the bean
+    * is fetched from the distributed cache. Something of a misnomer
+    * as it is possible the bean wasn't replicated (if it implements
+    * Optimized it can be fetched from the cache twice without ever
+    * being replicated).
+    */
+   public void postReplicate()
+   {
+      // We may not have been replicated, so only invoke @PostActivate
+      // if we are marked as passivated
+      if (!removed && passivated)
+      {
+         getContainer().invokePostActivate(this);
+         passivated = false;
+      }
+      
+      // Pass the call on to any nested children
+      List<StatefulBeanContext> children = getThreadSafeContains();
+      if (children != null)
+      {
+         for (StatefulBeanContext contained : children)
+         {
+            contained.postReplicate();
+         }
+      }
+   }
+
    public void popContainedIn()
    {
       propagatedContainedIn.pop();
    }
 
+   public boolean isInUse()
+   {
+      return inUse;
+   }
+
+   public void setInUse(boolean inUse)
+   {
+      this.inUse = inUse;
+   }
+
    public boolean isDiscarded()
    {
       return discarded;
@@ -265,23 +568,61 @@
          return;
       removed = true;
       RuntimeException exceptionThrown = null;
-      if (contains != null)
+      
+      // Close any XPCs that haven't been injected into live 
+      // beans in our family
+      try
       {
-         for (StatefulBeanContext contained : contains)
+         cleanExtendedPCs();
+      }
+      catch (RuntimeException e)
+      {
+         // we still need to remove ourself from any parent, so save
+         // the thrown exception and rethrow it after we have cleaned up.
+         if (exceptionThrown == null)
+            exceptionThrown = e;
+      }
+      
+      if (containedIn != null && getCanRemoveFromCache())
+      {
+         try
          {
-            try
-            {
-               ((StatefulContainer) contained.getContainer()).getCache().remove(
-                       contained.getId());
-            }
-            catch (RuntimeException e)
-            {
-               // we still need to remove every contained SFSB
-               // save the thrown exception and rethrow it after we have cleaned up.
+            containedIn.removeContains(this);
+         }
+         catch (RuntimeException e)
+         {
+            // we still need to clean internal state, so save the
+            // thrown exception and rethrow it after we have cleaned up.
+            if (exceptionThrown == null)
                exceptionThrown = e;
-            }
          }
       }
+            
+      // Clear out refs to our bean and interceptors, to reduce our footprint
+      // in case we are still cached for our refs to any XPCs
+      bean = null;
+      interceptorInstances = null;
+      
+      if (exceptionThrown != null) throw new RuntimeException("exception thrown while removing SFSB", exceptionThrown);
+   }
+   
+   public boolean getCanRemoveFromCache()
+   {
+      boolean canRemove = removed;
+      
+      if (canRemove && getContains() != null) // call getContains() to ensure unmarshalling
+      {
+         synchronized (contains)
+         {
+            canRemove = (contains.size() == 0);
+         }
+      }
+      
+      return canRemove;
+   }
+   
+   private void cleanExtendedPCs()
+   {    
       try
       {
          Transaction tx = TxUtil.getTransactionManager().getTransaction();
@@ -292,7 +633,7 @@
                public void beforeCompletion()
                {
                }
-
+      
                public void afterCompletion(int status)
                {
                   closeExtendedPCs();
@@ -304,11 +645,14 @@
             closeExtendedPCs();
          }
       }
+      catch (RuntimeException e)
+      {
+         throw e;
+      }
       catch (Exception e)
       {
-         throw new RuntimeException("Exception thrown while removing SFSB", e);
+         throw new RuntimeException("Error cleaning PersistenceContexts in SFSB removal", e);
       }
-      if (exceptionThrown != null) throw new RuntimeException("exception thrown while removing SFSB", exceptionThrown);
    }
 
    private void closeExtendedPCs()
@@ -317,18 +661,58 @@
       if (extendedPCS != null)
       {
          RuntimeException exceptionThrown = null;
-         for (EntityManager pc : extendedPCS.values())
+         
+         List<String> closedXPCs = new ArrayList<String>();
+         StatefulBeanContext topCtx = getUltimateContainedIn();
+         
+         for (Iterator<Map.Entry<String,EntityManager>> iter = extendedPCS.entrySet().iterator(); 
+               iter.hasNext();)
          {
-            try
+            Map.Entry<String,EntityManager> entry = iter.next();
+            String id = entry.getKey();
+            EntityManager xpc = entry.getValue();
+            
+            // Only close the XPC if our live parent(s) or cousins don't also
+            // have a ref to it. FIXME(review): scan returns true when a ref IS found, so this looks inverted - confirm
+            boolean canClose = topCtx.scanForExtendedPersistenceContext(id, this);
+            
+            if (canClose && getContains() != null)
             {
-               pc.close();
+               // Only close the XPC if our live children don't have a ref
+               synchronized (contains)
+               {
+                  for (StatefulBeanContext contained : contains)
+                  {
+                     if (contained.scanForExtendedPersistenceContext(id, null))
+                     {
+                        canClose = false;
+                        break;
+                     }
+                  }
+               }
             }
-            catch (RuntimeException e)
+            
+            if (canClose)
             {
-               exceptionThrown = e;
+               try
+               {
+                  xpc.close();
+                  closedXPCs.add(id);
+               }
+               catch (RuntimeException e)
+               {
+                  exceptionThrown = e;
+               }
             }
          }
-         if (exceptionThrown != null) throw new RuntimeException("Error cleaning up PersistenceContexts in SFSB removal", exceptionThrown);
+         
+         // Clean all refs to the closed XPCs from the tree
+         for (String id : closedXPCs)
+         {
+            topCtx.removeExtendedPersistenceContext(id);
+         }            
+         
+         if (exceptionThrown != null) throw new RuntimeException("Error closing PersistenceContexts in SFSB removal", exceptionThrown);
       }
    }
 
@@ -353,7 +737,6 @@
       if (bean == null)
       {
          extractBeanAndInterceptors();
-         // getContainer().invokePostActivate(this);
       }
       return bean;
    }
@@ -366,9 +749,14 @@
 
    // these are public for fast concurrent access/update
    public volatile boolean markedForPassivation = false;
+   
+   public volatile boolean markedForReplication = false;
+   
+   // BES 2007/02/16 make private and use a getter/setter as
+   // ProxiedStatefulBeanContext needs to pass the value on
+   // to its NestedStatefulBeanContext
+   private volatile boolean inUse = false;
 
-   public volatile boolean inUse = false;
-
    public volatile long lastUsed = System.currentTimeMillis();
 
    @Override
@@ -383,6 +771,9 @@
 
    protected void extractBeanAndInterceptors()
    {
+      if (beanMO == null)
+         return;
+      
       try
       {
          Object[] beanAndInterceptors = (Object[]) beanMO.get();
@@ -398,15 +789,19 @@
             }
          }
          contains = (List<StatefulBeanContext>) beanAndInterceptors[3];
-/* We should let pm to handle this.
+         // Reestablish the links from our children back to us; if they
+         // serialize a link to us, serialization blows up for some reason
          if (contains != null)
          {
-            for (StatefulBeanContext ctx : contains)
+            for (StatefulBeanContext contained : contains)
             {
-               ctx.getContainer().invokePostActivate(ctx);
+               contained.containedIn = this;
             }
          }
-*/
+         
+         // Don't hold onto the beanMO, as its contents are mutable
+         // and we don't want to serialize a stale version of them
+         beanMO = null;
       }
       catch (IOException e)
       {
@@ -422,13 +817,10 @@
    {
       out.writeUTF(containerName);
       out.writeObject(id);
-      out.writeObject(metadata);
-      if (bean == null)
+      out.writeObject(metadata);      
+      
+      if (beanMO == null)
       {
-         out.writeObject(beanMO);
-      }
-      else
-      {
          Object[] beanAndInterceptors = new Object[4];
          beanAndInterceptors[0] = bean;
          beanAndInterceptors[1] = persistenceContexts;
@@ -439,18 +831,27 @@
             beanAndInterceptors[2] = list;
          }
          beanAndInterceptors[3] = contains;
-/* Since replication also uses this. We c'ant call this directly. Let pm handle this.
-         if (contains != null)
-         {
-            for (StatefulBeanContext ctx : contains)
-            {
-               ctx.prePassivate();
-            }
-         }
-*/
-         beanMO = new MarshalledObject(beanAndInterceptors);
+         
+         // BES 2007/02/12 Previously we were trying to hold a ref to
+         // beanMO after we created it, but that exposes the risk of
+         // two different versions of the constituent state that
+         // can fall out of sync.  So now we just write a local variable.
+         
+         MarshalledObject mo = new MarshalledObject(beanAndInterceptors);
+         out.writeObject(mo);         
+      }
+      else
+      {
+         // We've been deserialized and are now being re-serialized, but
+         // extractBeanAndInterceptors hasn't been called in between.
+         // This can happen if a passivated session is involved in a 
+         // JBoss Cache state transfer to a newly deployed node.
          out.writeObject(beanMO);
       }
+      
+
+      out.writeBoolean(removed);
+      out.writeBoolean(replicationIsPassivation);
    }
 
    public void readExternal(ObjectInput in) throws IOException,
@@ -460,6 +861,11 @@
       id = in.readObject();
       metadata = (SimpleMetaData) in.readObject();
       beanMO = (MarshalledObject) in.readObject();
+      removed = in.readBoolean();
+      replicationIsPassivation = in.readBoolean();
+      
+      // If we've just been deserialized, we are passivated
+      passivated = true;
    }
 
    public Object getInvokedMethodKey()




More information about the jboss-cvs-commits mailing list