[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Thu Aug 31 16:30:45 EDT 2006


  User: vblagojevic
  Date: 06/08/31 16:30:45

  Modified:    src/org/jboss/cache/statetransfer        
                        StateTransferGenerator_200.java
                        StateTransferIntegrator.java
                        StateTransferIntegrator_200.java
                        StreamingStateTransferGenerator_200.java
                        StateTransferManager.java
                        StreamingStateTransferIntegrator_200.java
  Added:       src/org/jboss/cache/statetransfer        
                        AbstractStateTransferGenerator.java
                        AbstractStateTransferIntegrator.java
  Log:
  state transfer refactoring
  
  Revision  Changes    Path
  1.6       +8 -240    JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- StateTransferGenerator_200.java	31 Aug 2006 14:56:46 -0000	1.5
  +++ StateTransferGenerator_200.java	31 Aug 2006 20:30:45 -0000	1.6
  @@ -6,258 +6,26 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.DataNode;
  -import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  -import org.jboss.cache.Version;
  -import org.jboss.cache.loader.NodeData;
   import org.jboss.cache.util.ExposedByteArrayOutputStream;
   import org.jboss.invocation.MarshalledValueOutputStream;
   
  -import java.io.ByteArrayOutputStream;
  -import java.io.IOException;
  -import java.io.ObjectOutputStream;
  -import java.io.OutputStream;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
  -
  -public class StateTransferGenerator_200 implements StateTransferGenerator
  +public class StateTransferGenerator_200 extends AbstractStateTransferGenerator implements StateTransferGenerator
   {
  -   public static final short STATE_TRANSFER_VERSION = 
  -      Version.getVersionShort("2.0.0.GA");
  -   
  -   //whenever we wrap stream A with object based stream B,B writes a few bytes 
  -   //of a stream header to underlying stream A   
  -   public static final int OBJECT_STREAM_MARKER_LENGTH =4;
  -   
  -   private Log log = LogFactory.getLog(getClass().getName());
  -   
  -   private TreeCache cache;
  -   private Set internalFqns;
   
      protected StateTransferGenerator_200(TreeCache cache)
      {
  -      this.cache        = cache;
  -      this.internalFqns = cache.getInternalFqns();
  -   }
  -   
  -   public byte[] generateStateTransfer(DataNode rootNode,
  -                                       boolean generateTransient,
  -                                       boolean generatePersistent,
  -                                       boolean suppressErrors)
  -         throws Throwable
  -   {       
  -      boolean debug = log.isDebugEnabled();
  -      
  -      Fqn fqn = rootNode.getFqn();
  -      
  -      byte[][] states=new byte[3][]; // [transient][associated][persistent]
  -      states[0]=states[1]=states[2]=null;
  -      int[] sizes = new int[3];
  -      byte[] retval = null;
  -      int lastSize;
  -
  -      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(1024);
  -      try {
  -         initializeStateTransfer(baos);
  -         lastSize = baos.size();
  -      }
  -      catch (Throwable t)
  -      {
  -         log.error("failed initialing state transfer byte[]", t);
  -         if (!suppressErrors) 
  -            throw t;
  -         
  -         return null;
  -      }
  -
  -      try {
  -         
  -         if(generateTransient) {
  -        	MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos); 
  -            marshallTransientState(rootNode, out);
  -            sizes[0] = baos.size() - lastSize;
  -            lastSize = baos.size();
  -            if (debug) {
  -               log.debug("generated the in-memory state (" + sizes[0] + 
  -                         " bytes)");
  -            }
  -            
  -            // Return any state associated with the subtree but not stored in it
  -            marshallAssociatedState(fqn, baos);
  -            sizes[1] = baos.size() - lastSize;
  -            lastSize = baos.size();
  -            if (debug) {
  -               log.debug("returning the associated state (" + sizes[1] + 
  -                         " bytes)");
  -            }
  -         }
  -      }
  -      catch(Throwable t) {
  -         log.error("failed getting the in-memory (transient) state", t);
  -         if (!suppressErrors) 
  -            throw t;
  -         
  -         // Reset the byte array and see if we can continue with persistent state
  -         // TODO reconsider this -- why are errors suppressed at all?
  -         sizes[0] = sizes[1] = 0;
  -         baos.reset();
  -         try {
  -            initializeStateTransfer(baos);
  -         }
  -         catch (Throwable t1) {
  -            log.error("failed re-initializing state transfer", t1);
  -            return null;
  -         }
  -      }
  -      
  -      if (generatePersistent)
  -      {
  -         ByteArrayOutputStream out_stream = new ByteArrayOutputStream(1024);
  -         ObjectOutputStream out = new MarshalledValueOutputStream(out_stream);
  -         byte[] persState = null;
  -         boolean persistentStateProvidedOk = false;
  -         try
  -         {
  -            if (debug)
  -               log.debug("getting the persistent state from cacheloader " + cache.getCacheLoader().getClass());
  -            if (fqn.isRoot())
  -            {
  -               cache.getCacheLoader().loadEntireState(out);
  -            }
  -            else
  -            {
  -               cache.getCacheLoader().loadState(fqn, out);
  -            }
  -            persistentStateProvidedOk=true;            
  -         }
  -         catch (Throwable t)
  -         {
  -            log.error("cacheloader failed while getting the persistent state", t);
  -            if (!suppressErrors)
  -               throw t;
  +      super(cache);
            }
  -         finally
  -         {
  -            out.close();
  -            persState = out_stream.toByteArray();
  -            if (persistentStateProvidedOk && persState.length > OBJECT_STREAM_MARKER_LENGTH)
  -            {
  -               sizes[2] = persState.length;
  -               baos.write(persState);
  -            }
  -            else
  -            {
  -               sizes[2] = 0;
  -            }
  -
  -            if (debug)
  -            {
  -               log.debug("generated the persistent state (" + sizes[2] + " bytes)");
  -            }
  -         }
  -      }
  -   
  -      // Overwrite the placeholders used for the sizes of the state transfer
  -      // components with the correct values
  -      try {
  -         byte[] bytes = baos.getRawBuffer();
  -         overwriteInt(bytes, 8, sizes[0]);
  -         overwriteInt(bytes, 12, sizes[1]);
  -         overwriteInt(bytes, 16, sizes[2]);
  -         retval = bytes;
  -         
  -         log.info("returning the state for tree rooted in " + fqn.toString() +
  -                  "(" + retval.length + " bytes)");
            
  -         return retval;
  -      }
  -      catch(Throwable t) {
  -         log.error("failed serializing transient and persistent state", t);
  -         if (!suppressErrors)
  -            throw t;
  -         return null;
  -      }
  -      
  -   }
  -   
  -   /**
  -    * Write the state transfer version as well as placeholder ints for the
  -    * sizes of the components of the state transfer.
  -    */
  -   protected void initializeStateTransfer(OutputStream baos) throws IOException
  +   public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
  +         boolean suppressErrors) throws Throwable
      {
  +      int initialSize = 16*1024;
  +      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(initialSize);
         MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
  -      out.writeShort(STATE_TRANSFER_VERSION);
  -      // Write a placeholder for the 3 sizes we'll merge in later
  -      out.writeInt(0);
  -      out.writeInt(0);
  -      out.writeInt(0);
  -      out.close();
  -   }
  -
  -   /**
  -    * Do a preorder traversal: visit the node first, then the node's children
  -    * 
  -    * @param out
  -    * @throws Exception
  -    */
  -   protected void marshallTransientState(DataNode node, 
  -                                         ObjectOutputStream out) throws Exception 
  -   {        
  -      
  -      if (internalFqns.contains(node.getFqn()))
  -         return;
  -      
  -      Map       attrs;
  -      NodeData  nd;
  -
  -      // first handle the current node
  -      attrs=node.getData();
  -      if(attrs == null || attrs.size() == 0)
  -         nd=new NodeData(node.getFqn());
  -      else
  -         nd=new NodeData(node.getFqn(), attrs);
  -      out.writeObject(nd);
  -
  -      // then visit the children
  -      Map children = node.getChildren();
  -      if(children == null)
  -         return;
  -      for(Iterator it=children.entrySet().iterator(); it.hasNext();) {
  -         Map.Entry entry = (Map.Entry) it.next();
  -         marshallTransientState((DataNode) entry.getValue(), out);
  -      }
  -      
  -      out.close();
  -   }
  -   
  -   /**
  -    * Does nothing in this base class; can be overridden in a subclass.
  -    */
  -   protected void marshallAssociatedState(Fqn fqn,
  -                                          OutputStream baos) throws Exception 
  -   {  
  -      // no-op in this base class      
  -   }
  -   
  -   protected TreeCache getTreeCache()
  -   {
  -      return cache;
  -   }
  -   
  -   /**
  -    * Overwrites the bytes in the given array starting at the given position
  -    * with another new integer.
  -    */
  -   public static void overwriteInt(byte[] bytes, int startpos, int newVal) 
  -   {   
  -       bytes[startpos]     = (byte) (newVal >>> 24);
  -       bytes[startpos + 1] = (byte) (newVal >>> 16);
  -       bytes[startpos + 2] = (byte) (newVal >>> 8);
  -       bytes[startpos + 3] = (byte) (newVal >>> 0);
  +      streamState(out, rootNode, generateTransient, generatePersistent, suppressErrors);
  +      return baos.getRawBuffer();
      }
   }
  
  
  
  1.3       +2 -4      JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- StateTransferIntegrator.java	11 Oct 2005 20:15:15 -0000	1.2
  +++ StateTransferIntegrator.java	31 Aug 2006 20:30:45 -0000	1.3
  @@ -11,8 +11,6 @@
   public interface StateTransferIntegrator
   {
   
  -   void integrateTransientState(DataNode target, ClassLoader cl) throws Exception;
  -
  -   void integratePersistentState() throws Exception;
  +   void integrateState(DataNode target,ClassLoader cl)throws Exception;
   
   }
  \ No newline at end of file
  
  
  
  1.6       +17 -353   JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- StateTransferIntegrator_200.java	31 Aug 2006 14:56:46 -0000	1.5
  +++ StateTransferIntegrator_200.java	31 Aug 2006 20:30:45 -0000	1.6
  @@ -6,374 +6,38 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
  +import java.io.ByteArrayInputStream;
  +
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  -import org.jboss.cache.buddyreplication.BuddyManager;
  -import org.jboss.cache.factories.NodeFactory;
  -import org.jboss.cache.loader.CacheLoader;
  -import org.jboss.cache.loader.NodeData;
   import org.jboss.invocation.MarshalledValueInputStream;
   
  -import java.io.ByteArrayInputStream;
  -import java.io.EOFException;
  -import java.io.IOException;
  -import java.io.ObjectInputStream;
  -import java.util.HashSet;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
  -
  -public class StateTransferIntegrator_200 implements StateTransferIntegrator
  +public class StateTransferIntegrator_200 extends AbstractStateTransferIntegrator implements StateTransferIntegrator
   {
  -   /** Number of bytes at the beginning of the state transfer byte[]
  -    *  utilized by meta-information about the composition of the byte[] 
  -    *  (6 for stream header, 2 for version short, 
  -    *  3 * 4 for lengths of the state components, 2 bytes for close)   
  -    */
  -   protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
      
  -   protected Log log = LogFactory.getLog(getClass().getName());
  +   byte[] state = null;
      
  -   private TreeCache cache;
  -   private Fqn     targetFqn;
  -   private byte[]  state;
  -   private int     transientSize;
  -   private int     associatedSize;
  -   private int     persistentSize;
  -   private boolean transientSet;
  -   private NodeFactory factory;
  -   private byte nodeType;
  -   private Set internalFqns;
  -   
  -   
  -   protected StateTransferIntegrator_200(byte[] state, Fqn targetFqn,  
  -                               TreeCache cache) throws Exception
  +   protected StateTransferIntegrator_200(byte[] state, Fqn targetFqn, TreeCache cache) throws Exception
      {
  -      this.targetFqn = targetFqn;
  -      this.cache     = cache;
  +      super(targetFqn, cache);
         this.state      = state;
  -      this.factory = NodeFactory.getInstance();
  -      this.nodeType = cache.getConfiguration().isNodeLockingOptimistic() 
  -                                    ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE 
  -                                    : NodeFactory.NODE_TYPE_TREENODE;
  -      this.internalFqns = cache.getInternalFqns();
  +   }
         
  +   public void integrateState(DataNode target, ClassLoader cl) throws Exception
  +   {
         ByteArrayInputStream bais = new ByteArrayInputStream(state);
         MarshalledValueInputStream in = new MarshalledValueInputStream(bais);
         in.readShort(); // the version, which we discard
  -      transientSize  = in.readInt();
  -      associatedSize = in.readInt();
  -      persistentSize = in.readInt();
  -      in.close();
  -      if (log.isTraceEnabled()) {
  -            log.trace("transient state: " + transientSize + " bytes");
  -            log.trace("associated state: " + associatedSize + " bytes");
  -            log.trace("persistent state: " + persistentSize + " bytes");
  -      }
  -   }
  -   
  -   public void integrateTransientState(DataNode target, ClassLoader cl) 
  -      throws Exception
  -   {
  -      if (transientSize > 0) {
  -         
  -         ClassLoader oldCL = null;         
  -         try {
  -            if (cl != null) {
  -               oldCL = Thread.currentThread().getContextClassLoader(); 
  -               Thread.currentThread().setContextClassLoader(cl);
  -            }
  -            
  -            if (log.isTraceEnabled())
  -               log.trace("integrating transient state for " + target);
  -            
  -            integrateTransientState(target);
  -            
  -            transientSet = true;
  -            
  -            if (log.isTraceEnabled())
  -               log.trace("transient state successfully integrated for " + 
  -                         targetFqn);
  -            
  -            // 3. Set the associated state.  We only do this if the normal
  -            // transient state was set.
  -            integrateAssociatedState();
  -         }
  -         finally {
  -            if (!transientSet) {
  -               // Clear any existing state from the targetRoot
  -               target.clear();
  -               target.removeAllChildren();
  -            }
  -            
  -            if (oldCL != null)
  -               Thread.currentThread().setContextClassLoader(oldCL);            
  -         }
  -      }
  -   }
  -   
  -   /**
  -    * Provided for subclasses that deal with associated state.
  -    * 
  -    * @throws Exception
  -    */
  -   protected void integrateAssociatedState() throws Exception
  -   {
  -      // no-op in this base class
  -   }
  -   
  -   public void integratePersistentState() throws Exception
  -   {
  -      if (persistentSize > 0)
  -      {
  -         CacheLoader loader = cache.getCacheLoader();
  -         if (loader == null)
  -         {
  -            log.error("cache loader is null, cannot set persistent state");
  -         }
  -         else
  -         {
  -            ByteArrayInputStream in_stream = new ByteArrayInputStream(getPersistentState());
  -            MarshalledValueInputStream in = new MarshalledValueInputStream(in_stream);
  -
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state with " + loader.getClass());
  -
  -            if (targetFqn.isRoot())
  -            {
  -               loader.storeEntireState(in);
  -            }
  -            else
  -            {
  -               loader.storeState(targetFqn, in);
  -            }
  -
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state was successful");
  -         }
  -      }
  -   }
  -   
  -   private void integrateTransientState(DataNode target)
  -         throws IOException, ClassNotFoundException
  -   {
  -      Set retainedNodes = retainInternalNodes(target);
  -      
  -      target.removeAllChildren();
         
  -      ByteArrayInputStream in_stream=new ByteArrayInputStream(state, HEADER_LENGTH, transientSize);
  -      MarshalledValueInputStream in=new MarshalledValueInputStream(in_stream);
  -      
  -      // Read the first NodeData and integrate into our target
  -      NodeData nd = (NodeData) in.readObject();
  -      Map attrs = nd.getAttributes();
  -      if (attrs != null)
  -         target.put(attrs, true);
  -      else
  -         target.clear();
  -      
  -      // Check whether this is an integration into the buddy backup subtree
  -      Fqn tferFqn = nd.getFqn();
  -      Fqn tgtFqn = target.getFqn();      
  -      boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
  -                     && !tferFqn.isChildOrEquals(tgtFqn);
  -      // If it is an integration, calculate how many levels of offset
  -      int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
  -      
  -      integrateStateTransferChildren(target, offset, in);
  -      
  -      in.close();
  -      
  -      integrateRetainedNodes(target, retainedNodes);
  -   }
  -   
  -   private NodeData integrateStateTransferChildren(DataNode parent,
  -                                                   int offset,
  -                                                   ObjectInputStream in)
  -         throws IOException, ClassNotFoundException
  -   {
  -      int parent_level = parent.getFqn().size();
  -      int target_level = parent_level + 1;
  -      Fqn fqn;
  -      int size;
  -      Object name;
         try
         {
  -         NodeData nd = (NodeData) in.readObject();
  -         while (nd != null) {            
  -            fqn = nd.getFqn();
  -            // If we need to integrate into the buddy backup subtree,
  -            // change the Fqn to fit under it
  -            if (offset > 0)
  -               fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
  -            size = fqn.size();
  -            if (size <= parent_level)
  -               return nd;
  -            else if (size > target_level)
  -               throw new IllegalStateException("NodeData " + fqn +
  -                                               " is not a direct child of " +
  -                                               parent.getFqn());
  -
  -            name = fqn.get(size - 1);
  -            
  -            // We handle this NodeData.  Create a DataNode and
  -            // integrate its data            
  -            DataNode target = factory.createDataNode(nodeType, 
  -                                                     name, 
  -                                                     fqn, 
  -                                                     parent, 
  -                                                     nd.getAttributes(), 
  -                                                     true,
  -                                                     cache);
  -            parent.addChild(name, target);
  -            
  -            // Recursively call, which will walk down the tree
  -            // and return the next NodeData that's a child of our parent
  -            nd = integrateStateTransferChildren(target, offset, in);
  -         }
  -      }
  -      catch (EOFException eof) {
  -         // all done
  -      }
  -      
  -      return null;
  +         integrateTransientState(in, target, cl);
  +         integratePersistentState(in);
      }
  -   
  -   private byte[] getPersistentState()
  -   {
  -      byte[] result = new byte[persistentSize];
  -      System.arraycopy(state, HEADER_LENGTH + transientSize + associatedSize, result, 0, persistentSize);
  -      return result;
  -   }
  -   
  -   private Set retainInternalNodes(DataNode target)
  -   {
  -      Set result = new HashSet();
  -      Fqn targetFqn = target.getFqn();
  -      for (Iterator it = internalFqns.iterator(); it.hasNext();)
  -      {
  -         Fqn internalFqn = (Fqn) it.next();
  -         if (internalFqn.isChildOf(targetFqn))
  -         {
  -            DataNode internalNode = getInternalNode(target, internalFqn);
  -            if (internalNode != null)
  -               result.add(internalNode);
  -         }
  -      }
  -      
  -      return result;
  -   }
  -   
  -   private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
  -   {
  -      Object name = internalFqn.get(parent.getFqn().size());
  -      DataNode result = (DataNode) parent.getChild(name);
  -      if (result != null)
  -      {
  -         if (internalFqn.size() < result.getFqn().size())
  -         {
  -            // need to recursively walk down the tree
  -            result = getInternalNode(result, internalFqn);
  -         }
  -      }
  -      return result;
  -   }
  -   
  -   private void integrateRetainedNodes(DataNode root, Set retainedNodes)
  -   {
  -      Fqn rootFqn = root.getFqn();
  -      for (Iterator it = retainedNodes.iterator(); it.hasNext();)
  -      {
  -         DataNode retained = (DataNode) it.next();
  -         if (retained.getFqn().isChildOf(rootFqn))
  -         { 
  -            integrateRetainedNode(root, retained);
  -         }
  -      }
  -   }
  -   
  -   private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
  -   {
  -      Fqn descFqn = descendant.getFqn();
  -      Fqn ancFqn = ancestor.getFqn();
  -      Object name = descFqn.get(ancFqn.size());
  -      DataNode child = (DataNode) ancestor.getChild(name);
  -      if (ancFqn.size() == descFqn.size() + 1)
  -      {
  -         if (child == null)
  +      finally
            {
  -            ancestor.addChild(name, descendant);
  -         }
  -         else
  -         {
  -            log.warn("Received unexpected internal node " + descFqn + 
  -                     " in transferred state");
  -         }
  -      }
  -      else
  -      {
  -         if (child == null)
  -         {
  -            // Missing level -- have to create empty node
  -            // This shouldn't really happen -- internal fqns should
  -            // be immediately under the root
  -            child = factory.createDataNode(nodeType, 
  -                                           name, 
  -                                           new Fqn(ancFqn, name), 
  -                                           ancestor, 
  -                                           null, 
  -                                           true,
  -                                           cache);
  -            ancestor.addChild(name, child);
  -         }
  -         
  -         // Keep walking down the tree
  -         integrateRetainedNode(child, descendant);
  -      }
  -   }
  -
  -   protected int getAssociatedSize()
  -   {
  -      return associatedSize;
  -   }
  -
  -   protected TreeCache getCache()
  -   {
  -      return cache;
  -   }
  -
  -   protected NodeFactory getFactory()
  -   {
  -      return factory;
  -   }
  -
  -   protected byte getNodeType()
  -   {
  -      return nodeType;
  -   }
  -
  -   protected int getPersistentSize()
  -   {
  -      return persistentSize;
  -   }
  -
  -   protected byte[] getState()
  -   {
  -      return state;
  -   }
  -
  -   protected int getTransientSize()
  -   {
  -      return transientSize;
  +         in.close();
      }
  -
  -   protected Fqn getTargetFqn()
  -   {
  -      return targetFqn;
      }
  -   
  -   
   }
  
  
  
  1.3       +6 -147    JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StreamingStateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- StreamingStateTransferGenerator_200.java	31 Aug 2006 14:56:46 -0000	1.2
  +++ StreamingStateTransferGenerator_200.java	31 Aug 2006 20:30:45 -0000	1.3
  @@ -6,168 +6,27 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.ObjectOutputStream;
   import java.io.OutputStream;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
   
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.DataNode;
  -import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  -import org.jboss.cache.Version;
  -import org.jboss.cache.loader.NodeData;
   import org.jboss.invocation.MarshalledValueOutputStream;
   
  -public class StreamingStateTransferGenerator_200 implements StateTransferGenerator
  +public class StreamingStateTransferGenerator_200 extends AbstractStateTransferGenerator implements StateTransferGenerator
   {
  -   public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");
  -
  -   private Log log = LogFactory.getLog(getClass().getName());
  -
  -   private TreeCache cache;
  -
  -   private Set internalFqns;
  -
      private OutputStream os;
   
      protected StreamingStateTransferGenerator_200(OutputStream os, TreeCache cache)
      {
  -      this.cache = cache;
  +      super(cache);
         this.os = os;
  -      this.internalFqns = cache.getInternalFqns();
      }
   
      public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
            boolean suppressErrors) throws Throwable
      {
  -      Fqn fqn = rootNode.getFqn();
         MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
  -
  -      try
  -      {
  -         try
  -         {
  -            out.writeShort(STATE_TRANSFER_VERSION);
  -            if (generateTransient)
  -            {
  -               if (log.isTraceEnabled())
  -                  log.trace("writing transient state for " + fqn);
  -
  -               marshallTransientState(rootNode, out);
  -
  -               if (log.isTraceEnabled())
  -                  log.trace("transient state succesfully written");
  -
  -               if (log.isTraceEnabled())
  -                  log.trace("writing associated state");
  -
  -               marshallAssociatedState(fqn, os);
  -
  -               if (log.isTraceEnabled())
  -                  log.trace("associated state succesfully written");
  -            }
  -         }
  -         catch (Throwable t)
  -         {
  -            log.error("failed getting the in-memory (transient) state", t);
  -            if (!suppressErrors)
  -               throw t;
  -         }
  -         finally
  -         {
  -            if (log.isTraceEnabled())
  -               log.trace("writing delimeter after transient state");
  -
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -         }
  -         try
  -         {
  -            if (generatePersistent)
  -            {
  -               if (log.isTraceEnabled())
  -                  log.trace("writing persistent state for " + fqn);
  -
  -               if (fqn.isRoot())
  -               {
  -                  cache.getCacheLoader().loadEntireState(out);
  -               }
  -               else
  -               {
  -                  cache.getCacheLoader().loadState(fqn, out);
  -               }
  -
  -               if (log.isTraceEnabled())
  -                  log.trace("persistent state succesfully written");
  -            }
  -         }
  -         catch (Throwable t)
  -         {
  -            log.error("failed getting the persistent state", t);
  -            if (!suppressErrors)
  -               throw t;
  -         }
  -         finally
  -         {
  -            if (log.isTraceEnabled())
  -               log.trace("writing delimeter after persistent state");
  -
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -         }
  -      }
  -      finally
  -      {
  -         out.close();
  -      }
  +      streamState(out, rootNode, generateTransient, generatePersistent, suppressErrors);
         return null;
      }
  -
  -   /**
  -    * Do a preorder traversal: visit the node first, then the node's children
  -    * 
  -    * @param out
  -    * @throws Exception
  -    */
  -   protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
  -   {
  -
  -      if (internalFqns.contains(node.getFqn()))
  -         return;
  -
  -      Map attrs;
  -      NodeData nd;
  -
  -      // first handle the current node
  -      attrs = node.getData();
  -      if (attrs == null || attrs.size() == 0)
  -         nd = new NodeData(node.getFqn());
  -      else
  -         nd = new NodeData(node.getFqn(), attrs);
  -      out.writeObject(nd);
  -
  -      // then visit the children
  -      Map children = node.getChildren();
  -      if (children == null)
  -         return;
  -      for (Iterator it = children.entrySet().iterator(); it.hasNext();)
  -      {
  -         Map.Entry entry = (Map.Entry) it.next();
  -         marshallTransientState((DataNode) entry.getValue(), out);
  -      }
  -   }
  -
  -   /**
  -    * Does nothing in this base class; can be overridden in a subclass.
  -    */
  -   protected void marshallAssociatedState(Fqn fqn, OutputStream baos) throws Exception
  -   {
  -      // no-op in this base class      
  -   }
  -
  -   protected TreeCache getTreeCache()
  -   {
  -      return cache;
  -   }
   }
  
  
  
  1.8       +21 -35    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- StateTransferManager.java	31 Aug 2006 14:56:46 -0000	1.7
  +++ StateTransferManager.java	31 Aug 2006 20:30:45 -0000	1.8
  @@ -1,3 +1,9 @@
  +/*
  + * JBoss, the OpenSource J2EE webOS
  + * 
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
  + */
   package org.jboss.cache.statetransfer;
   
   import java.io.InputStream;
  @@ -328,17 +334,7 @@
            return;
         }
         
  -      byte [] new_state = null;
  -      InputStream istream = null;
  -      if(state instanceof byte[])
  -      {
  -         new_state = (byte[]) state;     
  -         log.info("received the state (size=" + new_state.length + " bytes)");
  -      }
  -      else
  -      {
  -         istream = (InputStream) state;
  -      }      
  +      boolean usingStreamTransfer = (state instanceof InputStream)?true:false;     
   
         Object owner = getOwnerForLock();
         try
  @@ -349,39 +345,29 @@
                                         true, true);
   
            StateTransferIntegrator integrator =null;
  -         if(new_state!=null)
  +         if(usingStreamTransfer)
            {         
  -            integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
  +            integrator = getStateTransferIntegrator((InputStream)state,
  +                  targetRoot.getFqn());
            }
            else
            {
  -            integrator = getStateTransferIntegrator(istream,
  -                  targetRoot.getFqn());
  +            byte [] new_state = (byte[]) state;     
  +            log.info("received the state (size=" + new_state.length + " bytes)");
  +            integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
            }
   
  -         // If transient state is available, integrate it
            try
            {
  -            integrator.integrateTransientState(targetRoot, cl);
  -            notifyAllNodesCreated(targetRoot);
  +            integrator.integrateState(targetRoot, cl);            
            }
            catch (Throwable t)
            {
  -            log.error("failed setting transient state", t);
  +            log.error("failed setting state", t);
            }
  -
  -         // Store any persistent state
  -         integrator.integratePersistentState();
         }
         finally
         {
  -         if(istream!=null)
  -         {
  -            try
  -            {
  -               istream.close();
  -            }catch(Exception ignored){}
  -         }
            releaseStateTransferLocks(targetRoot, owner, true);
         }
   
  
  
  
  1.3       +7 -291    JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StreamingStateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- StreamingStateTransferIntegrator_200.java	31 Aug 2006 14:56:46 -0000	1.2
  +++ StreamingStateTransferIntegrator_200.java	31 Aug 2006 20:30:45 -0000	1.3
  @@ -6,317 +6,33 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.IOException;
   import java.io.ObjectInputStream;
  -import java.util.HashSet;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
   
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  -import org.jboss.cache.buddyreplication.BuddyManager;
  -import org.jboss.cache.factories.NodeFactory;
  -import org.jboss.cache.loader.CacheLoader;
  -import org.jboss.cache.loader.NodeData;
   
  -public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
  +public class StreamingStateTransferIntegrator_200 extends AbstractStateTransferIntegrator implements StateTransferIntegrator
   {
  -   /** Number of bytes at the beginning of the state transfer byte[]
  -    *  utilized by meta-information about the composition of the byte[] 
  -    *  (6 for stream header, 2 for version short, 
  -    *  3 * 4 for lengths of the state components, 2 bytes for close)   
  -    */
  -   protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
  -
  -   protected Log log = LogFactory.getLog(getClass().getName());
  -
  -   private TreeCache cache;
  -
  -   private Fqn targetFqn;
  -
  -   private boolean transientSet;
  -
  -   private NodeFactory factory;
  -
  -   private byte nodeType;
  -
  -   private Set internalFqns;
  -
      ObjectInputStream in; //used in streaming state transfer
   
  -   public StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
  +   protected StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
      {
  -      this.targetFqn = targetFqn;
  -      this.cache = cache;
  -      this.factory = NodeFactory.getInstance();
  -      this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
  -            ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
  -            : NodeFactory.NODE_TYPE_TREENODE;
  -      this.internalFqns = cache.getInternalFqns();
  +      super(targetFqn,cache);      
         in = inputStream;
      }
   
  -   public void integrateTransientState(DataNode target, ClassLoader cl) throws Exception
  -   {
  -      ClassLoader oldCL = setClassLoader(cl);
  -      try
  -      {
  -         if (log.isTraceEnabled())
  -            log.trace("integrating transient state for " + target);
  -
  -         integrateTransientState(target, in);
  -
  -         transientSet = true;
  -
  -         if (log.isTraceEnabled())
  -            log.trace("transient state successfully integrated for " + targetFqn);
  -
  -         integrateAssociatedState();
  -      }
  -      finally
  -      {
  -         if (!transientSet)
  -         {
  -            // Clear any existing state from the targetRoot
  -            target.clear();
  -            target.removeAllChildren();
  -         }
  -
  -         resetClassLoader(oldCL);
  -      }
  -   }
   
  -   /**
  -    * Provided for subclasses that deal with associated state.
  -    * 
  -    * @throws Exception
  -    */
  -   protected void integrateAssociatedState() throws Exception
  +   public void integrateState(DataNode target, ClassLoader cl) throws Exception
      {
  -      // no-op in this base class
  -   }
  -
  -   public void integratePersistentState() throws Exception
  -   {
  -
         try
         {
  -         CacheLoader loader = cache.getCacheLoader();
  -         if (loader == null)
  -         {
  -            log.error("cache loader is null, cannot set persistent state");
  -         }
  -         else if (targetFqn.isRoot())
  -         {
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state");
  -            loader.storeEntireState(in);
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state was successful");
  -         }
  -         else
  -         {
  -            if (log.isTraceEnabled())
  -               log.trace("setting partial persistent state at " + targetFqn);
  -            loader.storeState(targetFqn, in);
  -            if (log.isTraceEnabled())
  -               log.trace("setting partial persistent state was successful");
  -         }
  +         integrateTransientState(in, target, cl);
  +         integratePersistentState(in);
         }
         finally
         {
            in.close();
         }
      }
  -
  -   private ClassLoader setClassLoader(ClassLoader newLoader)
  -   {
  -      ClassLoader oldClassLoader = null;
  -      if (newLoader != null)
  -      {
  -         oldClassLoader = Thread.currentThread().getContextClassLoader();
  -         Thread.currentThread().setContextClassLoader(newLoader);
  -      }
  -      return oldClassLoader;
  -   }
  -
  -   private void resetClassLoader(ClassLoader oldLoader)
  -   {
  -      if (oldLoader != null)
  -         Thread.currentThread().setContextClassLoader(oldLoader);
  -   }
  -
  -   private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
  -         ClassNotFoundException
  -   {
  -      Set retainedNodes = retainInternalNodes(target);
  -
  -      target.removeAllChildren();
  -
  -      // Read the first NodeData and integrate into our target     
  -      NodeData nd = (NodeData) in.readObject();
  -
  -      //are there any transient nodes at all?
  -      if (!nd.isMarker())
  -      {
  -         Map attrs = nd.getAttributes();
  -         if (attrs != null)
  -            target.put(attrs, true);
  -         else
  -            target.clear();
  -
  -         // Check whether this is an integration into the buddy backup
  -         // subtree
  -         Fqn tferFqn = nd.getFqn();
  -         Fqn tgtFqn = target.getFqn();
  -         boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
  -               && !tferFqn.isChildOrEquals(tgtFqn);
  -         // If it is an integration, calculate how many levels of offset
  -         int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
  -
  -         integrateStateTransferChildren(target, offset, in);
  -
  -         integrateRetainedNodes(target, retainedNodes);
  -      }
  -   }
  -
  -   private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
  -         throws IOException, ClassNotFoundException
  -   {
  -      int parent_level = parent.getFqn().size();
  -      int target_level = parent_level + 1;
  -      Fqn fqn;
  -      int size;
  -      Object name;
  -      NodeData nd = (NodeData) in.readObject();
  -      while (nd != null && !nd.isMarker())
  -      {
  -         fqn = nd.getFqn();
  -         // If we need to integrate into the buddy backup subtree,
  -         // change the Fqn to fit under it
  -         if (offset > 0)
  -            fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
  -         size = fqn.size();
  -         if (size <= parent_level)
  -            return nd;
  -         else if (size > target_level)
  -            throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
  -
  -         name = fqn.get(size - 1);
  -
  -         // We handle this NodeData.  Create a DataNode and
  -         // integrate its data            
  -         DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
  -         parent.addChild(name, target);
  -
  -         // Recursively call, which will walk down the tree
  -         // and return the next NodeData that's a child of our parent
  -         nd = integrateStateTransferChildren(target, offset, in);
  -      }
  -      return null;
  -   }
  -
  -   private Set retainInternalNodes(DataNode target)
  -   {
  -      Set result = new HashSet();
  -      Fqn targetFqn = target.getFqn();
  -      for (Iterator it = internalFqns.iterator(); it.hasNext();)
  -      {
  -         Fqn internalFqn = (Fqn) it.next();
  -         if (internalFqn.isChildOf(targetFqn))
  -         {
  -            DataNode internalNode = getInternalNode(target, internalFqn);
  -            if (internalNode != null)
  -               result.add(internalNode);
  -         }
  -      }
  -
  -      return result;
  -   }
  -
  -   private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
  -   {
  -      Object name = internalFqn.get(parent.getFqn().size());
  -      DataNode result = (DataNode) parent.getChild(name);
  -      if (result != null)
  -      {
  -         if (internalFqn.size() < result.getFqn().size())
  -         {
  -            // need to recursively walk down the tree
  -            result = getInternalNode(result, internalFqn);
  -         }
  -      }
  -      return result;
  -   }
  -
  -   private void integrateRetainedNodes(DataNode root, Set retainedNodes)
  -   {
  -      Fqn rootFqn = root.getFqn();
  -      for (Iterator it = retainedNodes.iterator(); it.hasNext();)
  -      {
  -         DataNode retained = (DataNode) it.next();
  -         if (retained.getFqn().isChildOf(rootFqn))
  -         {
  -            integrateRetainedNode(root, retained);
  -         }
  -      }
  -   }
  -
  -   private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
  -   {
  -      Fqn descFqn = descendant.getFqn();
  -      Fqn ancFqn = ancestor.getFqn();
  -      Object name = descFqn.get(ancFqn.size());
  -      DataNode child = (DataNode) ancestor.getChild(name);
  -      if (ancFqn.size() == descFqn.size() + 1)
  -      {
  -         if (child == null)
  -         {
  -            ancestor.addChild(name, descendant);
  -         }
  -         else
  -         {
  -            log.warn("Received unexpected internal node " + descFqn + " in transferred state");
  -         }
  -      }
  -      else
  -      {
  -         if (child == null)
  -         {
  -            // Missing level -- have to create empty node
  -            // This shouldn't really happen -- internal fqns should
  -            // be immediately under the root
  -            child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
  -            ancestor.addChild(name, child);
  -         }
  -
  -         // Keep walking down the tree
  -         integrateRetainedNode(child, descendant);
  -      }
  -   }
  -
  -   protected TreeCache getCache()
  -   {
  -      return cache;
  -   }
  -
  -   protected NodeFactory getFactory()
  -   {
  -      return factory;
  -   }
  -
  -   protected byte getNodeType()
  -   {
  -      return nodeType;
  -   }
  -
  -   protected Fqn getTargetFqn()
  -   {
  -      return targetFqn;
  -   }
  -
   }
  
  
  
  1.1      date: 2006/08/31 20:30:45;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java
  
  Index: AbstractStateTransferGenerator.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.statetransfer;
  
  import java.io.ObjectOutputStream;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.Version;
  import org.jboss.cache.loader.NodeData;
  
  /**
   * Common base for state transfer generators. Writes the state found below a
   * root node to an ObjectOutputStream as a version short followed by three
   * sections (transient, associated, persistent); a delimiter object is written
   * after every section, whether or not the section itself was produced.
   */
  public class AbstractStateTransferGenerator
  {

     public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");

     private Log log = LogFactory.getLog(getClass().getName());

     private TreeCache cache;

     private Set internalFqns;

     /**
      * @param cache cache whose internal Fqns are skipped while marshalling and
      *              whose cache loader supplies the persistent state
      */
     protected AbstractStateTransferGenerator(TreeCache cache)
     {
        this.internalFqns = cache.getInternalFqns();
        this.cache = cache;
     }

     /**
      * Streams the version header and the transient, associated and persistent
      * sections to <code>out</code>, then closes the stream.
      *
      * @param out                stream receiving the state; always closed on exit
      * @param rootNode           node at which the transferred subtree starts
      * @param generateTransient  whether in-memory state is written
      * @param generatePersistent whether cache loader state is written
      * @param suppressErrors     if true, a failure inside a section is only logged;
      *                           otherwise it is logged and rethrown
      * @throws Throwable the failure of a section when errors are not suppressed,
      *                   or any failure while writing a delimiter or closing
      */
     protected void streamState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
           boolean generatePersistent, boolean suppressErrors) throws Throwable
     {
        // resolved before the try block, exactly as before: a failure here
        // leaves the stream untouched (and unclosed)
        Fqn rootFqn = rootNode.getFqn();
        try
        {
           writeTransientSection(out, rootNode, rootFqn, generateTransient, suppressErrors);
           writeAssociatedSection(out, rootFqn, suppressErrors);
           writePersistentSection(out, rootFqn, generatePersistent, suppressErrors);
        }
        finally
        {
           out.close();
        }
     }

     /**
      * Writes the version short, the optional transient state and the
      * delimiter that terminates the transient section.
      */
     private void writeTransientSection(ObjectOutputStream out, DataNode rootNode, Fqn rootFqn,
           boolean generateTransient, boolean suppressErrors) throws Throwable
     {
        try
        {
           out.writeShort(STATE_TRANSFER_VERSION);
           if (generateTransient)
           {
              if (log.isTraceEnabled())
                 log.trace("writing transient state for " + rootFqn);

              marshallTransientState(rootNode, out);

              traceIfEnabled("transient state succesfully written");
           }
        }
        catch (Throwable failure)
        {
           reportFailure("failed getting the in-memory (transient) state", failure, suppressErrors);
        }
        finally
        {
           writeDelimiter(out, "writing delimeter after transient state");
        }
     }

     /**
      * Writes the associated state and the delimiter that terminates it.
      */
     private void writeAssociatedSection(ObjectOutputStream out, Fqn rootFqn, boolean suppressErrors)
           throws Throwable
     {
        try
        {
           traceIfEnabled("writing associated state");

           marshallAssociatedState(rootFqn, out);

           traceIfEnabled("associated state succesfully written");
        }
        catch (Throwable failure)
        {
           reportFailure("failed writing associated state", failure, suppressErrors);
        }
        finally
        {
           writeDelimiter(out, "writing delimeter after associated state");
        }
     }

     /**
      * Writes the optional persistent state obtained from the cache loader and
      * the delimiter that terminates it.
      */
     private void writePersistentSection(ObjectOutputStream out, Fqn rootFqn, boolean generatePersistent,
           boolean suppressErrors) throws Throwable
     {
        try
        {
           if (generatePersistent)
           {
              if (log.isTraceEnabled())
                 log.trace("writing persistent state for " + 
                       rootFqn + ",using " + cache.getCacheLoader().getClass());

              // the whole store is dumped for the root, a subtree otherwise
              if (rootFqn.isRoot())
                 cache.getCacheLoader().loadEntireState(out);
              else
                 cache.getCacheLoader().loadState(rootFqn, out);

              traceIfEnabled("persistent state succesfully written");
           }
        }
        catch (Throwable failure)
        {
           reportFailure("failed getting the persistent state", failure, suppressErrors);
        }
        finally
        {
           writeDelimiter(out, "writing delimeter after persistent state");
        }
     }

     /**
      * Logs the failure as an error and rethrows it unless errors are suppressed.
      */
     private void reportFailure(String message, Throwable failure, boolean suppressErrors) throws Throwable
     {
        log.error(message, failure);
        if (!suppressErrors)
           throw failure;
     }

     /**
      * Emits the section delimiter object, tracing the given message first.
      */
     private void writeDelimiter(ObjectOutputStream out, String traceMessage) throws Exception
     {
        traceIfEnabled(traceMessage);
        out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
     }

     /**
      * Logs a constant message at trace level when trace logging is enabled.
      */
     private void traceIfEnabled(String message)
     {
        if (log.isTraceEnabled())
           log.trace(message);
     }

     /**
      * Writes the given node and then, recursively, each of its children
      * (pre-order). A node whose Fqn is one of the cache's internal Fqns is
      * skipped together with everything below it.
      * 
      * @param node node to marshall
      * @param out  stream receiving one NodeData per visited node
      * @throws Exception if writing to the stream fails
      */
     protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
     {
        Fqn nodeFqn = node.getFqn();
        if (internalFqns.contains(nodeFqn))
           return;

        // the node itself goes first; an absent or empty attribute map is
        // sent as an Fqn-only NodeData
        Map attrs = node.getData();
        NodeData nd = (attrs == null || attrs.isEmpty())
              ? new NodeData(nodeFqn)
              : new NodeData(nodeFqn, attrs);
        out.writeObject(nd);

        // children follow
        Map children = node.getChildren();
        if (children != null)
        {
           Iterator entries = children.entrySet().iterator();
           while (entries.hasNext())
           {
              Map.Entry childEntry = (Map.Entry) entries.next();
              marshallTransientState((DataNode) childEntry.getValue(), out);
           }
        }
     }

     /**
      * Hook for subclasses that need to stream additional state between the
      * transient and persistent sections. This implementation writes nothing.
      */
     protected void marshallAssociatedState(Fqn fqn, ObjectOutputStream baos) throws Exception
     {
        // intentionally empty      
     }
     
     /**
      * @return the cache this generator was created for
      */
     protected TreeCache getTreeCache()
     {
        return cache;
     }

  }
  
  
  
  1.1      date: 2006/08/31 20:30:45;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java
  
  Index: AbstractStateTransferIntegrator.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.statetransfer;
  
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.buddyreplication.BuddyManager;
  import org.jboss.cache.factories.NodeFactory;
  import org.jboss.cache.loader.CacheLoader;
  import org.jboss.cache.loader.NodeData;
  
  /**
   * Common base for state transfer integrators. Reads the transient section of
   * a state transfer stream and rebuilds the node tree below a target node,
   * consumes the associated-state section, and passes the remaining stream to
   * the cache loader as persistent state.
   */
  public class AbstractStateTransferIntegrator
  {   

     protected Log log = LogFactory.getLog(getClass().getName());

     private TreeCache cache;

     private Fqn targetFqn;

     private NodeFactory factory;

     private byte nodeType;

     private Set internalFqns;  

     /**
      * @param targetFqn Fqn of the subtree the state is integrated into
      * @param cache     cache receiving the state; its configuration decides the
      *                  node type created and it supplies the internal Fqns
      */
     public AbstractStateTransferIntegrator(Fqn targetFqn, TreeCache cache)
     {
        this.targetFqn = targetFqn;
        this.cache = cache;
        this.factory = NodeFactory.getInstance();
        this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
              ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
              : NodeFactory.NODE_TYPE_TREENODE;
        this.internalFqns = cache.getInternalFqns();     
     }

     /**
      * Integrates the transient section of the stream into <code>target</code>,
      * then consumes the associated section and fires node-created
      * notifications for the resulting tree.
      * <p>
      * If the transient section could not be integrated completely, the target
      * is cleared and all of its children are removed before the failure
      * propagates.
      *
      * @param in     stream positioned at the start of the transient section
      * @param target node below which the transferred nodes are created
      * @param cl     class loader installed as the thread context class loader
      *               for the duration of the call; ignored when null
      * @throws Exception if reading or integrating the state fails
      */
     protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
     {
        boolean transientSet = false;
        ClassLoader oldCL = setClassLoader(cl);
        try
        {
           if (log.isTraceEnabled())
              log.trace("integrating transient state for " + target);

           integrateTransientState(target, in);

           transientSet = true;

           if (log.isTraceEnabled())
              log.trace("transient state successfully integrated for " + targetFqn);

           integrateAssociatedState(in);
           notifyAllNodesCreated(target);
        }
        finally
        {
           if (!transientSet)
           {
              // Clear any existing state from the targetRoot
              target.clear();
              target.removeAllChildren();
           }

           resetClassLoader(oldCL);
        }
     }

     /**
      * Provided for subclasses that deal with associated state. This
      * implementation only consumes a single object from the stream (the
      * delimiter that closes the associated section).
      * 
      * @param in stream positioned at the start of the associated section
      * @throws Exception if reading from the stream fails
      */
     protected void integrateAssociatedState(ObjectInputStream in) throws Exception
     {
        // no-op in this base class 
        // just read marker
        in.readObject();      
     }

     /**
      * Hands the stream to the cache loader so it can store the persistent
      * state: the entire state when the target Fqn is the root, otherwise the
      * state below the target Fqn. Logs an error and reads nothing when the
      * cache has no cache loader.
      *
      * @param in stream positioned at the start of the persistent section
      * @throws Exception if the cache loader fails to store the state
      */
     protected void integratePersistentState(ObjectInputStream in) throws Exception
     {

        CacheLoader loader = cache.getCacheLoader();
        if (loader == null)
        {
           log.error("cache loader is null, cannot set persistent state");
        }
        else
        {
           if (log.isTraceEnabled())
              log.trace("setting the persistent state using " + loader.getClass());
           if (targetFqn.isRoot())
           {            
              loader.storeEntireState(in);           
           }
           else
           {          
              loader.storeState(targetFqn, in);            
           }
           if (log.isTraceEnabled())
              log.trace("setting persistent state was successful");
        }
     }        
     
     /**
      * Generates NodeAdded notifications for all nodes of the tree. This is
      * called whenever the tree is initially retrieved (state transfer).
      * Both notifications (flag true, then flag false) are fired for a node
      * before its children are visited.
      */
     private void notifyAllNodesCreated(DataNode curr)
     {
        DataNode n;
        Map children;

        if (curr == null) return;
        getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
        getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false);
        if ((children = curr.getChildren()) != null)
        {
           for (Iterator it = children.values().iterator(); it.hasNext();)
           {
              n = (DataNode) it.next();
              notifyAllNodesCreated(n);
           }
        }
     }

     /**
      * Installs <code>newLoader</code> as the thread context class loader.
      *
      * @return the previous context class loader, or null when
      *         <code>newLoader</code> is null and nothing was changed
      */
     private ClassLoader setClassLoader(ClassLoader newLoader)
     {
        ClassLoader oldClassLoader = null;
        if (newLoader != null)
        {
           oldClassLoader = Thread.currentThread().getContextClassLoader();
           Thread.currentThread().setContextClassLoader(newLoader);
        }
        return oldClassLoader;
     }

     /**
      * Restores the thread context class loader; does nothing when
      * <code>oldLoader</code> is null.
      */
     private void resetClassLoader(ClassLoader oldLoader)
     {
        if (oldLoader != null)
           Thread.currentThread().setContextClassLoader(oldLoader);
     }

     /**
      * Replaces the children of <code>target</code> with the nodes read from
      * the transient section of the stream. Nodes found at the cache's internal
      * Fqns are set aside before the children are removed and re-attached
      * afterwards.
      */
     private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
           ClassNotFoundException
     {
        Set retainedNodes = retainInternalNodes(target);

        target.removeAllChildren();

        // Read the first NodeData and integrate into our target     
        NodeData nd = (NodeData) in.readObject();

        //are there any transient nodes at all?
        if (!nd.isMarker())
        {
           Map attrs = nd.getAttributes();
           if (attrs != null)
              target.put(attrs, true);
           else
              target.clear();

           // Check whether this is an integration into the buddy backup
           // subtree
           Fqn tferFqn = nd.getFqn();
           Fqn tgtFqn = target.getFqn();
           boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
                 && !tferFqn.isChildOrEquals(tgtFqn);
           // If it is an integration, calculate how many levels of offset
           int offset = move ? tgtFqn.size() - tferFqn.size() : 0;

           integrateStateTransferChildren(target, offset, in);
        }

        // The internal nodes were detached by removeAllChildren() above, so
        // they must be put back even when the stream carried no transient
        // state at all; otherwise they would be lost.
        integrateRetainedNodes(target, retainedNodes);
     }

     /**
      * Reads NodeData objects from the stream and creates the matching nodes
      * below <code>parent</code>, descending recursively, until a marker (or
      * null) is read or a NodeData is met that does not belong below
      * <code>parent</code>.
      *
      * @param offset when greater than zero, transferred Fqns are re-rooted
      *               under a prefix of the parent's Fqn (buddy backup case)
      * @return the first NodeData that belongs to a higher level and must be
      *         handled by the caller, or null when the section is exhausted
      * @throws IllegalStateException if a NodeData lies more than one level
      *                               below <code>parent</code>
      */
     private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
           throws IOException, ClassNotFoundException
     {
        int parent_level = parent.getFqn().size();
        int target_level = parent_level + 1;
        Fqn fqn;
        int size;
        Object name;
        NodeData nd = (NodeData) in.readObject();
        while (nd != null && !nd.isMarker())
        {
           fqn = nd.getFqn();
           // If we need to integrate into the buddy backup subtree,
           // change the Fqn to fit under it
           if (offset > 0)
              fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
           size = fqn.size();
           if (size <= parent_level)
              return nd;
           else if (size > target_level)
              throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());

           name = fqn.get(size - 1);

           // We handle this NodeData.  Create a DataNode and
           // integrate its data            
           DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
           parent.addChild(name, target);

           // Recursively call, which will walk down the tree
           // and return the next NodeData that's a child of our parent
           nd = integrateStateTransferChildren(target, offset, in);
        }
        return null;
     }

     /**
      * Collects the existing nodes located at those internal Fqns of the cache
      * that lie below <code>target</code>.
      *
      * @return the nodes found; never null, possibly empty
      */
     private Set retainInternalNodes(DataNode target)
     {
        Set result = new HashSet();
        Fqn rootFqn = target.getFqn();
        for (Iterator it = internalFqns.iterator(); it.hasNext();)
        {
           Fqn internalFqn = (Fqn) it.next();
           if (internalFqn.isChildOf(rootFqn))
           {
              DataNode internalNode = getInternalNode(target, internalFqn);
              if (internalNode != null)
                 result.add(internalNode);
           }
        }

        return result;
     }

     /**
      * Walks down from <code>parent</code> along <code>internalFqn</code>.
      *
      * @return the node at <code>internalFqn</code>, or null if any node on
      *         the path is missing
      */
     private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
     {
        Object name = internalFqn.get(parent.getFqn().size());
        DataNode result = (DataNode) parent.getChild(name);
        if (result != null)
        {
           // The child found is only an intermediate ancestor while the
           // internal Fqn is deeper than it; keep descending in that case.
           if (internalFqn.size() > result.getFqn().size())
           {
              // need to recursively walk down the tree
              result = getInternalNode(result, internalFqn);
           }
        }
        return result;
     }

     /**
      * Re-attaches every retained node whose Fqn lies below <code>root</code>.
      */
     private void integrateRetainedNodes(DataNode root, Set retainedNodes)
     {
        Fqn rootFqn = root.getFqn();
        for (Iterator it = retainedNodes.iterator(); it.hasNext();)
        {
           DataNode retained = (DataNode) it.next();
           if (retained.getFqn().isChildOf(rootFqn))
           {
              integrateRetainedNode(root, retained);
           }
        }
     }

     /**
      * Attaches <code>descendant</code> at its own Fqn below
      * <code>ancestor</code>, creating empty intermediate nodes for any missing
      * level. If a node already exists at the descendant's position, the
      * existing node is kept and a warning is logged.
      */
     private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
     {
        Fqn descFqn = descendant.getFqn();
        Fqn ancFqn = ancestor.getFqn();
        Object name = descFqn.get(ancFqn.size());
        DataNode child = (DataNode) ancestor.getChild(name);
        // ancestor is the direct parent when the descendant is exactly one
        // level deeper
        if (descFqn.size() == ancFqn.size() + 1)
        {
           if (child == null)
           {
              ancestor.addChild(name, descendant);
           }
           else
           {
              log.warn("Received unexpected internal node " + descFqn + " in transferred state");
           }
        }
        else
        {
           if (child == null)
           {
              // Missing level -- have to create empty node
              // This shouldn't really happen -- internal fqns should
              // be immediately under the root
              child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
              ancestor.addChild(name, child);
           }

           // Keep walking down the tree
           integrateRetainedNode(child, descendant);
        }
     }

     /**
      * @return the cache the state is integrated into
      */
     protected TreeCache getCache()
     {
        return cache;
     }

     /**
      * @return the factory used to create transferred nodes
      */
     protected NodeFactory getFactory()
     {
        return factory;
     }

     /**
      * @return the NodeFactory node type chosen from the cache's locking scheme
      */
     protected byte getNodeType()
     {
        return nodeType;
     }

     /**
      * @return the Fqn of the subtree the state is integrated into
      */
     protected Fqn getTargetFqn()
     {
        return targetFqn;
     }

  }
  
  
  



More information about the jboss-cvs-commits mailing list