[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Manik Surtani msurtani at jboss.com
Thu Dec 14 12:18:48 EST 2006


  User: msurtani
  Date: 06/12/14 12:18:48

  Modified:    src/org/jboss/cache/statetransfer     
                        StateTransferGenerator.java
                        StateTransferManager.java
                        DefaultStateTransferGenerator.java
                        StateTransferIntegrator.java
                        DefaultStateTransferIntegrator.java
  Log:
  The beginnings of porting JBCACHE-871 and JBCACHE-875, as well as rearranging the Node/Cache object model into something sensible.
  
  Revision  Changes    Path
  1.4       +5 -5      JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- StateTransferGenerator.java	11 Sep 2006 21:53:19 -0000	1.3
  +++ StateTransferGenerator.java	14 Dec 2006 17:18:48 -0000	1.4
  @@ -6,14 +6,14 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.ObjectOutputStream;
  +import org.jboss.cache.Node;
   
  -import org.jboss.cache.DataNode;
  +import java.io.ObjectOutputStream;
   
   public interface StateTransferGenerator
   {
     
  -   public void generateState(ObjectOutputStream stream, DataNode rootNode, boolean generateTransient,
  +   public void generateState(ObjectOutputStream stream, Node rootNode, boolean generateTransient,
            boolean generatePersistent, boolean suppressErrors) throws Throwable;
   
   }
  \ No newline at end of file
  
  
  
  1.17      +61 -49    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -b -r1.16 -r1.17
  --- StateTransferManager.java	7 Dec 2006 22:21:48 -0000	1.16
  +++ StateTransferManager.java	14 Dec 2006 17:18:48 -0000	1.17
  @@ -6,14 +6,8 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.ByteArrayInputStream;
  -import java.io.InputStream;
  -import java.io.ObjectInputStream;
  -import java.io.OutputStream;
  -
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.Node;
   import org.jboss.cache.TreeCache;
  @@ -28,6 +22,11 @@
   import org.jboss.util.stream.MarshalledValueInputStream;
   import org.jboss.util.stream.MarshalledValueOutputStream;
   
  +import java.io.ByteArrayInputStream;
  +import java.io.InputStream;
  +import java.io.ObjectInputStream;
  +import java.io.OutputStream;
  +
   public class StateTransferManager
   {
      protected final static Log log = LogFactory.getLog(StateTransferManager.class);
  @@ -101,8 +100,10 @@
            if (marshaller_.isInactive(fqn.toString()))
            {
               if (log.isDebugEnabled())
  +            {
                  log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
  -            if(usingStreamingStateTransfer)
  +            }
  +            if (usingStreamingStateTransfer)
               {
                  MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
                  out.writeBoolean(false);
  @@ -112,9 +113,11 @@
            }         
         }
   
  -      DataNode rootNode = cache.findNode(fqn);
  +      Node rootNode = cache.findNode(fqn);
         if (rootNode == null)
  +      {
            return null;
  +      }
   
         boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
         CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
  @@ -131,7 +134,7 @@
            }
          
            MarshalledValueOutputStream out = null;
  -         byte resultBuffer [] = new byte[0];
  +         byte resultBuffer[] = new byte[0];
            StateTransferGenerator generator = getStateTransferGenerator(); 
            long startTime = System.currentTimeMillis();
            if (usingStreamingStateTransfer)
  @@ -142,12 +145,12 @@
            }
            else
            {         
  -            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16*1024);
  +            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
               out = new MarshalledValueOutputStream(baos);            
               generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
               resultBuffer = baos.getRawBuffer();
            }  
  -         log.info("Successfully generated state in " + (System.currentTimeMillis()-startTime) + " msec");
  +         log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
            return resultBuffer;
         }
         finally
  @@ -162,9 +165,8 @@
      }
   
      /**
  -    * 
       * TODO change this javadoc
  -    * 
  +    * <p/>
       * Requests state from each of the given source nodes in the cluster
       * until it gets it or no node replies with a timeout exception.  If state
       * is returned, integrates it into the given DataNode.  If no state is
  @@ -183,7 +185,7 @@
                            Object[] sources, ClassLoader cl)
              throws Exception
      {
  -      treeCache.fetchPartialState(sources, subtreeRoot,integrationRoot.getFqn());
  +      treeCache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
      }   
   
      /**
  @@ -206,7 +208,7 @@
              throws Exception
      {
         TreeCache cache = getTreeCache();
  -      DataNode target = cache.findNode(targetRoot);
  +      Node target = cache.findNode(targetRoot);
         if (target == null)
         {
            // Create the integration root, but do not replicate
  @@ -234,7 +236,7 @@
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used
       */
  -   private void setState(Object state, DataNode targetRoot, ClassLoader cl)
  +   private void setState(Object state, Node targetRoot, ClassLoader cl)
              throws Exception
      {
         if (state == null)
  @@ -254,7 +256,7 @@
                    true, true);
   
            StateTransferIntegrator integrator = null;
  -         MarshalledValueInputStream in =null;
  +         MarshalledValueInputStream in = null;
            if (usingStreamTransfer)
            {
               in =  (MarshalledValueInputStream) state;           
  @@ -284,8 +286,8 @@
            try
            {
               log.info("starting state integration at node " + targetRoot);
  -            integrator.integrateState(in,targetRoot, cl);
  -            log.info("successfully integrated state in " + (System.currentTimeMillis()-startTime) + " msec");
  +            integrator.integrateState(in, targetRoot, cl);
  +            log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
            }
            catch (Throwable t)
            {
  @@ -303,7 +305,7 @@
      /**
       * Acquires locks on a root node for an owner for state transfer.
       */
  -   protected void acquireLocksForStateTransfer(DataNode root,
  +   protected void acquireLocksForStateTransfer(Node root,
                                                  Object lockOwner,
                                                  long timeout,
                                                  boolean lockChildren,
  @@ -313,10 +315,14 @@
         try
         {
            if (lockChildren)
  +         {
               root.getNodeSPI().getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ);
  +         }
            else
  +         {
               root.getNodeSPI().getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ);
         }
  +      }
         catch (TimeoutException te)
         {
            log.error("Caught TimeoutException acquiring locks on region " +
  @@ -324,7 +330,7 @@
            if (force)
            {
               // Until we have FLUSH in place, don't force locks
  -//            forceAcquireLock(root, lockOwner, lockChildren);
  +            //            forceAcquireLock(root, lockOwner, lockChildren);
               throw te;
   
            }
  @@ -340,17 +346,21 @@
       *
       * @see #acquireLocksForStateTransfer
       */
  -   protected void releaseStateTransferLocks(DataNode root,
  +   protected void releaseStateTransferLocks(Node root,
                                               Object lockOwner,
                                               boolean childrenLocked)
      {
         try
         {
            if (childrenLocked)
  +         {
               root.getNodeSPI().getLock().releaseAll(lockOwner);
  +         }
            else
  +         {
               root.getNodeSPI().getLock().release(lockOwner);
         }
  +      }
         catch (Throwable t)
         {
            log.error("failed releasing locks", t);
  @@ -375,7 +385,9 @@
      {
         Object owner = getTreeCache().getCurrentTransaction();
         if (owner == null)
  +      {
            owner = Thread.currentThread();
  +      }
   
         return owner;
      }
  
  
  
  1.6       +34 -13    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- DefaultStateTransferGenerator.java	20 Nov 2006 03:53:55 -0000	1.5
  +++ DefaultStateTransferGenerator.java	14 Dec 2006 17:18:48 -0000	1.6
  @@ -6,20 +6,21 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.ObjectOutputStream;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
  -
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
  +import org.jboss.cache.Node;
   import org.jboss.cache.TreeCache;
   import org.jboss.cache.Version;
   import org.jboss.cache.loader.CacheLoader;
  -import org.jboss.cache.loader.NodeDataExceptionMarker;
   import org.jboss.cache.loader.NodeData;
  +import org.jboss.cache.loader.NodeDataExceptionMarker;
  +
  +import java.io.ObjectOutputStream;
  +import java.util.Iterator;
  +import java.util.Map;
  +import java.util.Set;
   
   public class DefaultStateTransferGenerator implements StateTransferGenerator
   {
  @@ -38,7 +39,7 @@
         this.internalFqns = cache.getInternalFqns();
      }
   
  -   public void generateState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
  +   public void generateState(ObjectOutputStream out, Node rootNode, boolean generateTransient,
            boolean generatePersistent, boolean suppressErrors) throws Throwable
      {
         Fqn fqn = rootNode.getFqn();
  @@ -51,23 +52,31 @@
            {
               //transient + marker
               if (log.isTraceEnabled())
  +            {
                  log.trace("writing transient state for " + fqn);
  +            }
   
               marshallTransientState(rootNode, out);
               out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
   
               if (log.isTraceEnabled())
  +            {
                  log.trace("transient state succesfully written");
  +            }
   
               //associated + marker
               if (log.isTraceEnabled())
  +            {
                  log.trace("writing associated state");
  +            }
   
               marshallAssociatedState(fqn, out);
               out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
   
               if (log.isTraceEnabled())
  +            {
                  log.trace("associated state succesfully written");
  +            }
   
            }
            
  @@ -75,7 +84,9 @@
            if (cacheLoader != null && generatePersistent)
            {
               if (log.isTraceEnabled())
  +            {
                  log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoader().getClass());
  +            }
   
               if (fqn.isRoot())
               {
  @@ -87,15 +98,17 @@
               }
   
               if (log.isTraceEnabled())
  +            {
                  log.trace("persistent state succesfully written");
            }
  +         }
            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
         }
         catch (Throwable t)
         {
            encouteredException = t;
            log.error("failed writing state", t);
  -         out.writeObject(new NodeDataExceptionMarker(t,cache.getLocalAddress()));
  +         out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
         }
         finally
         {
  @@ -113,11 +126,13 @@
       * @param out
       * @throws Exception
       */
  -   protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
  +   protected void marshallTransientState(Node node, ObjectOutputStream out) throws Exception
      {
   
         if (internalFqns.contains(node.getFqn()))
  +      {
            return;
  +      }
   
         Map attrs;
         NodeData nd;
  @@ -125,15 +140,21 @@
         // first handle the current node
         attrs = node.getData();
         if (attrs == null || attrs.size() == 0)
  +      {
            nd = new NodeData(node.getFqn());
  +      }
         else
  +      {
            nd = new NodeData(node.getFqn(), attrs);
  +      }
         out.writeObject(nd);
   
         // then visit the children
         Map children = node.getNodeSPI().getChildrenMap();
         if (children == null)
  +      {
            return;
  +      }
         for (Iterator it = children.entrySet().iterator(); it.hasNext();)
         {
            Map.Entry entry = (Map.Entry) it.next();
  
  
  
  1.5       +4 -4      JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- StateTransferIntegrator.java	11 Sep 2006 21:53:19 -0000	1.4
  +++ StateTransferIntegrator.java	14 Dec 2006 17:18:48 -0000	1.5
  @@ -6,13 +6,13 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.ObjectInputStream;
  +import org.jboss.cache.Node;
   
  -import org.jboss.cache.DataNode;
  +import java.io.ObjectInputStream;
   
   public interface StateTransferIntegrator
   {
      
  -   void integrateState(ObjectInputStream ois, DataNode target,ClassLoader cl)throws Exception;
  +   void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception;
   
   }
  \ No newline at end of file
  
  
  
  1.7       +78 -59    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -b -r1.6 -r1.7
  --- DefaultStateTransferIntegrator.java	23 Nov 2006 19:43:05 -0000	1.6
  +++ DefaultStateTransferIntegrator.java	14 Dec 2006 17:18:48 -0000	1.7
  @@ -6,13 +6,6 @@
    */
   package org.jboss.cache.statetransfer;
   
  -import java.io.IOException;
  -import java.io.ObjectInputStream;
  -import java.util.HashSet;
  -import java.util.Iterator;
  -import java.util.Map;
  -import java.util.Set;
  -
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.CacheException;
  @@ -26,6 +19,12 @@
   import org.jboss.cache.loader.NodeData;
   import org.jboss.cache.loader.NodeDataExceptionMarker;
   
  +import java.io.IOException;
  +import java.io.ObjectInputStream;
  +import java.util.HashSet;
  +import java.util.Iterator;
  +import java.util.Set;
  +
   public class DefaultStateTransferIntegrator implements StateTransferIntegrator
   {   
   
  @@ -52,7 +51,7 @@
         this.internalFqns = cache.getInternalFqns();     
      }
      
  -   public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
  +   public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
      {
         Throwable cause = null;     
         try
  @@ -61,36 +60,40 @@
            integrateAssociatedState(ois);
            integratePersistentState(ois);
         }     
  -      catch(Throwable t)
  +      catch (Throwable t)
         {
            cause = t;
  -         log.error("Failed integrating state.",t);
  +         log.error("Failed integrating state.", t);
         }                        
         finally
         {
            ois.close();
            if (cause != null)
            {
  -            throw new Exception("State transfer failed ",cause);
  +            throw new Exception("State transfer failed ", cause);
            }
         }
      }
   
  -   protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
  +   protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
      {
         boolean transientSet = false;
         ClassLoader oldCL = setClassLoader(cl);
         try
         {
            if (log.isTraceEnabled())
  +         {
               log.trace("integrating transient state for " + target);
  +         }
   
            integrateTransientState(target, in);
   
            transientSet = true;
   
            if (log.isTraceEnabled())
  +         {
               log.trace("transient state successfully integrated");
  +         }
            
            notifyAllNodesCreated(target);
         }
  @@ -101,7 +104,7 @@
               // Clear any existing state from the targetRoot
               log.warn("transient state integration failed, removing all children of " + target);
               target.clearData();
  -            target.removeChildren();
  +            ((DataNode) target).removeChildren();
            }
   
            resetClassLoader(oldCL);
  @@ -127,12 +130,16 @@
         if (loader == null)
         {
            if (log.isTraceEnabled())
  +         {
               log.trace("cache loader is null, will not attempt to integrate persistent state");
         }
  +      }
         else
         {
            if (log.isTraceEnabled())
  +         {
               log.trace("integrating persistent state using " + loader.getClass().getName());
  +         }
            
            boolean persistentSet = false;
            try
  @@ -163,11 +170,13 @@
               else
               {
                  if (log.isTraceEnabled())
  +               {
                     log.trace("persistent state integrated successfully");
               }
            }         
         }
      }   
  +   }
      
      protected TreeCache getCache()
      {
  @@ -193,7 +202,7 @@
       * Generates NodeAdded notifications for all nodes of the tree. This is
       * called whenever the tree is initially retrieved (state transfer)
       */
  -   private void notifyAllNodesCreated(DataNode curr)
  +   private void notifyAllNodesCreated(Node curr)
      {
         DataNode n;
   
  @@ -222,15 +231,17 @@
      private void resetClassLoader(ClassLoader oldLoader)
      {
         if (oldLoader != null)
  +      {
            Thread.currentThread().setContextClassLoader(oldLoader);
      }
  +   }
   
  -   private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
  +   private void integrateTransientState(Node target, ObjectInputStream in) throws IOException,
            ClassNotFoundException
      {
         Set retainedNodes = retainInternalNodes(target);
   
  -      target.removeChildren();
  +      ((DataNode) target).removeChildren();
   
         // Read the first NodeData and integrate into our target     
         NodeData nd = readNode(in);
  @@ -267,7 +278,7 @@
         return nd;
      }
   
  -   private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
  +   private NodeData integrateStateTransferChildren(Node parent, int offset, ObjectInputStream in)
            throws IOException, ClassNotFoundException
      {
         int parent_level = parent.getFqn().size();
  @@ -282,19 +293,25 @@
            // If we need to integrate into the buddy backup subtree,
            // change the Fqn to fit under it
            if (offset > 0)
  +         {
               fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
  +         }
            size = fqn.size();
            if (size <= parent_level)
  +         {
               return nd;
  +         }
            else if (size > target_level)
  +         {
               throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
  +         }
   
            name = fqn.get(size - 1);
   
            // We handle this NodeData.  Create a DataNode and
            // integrate its data            
  -         DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
  -         parent.addChild(name, target);
  +         Node target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
  +         ((DataNode) parent).addChild(name, target);
   
            // Recursively call, which will walk down the tree
            // and return the next NodeData that's a child of our parent
  @@ -303,7 +320,7 @@
         return null;
      }
   
  -   private Set retainInternalNodes(DataNode target)
  +   private Set retainInternalNodes(Node target)
      {
         Set result = new HashSet();
         Fqn targetFqn = target.getFqn();
  @@ -314,17 +331,19 @@
            {
               DataNode internalNode = getInternalNode(target, internalFqn);
               if (internalNode != null)
  +            {
                  result.add(internalNode);
            }
         }
  +      }
   
         return result;
      }
   
  -   private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
  +   private DataNode getInternalNode(Node parent, Fqn internalFqn)
      {
         Object name = internalFqn.get(parent.getFqn().size());
  -      DataNode result = (DataNode) parent.getChild(name);
  +      DataNode result = (DataNode) parent.getChild(new Fqn(name));
         if (result != null)
         {
            if (internalFqn.size() < result.getFqn().size())
  @@ -336,7 +355,7 @@
         return result;
      }
   
  -   private void integrateRetainedNodes(DataNode root, Set retainedNodes)
  +   private void integrateRetainedNodes(Node root, Set retainedNodes)
      {
         Fqn rootFqn = root.getFqn();
         for (Iterator it = retainedNodes.iterator(); it.hasNext();)
  @@ -349,17 +368,17 @@
         }
      }
   
  -   private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
  +   private void integrateRetainedNode(Node ancestor, DataNode descendant)
      {
         Fqn descFqn = descendant.getFqn();
         Fqn ancFqn = ancestor.getFqn();
         Object name = descFqn.get(ancFqn.size());
  -      DataNode child = (DataNode) ancestor.getChild(name);
  +      DataNode child = (DataNode) ancestor.getChild(new Fqn(name));
         if (ancFqn.size() == descFqn.size() + 1)
         {
            if (child == null)
            {
  -            ancestor.addChild(name, descendant);
  +            ((DataNode) ancestor).addChild(name, descendant);
            }
            else
            {
  @@ -373,8 +392,8 @@
               // Missing level -- have to create empty node
               // This shouldn't really happen -- internal fqns should
               // be immediately under the root
  -            child = (DataNode) factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
  -            ancestor.addChild(name, child);
  +            child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
  +            ((DataNode) ancestor).addChild(name, child);
            }
   
            // Keep walking down the tree
  
  
  



More information about the jboss-cvs-commits mailing list