[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Thu Aug 24 18:05:34 EDT 2006


  User: vblagojevic
  Date: 06/08/24 18:05:34

  Modified:    src/org/jboss/cache/statetransfer    
                        StateTransferFactory.java StateTransferManager.java
  Added:       src/org/jboss/cache/statetransfer    
                        StreamingStateTransferGenerator_200.java
                        StreamingStateTransferIntegrator_200.java
  Log:
  streaming state transfer integration (work in progress)
  
  Revision  Changes    Path
  1.9       +39 -1     JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -b -r1.8 -r1.9
  --- StateTransferFactory.java	20 Jul 2006 21:58:21 -0000	1.8
  +++ StateTransferFactory.java	24 Aug 2006 22:05:34 -0000	1.9
  @@ -13,13 +13,15 @@
   
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
  +import java.io.InputStream;
  +import java.io.OutputStream;
   
   /**
    * Factory class able to create {@link StateTransferGenerator} and 
    * {@link StateTransferIntegrator} instances.
    * 
    * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.8 $
  + * @version $Revision: 1.9 $
    */
   public abstract class StateTransferFactory
   {
  @@ -47,6 +49,18 @@
            return new StateTransferGenerator_200(cache); // current default
      }
      
  +   public static StateTransferGenerator getStateTransferGenerator(OutputStream os, TreeCache cache)
  +   {
  +      short version = cache.getConfiguration().getReplicationVersion();
  +
  +      // Compiler won't let me use a switch
  +
  +      if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  +         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +      else
  +         return new StreamingStateTransferGenerator_200(os,cache); // current default
  +   }
  +   
      /**
       * Gets a StateTransferIntegrator able to handle the given state.
       * 
  @@ -93,4 +107,28 @@
            catch (IOException io) {}
         }
      }
  +
  +   public static StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn, TreeCache treeCache)
  +         throws Exception
  +   {
  +      short version = 0;
  +      MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
  +      try
  +      {
  +         version = in.readShort();
  +      }
  +      catch (IOException io)
  +      {
  +         // No short at the head of the stream means version 123
  +         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +      }
  +
  +      // Compiler won't let me use a switch
  +
  +      if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  +         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +      else
  +         return new StreamingStateTransferIntegrator_200(in, fqn, treeCache); // current default
  +   }      
  +           
   }
  
  
  
  1.4       +59 -13    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- StateTransferManager.java	16 Aug 2006 10:52:50 -0000	1.3
  +++ StateTransferManager.java	24 Aug 2006 22:05:34 -0000	1.4
  @@ -1,5 +1,7 @@
   package org.jboss.cache.statetransfer;
   
  +import java.io.InputStream;
  +import java.io.OutputStream;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
  @@ -13,6 +15,7 @@
   import org.jboss.cache.TreeCache;
   import org.jboss.cache.config.Option;
   import org.jboss.cache.loader.CacheLoaderManager;
  +import org.jboss.cache.loader.NodeData;
   import org.jboss.cache.lock.TimeoutException;
   import org.jboss.cache.marshall.MethodCallFactory;
   import org.jboss.cache.marshall.MethodDeclarations;
  @@ -23,6 +26,8 @@
   {
      protected final static Log log = LogFactory.getLog(StateTransferManager.class);
      
  +   public static final NodeData STREAMING_DELIMETER_NODE = new NodeData(Fqn.fromString("STREAMING_DELIMETER_NODE"),null);
  +   
      private TreeCache treeCache;
      private long[] loadStateTimeouts =  { 400, 800, 1200 };
      
  @@ -73,8 +78,9 @@
       *                                       enabled, the requested Fqn is not the root node, and the
       *                                       cache loader does not implement {@link ExtendedCacheLoader}.
       */
  -   public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +   public byte[] getState(OutputStream os,Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  +      boolean usingStreamingStateTransfer = os!=null;
         TreeCache cache = getTreeCache();
         
         VersionAwareMarshaller marshaller_ = null;
  @@ -120,7 +126,15 @@
               acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
            }
   
  -         StateTransferGenerator generator = getStateTransferGenerator();
  +         StateTransferGenerator generator = null;
  +         if (usingStreamingStateTransfer)
  +         {
  +            generator = getStateTransferGenerator(os);
  +         }
  +         else
  +         {
  +            generator = getStateTransferGenerator();
  +         }
   
            return generator.generateStateTransfer(rootNode,
               fetchTransientState,
  @@ -133,6 +147,11 @@
         }
      }
      
  +   public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +   {
  +      return getState(null,fqn,timeout,force,suppressErrors);
  +   }
  +   
      /**
       * Requests state from each of the given source nodes in the cluster
       * until it gets it or no node replies with a timeout exception.  If state
  @@ -267,7 +286,7 @@
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used               
       */
  -   public void setState(byte[] new_state, Fqn targetRoot, ClassLoader cl)
  +   public void setState(Object state, Fqn targetRoot, ClassLoader cl)
         throws Exception
      {
         TreeCache cache = getTreeCache();
  @@ -281,7 +300,7 @@
            target = cache.findNode(targetRoot);
         }
         
  -      setState(new_state, target, cl);
  +      setState(state, target, cl);
      }
   
      /**
  @@ -300,16 +319,26 @@
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used  
       */
  -   private void setState(byte[] new_state, DataNode targetRoot, ClassLoader cl)
  +   private void setState(Object state, DataNode targetRoot, ClassLoader cl)
         throws Exception
      {
  -      if (new_state == null)
  +      if (state == null)
         {
            log.info("new_state is null (may be first member in cluster)");
            return;
         }
   
  +      byte [] new_state = null;
  +      InputStream istream = null;
  +      if(state instanceof byte[])
  +      {
  +         new_state = (byte[]) state;     
         log.info("received the state (size=" + new_state.length + " bytes)");
  +      }
  +      else
  +      {
  +         istream = (InputStream) state;
  +      }      
   
         Object owner = getOwnerForLock();
         try
  @@ -319,11 +348,18 @@
                                         getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
                                         true, true);
   
  -         // 1. Unserialize the states into transient and persistent state
  -         StateTransferIntegrator integrator = getStateTransferIntegrator(new_state,
  +         StateTransferIntegrator integrator =null;
  +         if(new_state!=null)
  +         {         
  +            integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
  +         }
  +         else
  +         {
  +            integrator = getStateTransferIntegrator(istream,
                                                                            targetRoot.getFqn());
  +         }
   
  -         // 2. If transient state is available, integrate it
  +         // If transient state is available, integrate it
            try
            {
               integrator.integrateTransientState(targetRoot, cl);
  @@ -334,7 +370,7 @@
               log.error("failed setting transient state", t);
            }
   
  -         // 3. Store any persistent state
  +         // Store any persistent state
            integrator.integratePersistentState();
         }
         finally
  @@ -407,12 +443,22 @@
         return StateTransferFactory.getStateTransferGenerator(getTreeCache());
      }
      
  +   protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
  +   {
  +      return StateTransferFactory.getStateTransferGenerator(os,getTreeCache());
  +   }
  +   
      protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
            throws Exception
      {
         return StateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
      }
   
  +   private StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn) throws Exception
  +   {
  +      return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
  +   }
  +
      /**
       * Generates NodeAdded notifications for all nodes of the tree. This is
       * called whenever the tree is initially retrieved (state transfer)
  
  
  
  1.1      date: 2006/08/24 22:05:34;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
  
  Index: StreamingStateTransferGenerator_200.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.statetransfer;
  
  import java.io.ObjectOutputStream;
  import java.io.OutputStream;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.Version;
  import org.jboss.cache.loader.NodeData;
  import org.jboss.invocation.MarshalledValueOutputStream;
  
  /**
   * {@link StateTransferGenerator} that writes the state of a subtree directly
   * to a caller-supplied {@link OutputStream} instead of building a byte[].
   * <p>
   * Stream layout produced by {@link #generateStateTransfer}: the
   * {@link #STATE_TRANSFER_VERSION} short, then one {@link NodeData} per
   * transferred node in preorder, then any associated state, then
   * {@link StateTransferManager#STREAMING_DELIMETER_NODE} marking the end of
   * the transient section.  Persistent state is not generated yet.
   */
  public class StreamingStateTransferGenerator_200 implements StateTransferGenerator
  {
     /** Version short written at the head of the stream. */
     public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");
  
     private final Log log = LogFactory.getLog(getClass().getName());
  
     /** Cache whose state is being transferred. */
     private final TreeCache cache;
  
     /** Fqns of internal nodes; these subtrees are never transferred. */
     private final Set internalFqns;
  
     /** Destination stream supplied by the caller. */
     private final OutputStream os;
  
     /**
      * Creates a generator that streams state to <code>os</code>.
      * 
      * @param os    stream the state is written to
      * @param cache cache the state is read from
      */
     protected StreamingStateTransferGenerator_200(OutputStream os, TreeCache cache)
     {
        this.cache = cache;
        this.os = os;
        this.internalFqns = cache.getInternalFqns();
     }
  
     /**
      * Writes the state rooted at <code>rootNode</code> to the stream given at
      * construction time.  The stream is closed before this method returns.
      * 
      * @param rootNode           root of the subtree to transfer
      * @param generateTransient  whether in-memory state should be written
      * @param generatePersistent whether persistent state should be written
      *                           (not implemented yet)
      * @param suppressErrors     if <code>true</code>, failures while
      *                           generating state are logged but not rethrown
      * @return always <code>null</code>; the state goes to the stream
      * @throws Throwable if generation fails and errors are not suppressed,
      *                   or if writing the version or delimiter fails
      */
     public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
           boolean suppressErrors) throws Throwable
     {
        Fqn fqn = rootNode.getFqn();
        MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
  
        try
        {
           out.writeShort(STATE_TRANSFER_VERSION);
           try
           {
              if (generateTransient)
              {
                 marshallTransientState(rootNode, out);
                 log.debug("generated the in-memory state");
  
                 // Associated state must go through the same object stream as
                 // the node data; writing to the raw underlying stream would
                 // bypass the object stream's buffering and framing and put
                 // the bytes in the wrong place.
                 marshallAssociatedState(fqn, out);
                 log.debug("returning the associated state bytes)");
              }
           }
           catch (Throwable t)
           {
              log.error("failed getting the in-memory (transient) state", t);
              if (!suppressErrors)
                 throw t;
           }
           finally
           {
              // The delimiter is written even on failure so the stream always
              // carries an end-of-transient-state marker.
              log.debug("writing delimeter after transient state");
              out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
           }
  
           if (generatePersistent)
           {
              try
              {
                 //TODO
              }
              catch (Throwable t)
              {
                 log.error("failed getting the persistent state", t);
                 if (!suppressErrors)
                    throw t;
              }
           }
        }
        finally
        {
           out.close();
        }
        return null;
     }
  
     /**
      * Do a preorder traversal: visit the node first, then the node's children.
      * Nodes whose Fqn is one of the cache's internal Fqns are skipped together
      * with their subtree.
      * 
      * @param node node to write
      * @param out  stream the {@link NodeData} objects are written to
      * @throws Exception if writing to the stream fails
      */
     protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
     {
        Fqn nodeFqn = node.getFqn();
        if (internalFqns.contains(nodeFqn))
           return;
  
        // first handle the current node
        Map attrs = node.getData();
        NodeData nd;
        if (attrs == null || attrs.size() == 0)
           nd = new NodeData(nodeFqn);
        else
           nd = new NodeData(nodeFqn, attrs);
        out.writeObject(nd);
  
        // then visit the children
        Map children = node.getChildren();
        if (children == null)
           return;
        for (Iterator it = children.values().iterator(); it.hasNext();)
        {
           marshallTransientState((DataNode) it.next(), out);
        }
     }
  
     /**
      * Does nothing in this base class; can be overridden in a subclass.
      * 
      * @param fqn  root of the subtree being transferred
      * @param baos stream to write the associated state to
      * @throws Exception declared for the benefit of overriding subclasses
      */
     protected void marshallAssociatedState(Fqn fqn, OutputStream baos) throws Exception
     {
        // no-op in this base class      
     }
  
     /**
      * @return the cache this generator reads state from
      */
     protected TreeCache getTreeCache()
     {
        return cache;
     }
  }
  
  
  
  1.1      date: 2006/08/24 22:05:34;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
  
  Index: StreamingStateTransferIntegrator_200.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.statetransfer;
  
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.buddyreplication.BuddyManager;
  import org.jboss.cache.factories.NodeFactory;
  import org.jboss.cache.loader.NodeData;
  
  /**
   * {@link StateTransferIntegrator} that reads state from an
   * {@link ObjectInputStream} rather than from a byte[].
   * <p>
   * The stream is expected to contain a sequence of {@link NodeData} objects
   * in preorder, terminated by a node whose Fqn equals that of
   * {@link StateTransferManager#STREAMING_DELIMETER_NODE}.  Internal nodes
   * already present under the target are retained across the integration.
   * Persistent state is not integrated yet.
   */
  public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
  {
     /** Number of bytes of meta-information at the beginning of a
      *  byte[]-based state transfer (6 for stream header, 2 for version
      *  short, 3 * 4 for lengths of the state components).
      *  NOTE(review): not referenced by this streaming integrator; presumably
      *  kept for parity with the byte[]-based integrator -- confirm before
      *  removing.
      */
     protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
  
     protected Log log = LogFactory.getLog(getClass().getName());
  
     private TreeCache cache;
  
     private Fqn targetFqn;
  
     /** Set once the transient state has been fully integrated. */
     private boolean transientSet;
  
     private NodeFactory factory;
  
     /** Node type to create, chosen from the cache's locking scheme. */
     private byte nodeType;
  
     /** Fqns of internal nodes that must survive the integration. */
     private Set internalFqns;
  
     ObjectInputStream in; //used in streaming state transfer
  
     /**
      * Creates an integrator reading from <code>inputStream</code>.
      * 
      * @param inputStream stream positioned at the first {@link NodeData}
      * @param targetFqn   Fqn of the subtree the state is integrated into
      * @param cache       cache receiving the state
      */
     public StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
     {
        this.targetFqn = targetFqn;
        this.cache = cache;
        this.factory = NodeFactory.getInstance();
        this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
              ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
              : NodeFactory.NODE_TYPE_TREENODE;
        this.internalFqns = cache.getInternalFqns();
        in = inputStream;
     }
  
     /**
      * Integrates the transient state read from the stream into
      * <code>target</code>.  If integration fails, the target's data and
      * children are cleared before the exception propagates.
      * 
      * @param target node to integrate the state into
      * @param cl     classloader to install as the thread context classloader
      *               while reading, or <code>null</code> to leave it unchanged
      * @throws Exception if reading or integrating the state fails
      */
     public void integrateTransientState(DataNode target, ClassLoader cl) throws Exception
     {
        ClassLoader oldCL = setClassLoader(cl);
        try
        {
           if (log.isTraceEnabled())
              log.trace("integrating transient state for " + target);
  
           integrateTransientState(target, in);
  
           transientSet = true;
  
           if (log.isTraceEnabled())
              log.trace("transient state successfully integrated for " + targetFqn);
  
           integrateAssociatedState();
        }
        finally
        {
           if (!transientSet)
           {
              // Clear any existing state from the targetRoot
              target.clear();
              target.removeAllChildren();
           }
  
           resetClassLoader(oldCL);
        }
     }
  
     /**
      * Provided for subclasses that deal with associated state.
      * 
      * @throws Exception
      */
     protected void integrateAssociatedState() throws Exception
     {
        // no-op in this base class
     }
  
     /**
      * Integrates persistent state.  Not implemented yet; currently this only
      * closes the input stream.
      * 
      * @throws Exception if closing the stream fails
      */
     public void integratePersistentState() throws Exception
     {
  
        try
        {
           //TODO
        }
        finally
        {
           in.close();
        }
     }
  
     /**
      * Installs <code>newLoader</code> as the thread context classloader.
      * 
      * @return the previous context classloader, or <code>null</code> if
      *         <code>newLoader</code> was <code>null</code> and nothing changed
      */
     private ClassLoader setClassLoader(ClassLoader newLoader)
     {
        ClassLoader oldClassLoader = null;
        if (newLoader != null)
        {
           oldClassLoader = Thread.currentThread().getContextClassLoader();
           Thread.currentThread().setContextClassLoader(newLoader);
        }
        return oldClassLoader;
     }
  
     /**
      * Restores the context classloader saved by {@link #setClassLoader}.
      */
     private void resetClassLoader(ClassLoader oldLoader)
     {
        if (oldLoader != null)
           Thread.currentThread().setContextClassLoader(oldLoader);
     }
  
     /**
      * @return <code>true</code> if <code>nd</code> carries the Fqn of the
      *         streaming delimiter node
      */
     private static boolean isDelimiter(NodeData nd)
     {
        return StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn());
     }
  
     /**
      * Replaces the children of <code>target</code> with the nodes read from
      * the stream, keeping any internal nodes that were under it.
      */
     private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
           ClassNotFoundException
     {
        Set retainedNodes = retainInternalNodes(target);
  
        target.removeAllChildren();
  
        // Read the first NodeData and integrate into our target     
        NodeData nd = (NodeData) in.readObject();
  
        //are there any transient nodes at all?
        if (!isDelimiter(nd))
        {
           Map attrs = nd.getAttributes();
           if (attrs != null)
              target.put(attrs, true);
           else
              target.clear();
  
           // Check whether this is an integration into the buddy backup
           // subtree
           Fqn tferFqn = nd.getFqn();
           Fqn tgtFqn = target.getFqn();
           boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
                 && !tferFqn.isChildOrEquals(tgtFqn);
           // If it is an integration, calculate how many levels of offset
           int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
  
           integrateStateTransferChildren(target, offset, in);
        }
  
        // The children were removed above regardless of what the stream
        // holds, so the retained internal nodes must be put back even when
        // the transfer contained no transient state at all.
        integrateRetainedNodes(target, retainedNodes);
     }
  
     /**
      * Reads NodeData from the stream and attaches each one that is a direct
      * child of <code>parent</code>, recursing for deeper nodes.
      * 
      * @return the first NodeData read that does not belong under
      *         <code>parent</code>, or <code>null</code> once the delimiter
      *         (or a null object) has been read
      * @throws IllegalStateException if a NodeData is more than one level
      *                               below <code>parent</code>
      */
     private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
           throws IOException, ClassNotFoundException
     {
        int parent_level = parent.getFqn().size();
        int target_level = parent_level + 1;
        Fqn fqn;
        int size;
        Object name;
        NodeData nd = (NodeData) in.readObject();
        while (nd != null && !isDelimiter(nd))
        {
           fqn = nd.getFqn();
           // If we need to integrate into the buddy backup subtree,
           // change the Fqn to fit under it
           if (offset > 0)
              fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
           size = fqn.size();
           if (size <= parent_level)
              return nd;
           else if (size > target_level)
              throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
  
           name = fqn.get(size - 1);
  
           // We handle this NodeData.  Create a DataNode and
           // integrate its data            
           DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
           parent.addChild(name, target);
  
           // Recursively call, which will walk down the tree
           // and return the next NodeData that's a child of our parent
           nd = integrateStateTransferChildren(target, offset, in);
        }
        return null;
     }
  
     /**
      * Collects the internal nodes currently present below
      * <code>target</code> so they can be re-attached after its children
      * have been replaced.
      */
     private Set retainInternalNodes(DataNode target)
     {
        Set result = new HashSet();
        Fqn rootFqn = target.getFqn();
        for (Iterator it = internalFqns.iterator(); it.hasNext();)
        {
           Fqn internalFqn = (Fqn) it.next();
           if (internalFqn.isChildOf(rootFqn))
           {
              DataNode internalNode = getInternalNode(target, internalFqn);
              if (internalNode != null)
                 result.add(internalNode);
           }
        }
  
        return result;
     }
  
     /**
      * Walks down from <code>parent</code> along <code>internalFqn</code>.
      * 
      * @return the node with Fqn <code>internalFqn</code>, or
      *         <code>null</code> if some node on the path does not exist
      */
     private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
     {
        Object name = internalFqn.get(parent.getFqn().size());
        DataNode result = (DataNode) parent.getChild(name);
        if (result != null)
        {
           // The child found is an ancestor of the internal node whenever
           // the internal Fqn is longer than the child's Fqn.
           if (internalFqn.size() > result.getFqn().size())
           {
              // need to recursively walk down the tree
              result = getInternalNode(result, internalFqn);
           }
        }
        return result;
     }
  
     /**
      * Re-attaches every retained node that lies below <code>root</code>.
      */
     private void integrateRetainedNodes(DataNode root, Set retainedNodes)
     {
        Fqn rootFqn = root.getFqn();
        for (Iterator it = retainedNodes.iterator(); it.hasNext();)
        {
           DataNode retained = (DataNode) it.next();
           if (retained.getFqn().isChildOf(rootFqn))
           {
              integrateRetainedNode(root, retained);
           }
        }
     }
  
     /**
      * Attaches <code>descendant</code> at its place below
      * <code>ancestor</code>, creating empty intermediate nodes if needed.
      */
     private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
     {
        Fqn descFqn = descendant.getFqn();
        Fqn ancFqn = ancestor.getFqn();
        Object name = descFqn.get(ancFqn.size());
        DataNode child = (DataNode) ancestor.getChild(name);
        // The ancestor is the direct parent when it is exactly one level
        // above the descendant.
        if (ancFqn.size() + 1 == descFqn.size())
        {
           if (child == null)
           {
              ancestor.addChild(name, descendant);
           }
           else
           {
              log.warn("Received unexpected internal node " + descFqn + " in transferred state");
           }
        }
        else
        {
           if (child == null)
           {
              // Missing level -- have to create empty node
              // This shouldn't really happen -- internal fqns should
              // be immediately under the root
              child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
              ancestor.addChild(name, child);
           }
  
           // Keep walking down the tree
           integrateRetainedNode(child, descendant);
        }
     }
  
     /**
      * @return the cache receiving the state
      */
     protected TreeCache getCache()
     {
        return cache;
     }
  
     /**
      * @return the factory used to create nodes
      */
     protected NodeFactory getFactory()
     {
        return factory;
     }
  
     /**
      * @return the node type created for transferred nodes
      */
     protected byte getNodeType()
     {
        return nodeType;
     }
  
     /**
      * @return the Fqn of the subtree the state is integrated into
      */
     protected Fqn getTargetFqn()
     {
        return targetFqn;
     }
  
  }
  
  
  



More information about the jboss-cvs-commits mailing list