[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Brian Stansberry brian.stansberry at jboss.com
Thu Jul 20 17:58:21 EDT 2006


  User: bstansberry
  Date: 06/07/20 17:58:21

  Modified:    src/org/jboss/cache/statetransfer    
                        StateTransferFactory.java
                        StateTransferGenerator_200.java
                        StateTransferIntegrator_200.java
  Added:       src/org/jboss/cache/statetransfer    
                        StateTransferManager.java
  Log:
  [JBCACHE-465] Extract the state transfer code out of TreeCache
  
  Revision  Changes    Path
  1.8       +7 -0      JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- StateTransferFactory.java	19 Jul 2006 21:34:44 -0000	1.7
  +++ StateTransferFactory.java	20 Jul 2006 21:58:21 -0000	1.8
  @@ -14,6 +14,13 @@
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   
  +/**
  + * Factory class able to create {@link StateTransferGenerator} and 
  + * {@link StateTransferIntegrator} instances.
  + * 
  + * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  + * @version $Revision: 1.8 $
  + */
   public abstract class StateTransferFactory
   {
      private static final short RV_200 = Version.getVersionShort("2.0.0");
  
  
  
  1.3       +36 -55    JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- StateTransferGenerator_200.java	19 Jul 2006 08:29:18 -0000	1.2
  +++ StateTransferGenerator_200.java	20 Jul 2006 21:58:21 -0000	1.3
  @@ -12,21 +12,17 @@
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
   import org.jboss.cache.Version;
  -import org.jboss.cache.aop.InternalDelegate;
  -import org.jboss.cache.aop.PojoCache;
  -import org.jboss.cache.aop.util.ObjectUtil;
   import org.jboss.cache.loader.NodeData;
   import org.jboss.cache.util.ExposedByteArrayOutputStream;
   import org.jboss.invocation.MarshalledValueOutputStream;
   
   import java.io.IOException;
  -import java.io.ObjectOutputStream;
   import java.io.OutputStream;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
   
  -class StateTransferGenerator_200 implements StateTransferGenerator
  +public class StateTransferGenerator_200 implements StateTransferGenerator
   {
      public static final short STATE_TRANSFER_VERSION = 
         Version.getVersionShort("2.0.0.GA");
  @@ -36,7 +32,7 @@
      private TreeCache cache;
      private Set internalFqns;
   
  -   StateTransferGenerator_200(TreeCache cache)
  +   protected StateTransferGenerator_200(TreeCache cache)
      {
         this.cache        = cache;
         this.internalFqns = cache.getInternalFqns();
  @@ -57,7 +53,6 @@
         int[] sizes = new int[3];
         byte[] retval = null;
         int lastSize;
  -      MarshalledValueOutputStream out;
   
         ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(1024);
         try {
  @@ -76,20 +71,16 @@
         try {
            
            if(generateTransient) {
  -            out = new MarshalledValueOutputStream(baos);
  -            marshallTransientState(rootNode, out);
  -            out.close();
  +            marshallTransientState(rootNode, baos);
               sizes[0] = baos.size() - lastSize;
               lastSize = baos.size();
               if (debug) {
                  log.debug("generated the in-memory state (" + sizes[0] + 
                            " bytes)");
               }
  +            
               // Return any state associated with the subtree but not stored in it
  -            if (cache instanceof PojoCache) {
  -               out = new MarshalledValueOutputStream(baos);
  -               marshallAssociatedState(fqn, out);
  -               out.close();
  +            marshallAssociatedState(fqn, baos);
                  sizes[1] = baos.size() - lastSize;
                  lastSize = baos.size();
                  if (debug) {
  @@ -98,7 +89,6 @@
                  }
               }
            }
  -      }
         catch(Throwable t) {
            log.error("failed getting the in-memory (transient) state", t);
            if (!suppressErrors) 
  @@ -168,7 +158,11 @@
         
      }
      
  -   private void initializeStateTransfer(OutputStream baos) throws IOException
  +   /**
  +    * Write the state transfer version as well as placeholder ints for the
  +    * sizes of the components of the state transfer.
  +    */
  +   protected void initializeStateTransfer(OutputStream baos) throws IOException
      {
         MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
         out.writeShort(STATE_TRANSFER_VERSION);
  @@ -181,12 +175,15 @@
   
      /**
       * Do a preorder traversal: visit the node first, then the node's children
  +    * 
       * @param out
       * @throws Exception
       */
  -   private void marshallTransientState(DataNode node, 
  -                                       ObjectOutputStream out) throws Exception 
  +   protected void marshallTransientState(DataNode node, 
  +                                         OutputStream baos) throws Exception 
      {  
  +      MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
  +      
         if (internalFqns.contains(node.getFqn()))
            return;
         
  @@ -209,45 +206,29 @@
            Map.Entry entry = (Map.Entry) it.next();
            marshallTransientState((DataNode) entry.getValue(), out);
         }
  +      
  +      out.close();
      }
      
      /**
  -    * For each node in the internal reference map that is associated with the 
  -    * given Fqn, writes an Object[] to the stream containing the node's
  -    * name and the value of its sole attribute.  Does nothing if the Fqn is the 
  -    * root node (i.e. "/") or if it is in the internal reference area itself.
  +    * Does nothing in this base class; can be overridden in a subclass.
       */
  -   private void marshallAssociatedState(Fqn fqn, ObjectOutputStream out) 
  -         throws Exception
  +   protected void marshallAssociatedState(Fqn fqn,
  +                                          OutputStream baos) throws Exception 
      {
  -      if (fqn == null 
  -            || fqn.size() == 0 
  -            || fqn.isChildOf(InternalDelegate.JBOSS_INTERNAL))
  -         return;
  -
  -      DataNode refMapNode = cache.get(InternalDelegate.JBOSS_INTERNAL_MAP);
  -      
  -      Map children = null;
  -      if (refMapNode != null && (children = refMapNode.getChildren()) != null) {
  -         
  -         String targetFqn = ObjectUtil.getIndirectFqn(fqn.toString());
  -         
  -         Map.Entry entry;
  -         String key;
  -         DataNode value;
  -         for (Iterator iter = children.entrySet().iterator(); iter.hasNext();) {
  -            entry = (Map.Entry) iter.next();
  -            key = (String) entry.getKey();
  -            if (key.startsWith(targetFqn)) {
  -               value = (DataNode) entry.getValue();
  -               out.writeObject(new Object[] { key, value.get(key) });
  -            }
  -         }
  +      // no-op in this base class      
         }
         
  +   protected TreeCache getTreeCache()
  +   {
  +      return cache;
      }
      
  -   static void overwriteInt(byte[] bytes, int startpos, int newVal) 
  +   /**
  +    * Overwrites the bytes in the given array starting at the given position
  +    * with another new integer.
  +    */
  +   public static void overwriteInt(byte[] bytes, int startpos, int newVal) 
      {   
          bytes[startpos]     = (byte) (newVal >>> 24);
          bytes[startpos + 1] = (byte) (newVal >>> 16);
  
  
  
  1.4       +54 -48    JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- StateTransferIntegrator_200.java	19 Jul 2006 21:34:44 -0000	1.3
  +++ StateTransferIntegrator_200.java	20 Jul 2006 21:58:21 -0000	1.4
  @@ -11,9 +11,6 @@
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  -import org.jboss.cache.TreeNode;
  -import org.jboss.cache.aop.InternalDelegate;
  -import org.jboss.cache.aop.PojoCache;
   import org.jboss.cache.buddyreplication.BuddyManager;
   import org.jboss.cache.factories.NodeFactory;
   import org.jboss.cache.loader.CacheLoader;
  @@ -29,16 +26,16 @@
   import java.util.Map;
   import java.util.Set;
   
  -class StateTransferIntegrator_200 implements StateTransferIntegrator
  +public class StateTransferIntegrator_200 implements StateTransferIntegrator
   {
      /** Number of bytes at the beginning of the state transfer byte[]
       *  utilized by meta-information about the composition of the byte[] 
       *  (6 for stream header, 2 for version short, 
       *  3 * 4 for lengths of the state components, 2 bytes for close)   
       */
  -   private static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
  +   protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
      
  -   private Log log = LogFactory.getLog(getClass().getName());
  +   protected Log log = LogFactory.getLog(getClass().getName());
      
      private TreeCache cache;
      private Fqn     targetFqn;
  @@ -52,7 +49,7 @@
      private Set internalFqns;
      
      
  -   StateTransferIntegrator_200(byte[] state, Fqn targetFqn,  
  +   protected StateTransferIntegrator_200(byte[] state, Fqn targetFqn,  
                                  TreeCache cache) throws Exception
      {
         this.targetFqn = targetFqn;
  @@ -118,46 +115,14 @@
         }
      }
      
  -   private void integrateAssociatedState() throws Exception
  +   /**
  +    * Provided for subclasses that deal with associated state.
  +    * 
  +    * @throws Exception
  +    */
  +   protected void integrateAssociatedState() throws Exception
      {
  -      if (associatedSize > 0 && cache instanceof PojoCache) {
  -         
  -         DataNode refMapNode = cache.get(InternalDelegate.JBOSS_INTERNAL_MAP);
  -
  -         ByteArrayInputStream in_stream=new ByteArrayInputStream(state, HEADER_LENGTH + transientSize, associatedSize);
  -         MarshalledValueInputStream in=new MarshalledValueInputStream(in_stream);
  -         
  -         try {
  -            Object[] nameValue;
  -            while ((nameValue = (Object[]) in.readObject()) != null) {
  -               TreeNode target = refMapNode.getChild(nameValue[0]);
  -               
  -               if (target == null) {
  -                  // Create the node
  -                  Fqn fqn = new Fqn(InternalDelegate.JBOSS_INTERNAL_MAP, nameValue[0]);
  -                  target = factory.createDataNode(nodeType, 
  -                                                  nameValue[0], 
  -                                                  fqn, 
  -                                                  refMapNode, 
  -                                                  null, 
  -                                                  true,
  -                                                  cache);
  -                  refMapNode.addChild(nameValue[0], target);
  -               }
  -               
  -               target.put(nameValue[0], nameValue[1]);
  -            }
  -         }
  -         catch (EOFException eof) {
  -            // all done
  -         }
  -         
  -         if (log.isTraceEnabled())
  -            log.trace("associated state successfully integrated for " + targetFqn);
  -      }
  -      else if (log.isTraceEnabled()) {
  -         log.trace("No need to integrate associated state for " + targetFqn);
  -      }
  +      // no-op in this base class
      }
      
      public void integratePersistentState() throws Exception
  @@ -180,8 +145,7 @@
                  log.trace("setting the persistent state");
               // cache_loader.remove(Fqn.fromString("/"));
               byte[] persistentState = getPersistentState();
  -            loader.storeState(persistentState,
  -                                                      targetFqn);
  +            loader.storeState(persistentState, targetFqn);
               if (log.isTraceEnabled())
                  log.trace("setting the persistent state was successful");
            }            
  @@ -365,4 +329,46 @@
            integrateRetainedNode(child, descendant);
         }
      }
  +
  +   protected int getAssociatedSize()
  +   {
  +      return associatedSize;
  +   }
  +
  +   protected TreeCache getCache()
  +   {
  +      return cache;
  +   }
  +
  +   protected NodeFactory getFactory()
  +   {
  +      return factory;
  +   }
  +
  +   protected byte getNodeType()
  +   {
  +      return nodeType;
  +   }
  +
  +   protected int getPersistentSize()
  +   {
  +      return persistentSize;
  +   }
  +
  +   protected byte[] getState()
  +   {
  +      return state;
  +   }
  +
  +   protected int getTransientSize()
  +   {
  +      return transientSize;
  +   }
  +
  +   protected Fqn getTargetFqn()
  +   {
  +      return targetFqn;
  +   }
  +   
  +   
   }
  
  
  
  1.1      date: 2006/07/20 21:58:21;  author: bstansberry;  state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  Index: StateTransferManager.java
  ===================================================================
  package org.jboss.cache.statetransfer;
  
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Vector;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.CacheException;
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.config.Option;
  import org.jboss.cache.loader.CacheLoaderManager;
  import org.jboss.cache.lock.TimeoutException;
  import org.jboss.cache.marshall.MethodCallFactory;
  import org.jboss.cache.marshall.MethodDeclarations;
  import org.jboss.cache.marshall.VersionAwareMarshaller;
  import org.jgroups.blocks.MethodCall;
  
  /**
   * Coordinates state transfer on behalf of a {@link TreeCache}.
   * <p/>
   * This class locks the affected subtree and then delegates the real work:
   * generating state is handed to a {@link StateTransferGenerator}, and
   * integrating received state is handed to a {@link StateTransferIntegrator},
   * both obtained from {@link StateTransferFactory}.  Subclasses can supply
   * different implementations by overriding
   * {@link #getStateTransferGenerator()} and
   * {@link #getStateTransferIntegrator(byte[], Fqn)}.
   */
  public class StateTransferManager
  {
     protected final static Log log = LogFactory.getLog(StateTransferManager.class);
     
     /** The cache whose state is generated or updated. */
     private TreeCache treeCache;
     
     /**
      * Timeouts (ms) passed to remote nodes on successive attempts made by
      * {@link #loadState}; one attempt is made per element.
      */
     private long[] loadStateTimeouts =  { 400, 800, 1200 };
     
     /**
      * Creates a manager that performs state transfer for the given cache.
      *
      * @param cache the cache whose state is transferred
      */
     public StateTransferManager(TreeCache cache)
     {
        this.treeCache = cache;
     }
     
     /**
      * Returns the cache this manager works on.
      */
     public TreeCache getTreeCache()
     {
        return treeCache;
     }
     
     /**
      * Replaces the cache this manager works on.
      */
     public void setTreeCache(TreeCache cache)
     {
        this.treeCache = cache;
     }
     
     /**
      * Returns the timeouts (ms) used for the successive attempts made by
      * {@link #loadState}.  The internal array itself is returned, not a copy.
      */
     public long[] getLoadStateTimeouts()
     {
        return loadStateTimeouts;
     }
  
     /**
      * Sets the timeouts (ms) used for the successive attempts made by
      * {@link #loadState}; the number of elements determines the number of
      * attempts.  Must not be <code>null</code>, since <code>loadState</code>
      * reads the array's length.  The array is stored as-is, not copied.
      */
     public void setLoadStateTimeouts(long[] loadStateTimeouts)
     {
        this.loadStateTimeouts = loadStateTimeouts;
     }
  
     /**
      * Returns the state for the portion of the tree named by <code>fqn</code>.
      * <p/>
      * The returned byte[] is whatever the {@link StateTransferGenerator}
      * obtained from {@link #getStateTransferGenerator()} produces; it is meant
      * to be handed to {@link #setState(byte[], Fqn, ClassLoader)} on the
      * receiving side.
      * <p/>
      * <code>null</code> is returned without generating anything if region
      * based marshalling is in use and the region is inactive or is currently
      * being activated/inactivated, or if no node exists at <code>fqn</code>.
      *
      * @param fqn            Fqn indicating the uppermost node in the
      *                       portion of the tree whose state should be returned.
      * @param timeout        max number of ms this method should wait to acquire
      *                       a read lock on the nodes being transferred
      * @param force          if a read lock cannot be acquired after
      *                       <code>timeout</code> ms, should the lock acquisition
      *                       be forced? <strong>NOTE:</strong> this parameter
      *                       currently has no effect; see
      *                       {@link #acquireLocksForStateTransfer}.
      * @param suppressErrors should any Throwable thrown be suppressed?  Passed
      *                       through to the generator.
      * @return the generated state, or <code>null</code> as described above
      * @throws UnsupportedOperationException if persistent state transfer is
      *                                       enabled, the requested Fqn is not the root node, and the
      *                                       cache loader does not implement <code>ExtendedCacheLoader</code>.
      */
     public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
     {
        TreeCache cache = getTreeCache();
        
        VersionAwareMarshaller marshaller_ = null;
        if (cache.getConfiguration().isUseRegionBasedMarshalling())
        {
           marshaller_ = cache.getMarshaller();
        }
        
        if (marshaller_ != null)
        {
           // can't give state for regions currently being activated/inactivated
           if (cache.isActivatingDeactivating(fqn))
           {
              if (log.isDebugEnabled())
                 log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
              return null;
           }
     
           // Can't give state for inactive nodes
           if (marshaller_.isInactive(fqn.toString()))
           {
              if (log.isDebugEnabled())
                 log.debug("ignoring _getState() for inactive region " + fqn);
              return null;
           }
        }
  
        DataNode rootNode = cache.findNode(fqn);
        if (rootNode == null)
           return null;
  
        boolean fetchTransientState  = cache.getConfiguration().isFetchInMemoryState();
        CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
        boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
  
        Object owner = getOwnerForLock();
        
        // Tracks whether we tried to lock the subtree, so the finally block
        // never releases locks this call did not ask for (the owner may be a
        // transaction that already holds its own locks on these nodes).
        boolean lockAttempted = false;
  
        try
        {
           if (fetchTransientState || fetchPersistentState)
           {
              log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
              // Set before the call so a partially completed acquisition is
              // still cleaned up if it throws.
              lockAttempted = true;
              acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
           }
  
           StateTransferGenerator generator = getStateTransferGenerator();
  
           return generator.generateStateTransfer(rootNode,
              fetchTransientState,
              fetchPersistentState,
              suppressErrors);
        }
        finally
        {
           if (lockAttempted)
              releaseStateTransferLocks(rootNode, owner, true);
        }
     }
     
     /**
      * Requests state from each of the given source nodes in the cluster
      * until one of them returns it.  If state is returned, integrates it into
      * the given DataNode.  If no state is returned but at least one node
      * replies with a timeout exception, the calls are repeated with the next
      * (normally longer) timeout from {@link #getLoadStateTimeouts()}, until
      * all configured timeouts have been tried.  On the last attempt the
      * remote call is made with its <code>force</code> argument set to true.
      * <p/>
      * If no node replies with a timeout exception and none returns state,
      * the method returns normally without integrating anything.
      *
      * @param subtreeRoot     Fqn of the topmost node in the subtree whose
      *                        state should be transferred.
      * @param integrationRoot the DataNode into which state should be integrated
      * @param sources         the cluster nodes to query for state; an entry
      *                        equal to this cache's local address is skipped
      * @param cl              the classloader to use to unmarshal the state.
      *                        Can be <code>null</code>.
      * @throws CacheException if the final attempt still produced a timeout
      *                        from some node and no node returned state
      * @throws Exception      if a remote call or the integration fails
      */
     public void loadState(Fqn subtreeRoot, DataNode integrationRoot,
                           Object[] sources, ClassLoader cl)
        throws Exception
     {
        TreeCache cache = getTreeCache();
        // Call each node in the cluster with progressively longer timeouts
        // until we get state or no cluster node returns a TimeoutException
        long[] timeouts = getLoadStateTimeouts();
        Object ourself = cache.getLocalAddress(); // ignore ourself when we call
        boolean stateSet = false;
        TimeoutException timeoutException = null;
        Object timeoutTarget = null;
  
        boolean trace = log.isTraceEnabled();
  
        for (int i = 0; i < timeouts.length; i++)
        {
           // Only a timeout seen during the current attempt justifies a retry
           timeoutException = null;
  
           // Ask the remote side to force lock acquisition on the last attempt
           Boolean force = (i == timeouts.length - 1) ? Boolean.TRUE
              : Boolean.FALSE;
  
           MethodCall psmc = MethodCallFactory.create(MethodDeclarations.getPartialStateMethod,
              new Object[]{subtreeRoot,
                 new Long(timeouts[i]),
                 force,
                 Boolean.FALSE});
  
           MethodCall replPsmc = MethodCallFactory.create(MethodDeclarations.replicateMethod,
              new Object[]{psmc});
  
           // Iterate over the group members, seeing if anyone
           // can give us state for this region
           for (int j = 0; j < sources.length; j++)
           {
              Object target = sources[j];
              // Null guard: without a local address there is nothing to skip
              if (ourself != null && ourself.equals(target))
                 continue;
  
              Vector targets = new Vector();
              targets.add(target);
  
              List responses = cache.callRemoteMethods(targets, replPsmc, true,
                 true, cache.getConfiguration().getSyncReplTimeout());
              Object rsp = null;
              if (responses != null && responses.size() > 0)
              {
                 rsp = responses.get(0);
                 if (rsp instanceof byte[])
                 {
                    setState((byte[]) rsp, integrationRoot, cl);
                    stateSet = true;
  
                    if (log.isDebugEnabled())
                    {
                       log.debug("loadState(): " + ourself +
                          " got state from " + target);
                    }
  
                    break;
                 }
                 else if (rsp instanceof TimeoutException)
                 {
                    // Remember the timeout; it triggers another attempt and,
                    // if it is still there at the end, the failure below
                    timeoutException = (TimeoutException) rsp;
                    timeoutTarget = target;
                    if (trace)
                    {
                       log.trace("loadState(): " + ourself +
                          " got a TimeoutException from " + target);
                    }
                 }
              }
  
              if (trace)
              {
                 log.trace("loadState(): " + ourself +
                    " No usable response from node " + target +
                    (rsp == null ? "" : (" -- received " + rsp)));
              }
           }
  
           // We've looped through all targets; if we got state or didn't
           // but no one sent a timeout (which means no one had state)
           // we don't want to try again
           if (stateSet || timeoutException == null)
              break;
        }
  
        if (!stateSet)
        {
           // If we got a timeout exception on the final try,
           // this is a failure condition
           if (timeoutException != null)
           {
              throw new CacheException("Failed getting state due to timeout on " +
                 timeoutTarget, timeoutException);
           }
  
           if (log.isDebugEnabled())
              log.debug("loadState(): No nodes able to give state");
        }
     }
  
     /**
      * Set the portion of the cache rooted in <code>targetRoot</code>
      * to match the given state. Updates the contents of <code>targetRoot</code>
      * to reflect those in <code>new_state</code>.
      * <p/>
      * If no node exists at <code>targetRoot</code>, it is first created with
      * a cache-mode-local put, i.e. without replicating the creation.
      *
      * @param new_state  the serialized state, as returned by
      *                   {@link #getState}; if <code>null</code> nothing is
      *                   integrated
      * @param targetRoot fqn of the node into which the state should be integrated
      * @param cl         classloader to use to unmarshal the state, or
      *                   <code>null</code> if the TCCL should be used
      * @throws Exception if locking the target or storing the persistent
      *                   state fails
      */
     public void setState(byte[] new_state, Fqn targetRoot, ClassLoader cl)
        throws Exception
     {
        TreeCache cache = getTreeCache();
        DataNode target = cache.findNode(targetRoot);
        if (target == null)
        {
           // Create the integration root, but do not replicate
           Option option = new Option();
           option.setCacheModeLocal(true);
           cache.put(targetRoot, null, option);
           target = cache.findNode(targetRoot);
        }
        
        setState(new_state, target, cl);
     }
  
     /**
      * Set the portion of the cache rooted in <code>targetRoot</code>
      * to match the given state. Updates the contents of <code>targetRoot</code>
      * to reflect those in <code>new_state</code>.
      * <p/>
      * The subtree under <code>targetRoot</code> is locked for the duration of
      * the integration, waiting up to the configured initial state retrieval
      * timeout, and unlocked afterwards.  A failure while integrating the
      * transient state is logged and does not prevent the persistent state
      * from being stored.
      *
      * @param new_state  the serialized state, as returned by
      *                   {@link #getState}; if <code>null</code> the method
      *                   only logs and returns
      * @param targetRoot node into which the state should be integrated
      * @param cl         classloader to use to unmarshal the state, or
      *                   <code>null</code> if the TCCL should be used
      * @throws Exception if the locks cannot be acquired, the integrator cannot
      *                   be created, or storing the persistent state fails
      */
     private void setState(byte[] new_state, DataNode targetRoot, ClassLoader cl)
        throws Exception
     {
        if (new_state == null)
        {
           log.info("new_state is null (may be first member in cluster)");
           return;
        }
  
        log.info("received the state (size=" + new_state.length + " bytes)");
  
        Object owner = getOwnerForLock();
        try
        {
           // Acquire locks on the root node and its children
           acquireLocksForStateTransfer(targetRoot, owner, 
                                        getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
                                        true, true);
  
           // 1. Create the integrator that interprets the received bytes
           StateTransferIntegrator integrator = getStateTransferIntegrator(new_state,
                                                                           targetRoot.getFqn());
  
           // 2. Integrate the transient state; a failure here is logged only,
           //    so that the persistent state below still gets stored
           try
           {
              integrator.integrateTransientState(targetRoot, cl);
              notifyAllNodesCreated(targetRoot);
           }
           catch (Throwable t)
           {
              log.error("failed setting transient state", t);
           }
  
           // 3. Store any persistent state
           integrator.integratePersistentState();
        }
        finally
        {
           releaseStateTransferLocks(targetRoot, owner, true);
        }
  
     }
  
  
     /**
      * Acquires read locks on a root node for an owner for state transfer.
      *
      * @param root         the node to lock
      * @param lockOwner    the owner on whose behalf the locks are acquired
      * @param timeout      max number of ms to wait for the locks
      * @param lockChildren if true, <code>root</code> and all of its
      *                     descendants are locked via <code>acquireAll</code>;
      *                     otherwise only <code>root</code> itself
      * @param force        currently ignored: locks are never forcibly
      *                     acquired, so a timeout is always propagated
      * @throws TimeoutException if the locks cannot be acquired in time
      * @throws Exception        any other failure reported by the node
      */
     protected void acquireLocksForStateTransfer(DataNode root,
                                                 Object lockOwner,
                                                 long timeout,
                                                 boolean lockChildren,
                                                 boolean force)
        throws Exception
     {
        try
        {
           if (lockChildren)
              root.acquireAll(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
           else
              root.acquire(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
        }
        catch (TimeoutException te)
        {
           log.error("Caught TimeoutException acquiring locks on region " +
              root.getFqn(), te);
           // Until we have FLUSH in place, don't force locks: the timeout is
           // rethrown whether or not the caller asked for forced acquisition.
           throw te;
        }
     }
  
     /**
      * Releases all state transfer locks acquired.  Any failure while
      * releasing is logged and not propagated.
      *
      * @param root           the node that was locked
      * @param lockOwner      the owner the locks were acquired for
      * @param childrenLocked whether the descendants of <code>root</code> were
      *                       locked as well and must be released too
      * @see #acquireLocksForStateTransfer
      */
     protected void releaseStateTransferLocks(DataNode root,
                                              Object lockOwner,
                                              boolean childrenLocked)
     {
        try
        {
           if (childrenLocked)
              root.releaseAll(lockOwner);
           else
              root.release(lockOwner);
        }
        catch (Throwable t)
        {
           log.error("failed releasing locks", t);
        }
     }
     
     /**
      * Returns the generator used by {@link #getState}, obtained from
      * {@link StateTransferFactory} for the current cache.
      */
     protected StateTransferGenerator getStateTransferGenerator()
     {
        return StateTransferFactory.getStateTransferGenerator(getTreeCache());
     }
     
     /**
      * Returns the integrator used to apply the given state, obtained from
      * {@link StateTransferFactory} for the current cache.
      *
      * @param state     the serialized state to be integrated
      * @param targetFqn Fqn of the node into which the state will be integrated
      * @throws Exception if the factory cannot create an integrator
      */
     protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
           throws Exception
     {
        return StateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
     }
  
     /**
      * Generates node-created notifications for the given node and,
      * recursively, all of its descendants (parent before children). This is
      * called after transient state has been integrated.
      *
      * @param curr root of the subtree to notify for; <code>null</code> is
      *             ignored
      */
     private void notifyAllNodesCreated(DataNode curr)
     {
        if (curr == null) return;
        
        getTreeCache().notifyNodeCreated(curr.getFqn());
        
        Map children = curr.getChildren();
        if (children != null)
        {
           for (Iterator it = children.values().iterator(); it.hasNext();)
           {
              notifyAllNodesCreated((DataNode) it.next());
           }
        }
     }
     
     /**
      * Returns an object suitable for use in node locking, either the current
      * transaction or the current thread if there is no transaction.
      */
     private Object getOwnerForLock()
     {
        Object owner = getTreeCache().getCurrentTransaction();
        if (owner == null)
           owner = Thread.currentThread();
  
        return owner;
     }
  }
  
  
  



More information about the jboss-cvs-commits mailing list