[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Manik Surtani msurtani at jboss.com
Wed Jan 17 09:13:06 EST 2007


  User: msurtani
  Date: 07/01/17 09:13:06

  Modified:    src/org/jboss/cache/statetransfer     
                        DefaultStateTransferIntegrator.java
                        StateTransferIntegrator.java
                        StateTransferManager.java StateTransferFactory.java
                        DefaultStateTransferGenerator.java
  Log:
  JBCACHE-908
  
  Revision  Changes    Path
  1.20      +60 -33    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -b -r1.19 -r1.20
  --- DefaultStateTransferIntegrator.java	15 Jan 2007 16:19:09 -0000	1.19
  +++ DefaultStateTransferIntegrator.java	17 Jan 2007 14:13:06 -0000	1.20
  @@ -25,6 +25,8 @@
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.util.HashSet;
  +import java.util.Iterator;
  +import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
  @@ -54,17 +56,17 @@
         this.internalFqns = cache.getInternalFqns();
      }
   
  -   public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
  +   public void integrateState(ObjectInputStream ois, Node target) throws Exception
      {
  -      integrateTransientState(ois, (NodeSPI) target, cl);
  +      integrateTransientState(ois, (NodeSPI) target);
         integrateAssociatedState(ois);
         integratePersistentState(ois);
      }
   
  -   protected void integrateTransientState(ObjectInputStream in, NodeSPI target, ClassLoader cl) throws Exception
  +   protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
      {
         boolean transientSet = false;
  -      ClassLoader oldCL = setClassLoader(cl);
  +//      ClassLoader oldCL = setClassLoader(cl);
         try
         {
            if (log.isTraceEnabled())
  @@ -93,7 +95,7 @@
               target.removeChildrenDirect();
            }
   
  -         resetClassLoader(oldCL);
  +//         resetClassLoader(oldCL);
         }
      }
   
  @@ -106,7 +108,7 @@
      {
         // no-op in this base class 
         // just read marker
  -      readNode(in);
  +      cache.getMarshaller().objectFromObjectStream(in);
      }
   
      protected void integratePersistentState(ObjectInputStream in) throws Exception
  @@ -200,6 +202,7 @@
         }
      }
   
  +   /*
      private ClassLoader setClassLoader(ClassLoader newLoader)
      {
         ClassLoader oldClassLoader = null;
  @@ -218,16 +221,21 @@
            Thread.currentThread().setContextClassLoader(oldLoader);
         }
      }
  +   */
   
  -   private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws IOException,
  -           ClassNotFoundException
  +   private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws Exception
      {
         Set<Node> retainedNodes = retainInternalNodes(target);
   
         target.removeChildrenDirect();
   
  +      List<NodeData> list = readNodesAsList(in);
  +      Iterator<NodeData> nodeDataIterator = list.iterator();
  +
         // Read the first NodeData and integrate into our target     
  -      NodeData nd = readNode(in);
  +      if (nodeDataIterator.hasNext())
  +      {
  +         NodeData nd = nodeDataIterator.next();
   
         //are there any transient nodes at all?
         if (nd != null && !nd.isMarker())
  @@ -243,25 +251,44 @@
            // If it is an integration, calculate how many levels of offset
            int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
   
  -         integrateStateTransferChildren(target, offset, in);
  +            integrateStateTransferChildren(target, offset, nodeDataIterator);
   
            integrateRetainedNodes(target, retainedNodes);
         }
      }
   
  -   private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
  +      // read marker off stack
  +      cache.getMarshaller().objectFromObjectStream(in);
  +   }
  +
  +   private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
      {
  -      NodeData nd = (NodeData) in.readObject();
  -      if (nd != null && nd.isExceptionMarker())
  +      List list = (List) cache.getMarshaller().objectFromObjectStream(in);
  +      for (Object o : list)
         {
  -         NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
  +         if (o instanceof NodeDataExceptionMarker)
  +         {
  +            NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) o;
            throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
                    + " threw exception during loadState", ndem.getCause());
         }
  -      return nd;
      }
  +      return list;
  +   }
  +
  +//   private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
  +//   {
  +//      NodeData nd = (NodeData) in.readObject();
  +//      if (nd != null && nd.isExceptionMarker())
  +//      {
  +//         NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
  +//         throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
  +//                 + " threw exception during loadState", ndem.getCause());
  +//      }
  +//      return nd;
  +//   }
   
  -   private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, ObjectInputStream in)
  +   private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, Iterator<NodeData> nodeDataIterator)
              throws IOException, ClassNotFoundException
      {
         int parent_level = parent.getFqn().size();
  @@ -269,7 +296,7 @@
         Fqn fqn;
         int size;
         Object name;
  -      NodeData nd = readNode(in);
  +      NodeData nd = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
         while (nd != null && !nd.isMarker())
         {
            fqn = nd.getFqn();
  @@ -308,7 +335,7 @@
   
            // Recursively call, which will walk down the tree
            // and return the next NodeData that's a child of our parent
  -         nd = integrateStateTransferChildren(target, offset, in);
  +         nd = integrateStateTransferChildren(target, offset, nodeDataIterator);
         }
         return null;
      }
  
  
  
  1.6       +1 -1      JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- StateTransferIntegrator.java	14 Dec 2006 17:18:48 -0000	1.5
  +++ StateTransferIntegrator.java	17 Jan 2007 14:13:06 -0000	1.6
  @@ -13,6 +13,6 @@
   public interface StateTransferIntegrator
   {
   
  -   void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception;
  +   void integrateState(ObjectInputStream ois, Node target) throws Exception;
   
   }
  \ No newline at end of file
  
  
  
  1.24      +19 -20    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.23
  retrieving revision 1.24
  diff -u -b -r1.23 -r1.24
  --- StateTransferManager.java	15 Jan 2007 18:10:56 -0000	1.23
  +++ StateTransferManager.java	17 Jan 2007 14:13:06 -0000	1.24
  @@ -29,9 +29,9 @@
   {
      protected final static Log log = LogFactory.getLog(StateTransferManager.class);
   
  -   public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
  +   public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
   
  -   public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
  +   public static final String PARTIAL_STATE_DELIMITER = "_PARTIAL_STATE_DELIMITER";
   
      private final CacheImpl cache;
   
  @@ -51,6 +51,7 @@
       * <p/>
       * <p/>
       *
  +    * @param out            stream to write state to
       * @param fqn            Fqn indicating the uppermost node in the
       *                       portion of the tree whose state should be returned.
       * @param timeout        max number of ms this method should wait to acquire
  @@ -61,8 +62,7 @@
       *                       on the nodes be rolled back? <strong>NOTE:</strong>
       *                       In release 1.2.4, this parameter has no effect.
       * @param suppressErrors should any Throwable thrown be suppressed?
  -    * @return a serialized byte[][], element 0 is the transient state
  -    *         (or null), and element 1 is the persistent state (or null).
  +    * @throws Throwable in event of error
       */
      public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  @@ -77,7 +77,7 @@
   
         if (canProvideState && (fetchPersistentState || fetchTransientState))
         {
  -         out.writeBoolean(true);
  +         cache.getMarshaller().objectToObjectStream(true, out);
            StateTransferGenerator generator = getStateTransferGenerator();
            Object owner = getOwnerForLock();
            long startTime = System.currentTimeMillis();
  @@ -97,7 +97,7 @@
         }
         else
         {
  -         out.writeBoolean(false);
  +         cache.getMarshaller().objectToObjectStream(false, out);
            Exception e = null;
            if (!canProvideState)
            {
  @@ -117,7 +117,7 @@
            {
               e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
            }
  -         out.writeObject(e);
  +         cache.getMarshaller().objectToObjectStream(e, out);
            throw e;
         }
      }
  @@ -152,15 +152,15 @@
       * <strong>NOTE:</strong> This method performs no locking of nodes; it
       * is up to the caller to lock <code>targetRoot</code> before calling
       * this method.
  +    * <p/>
  +    * This method will use any {@linl ClassLoader} needed as defined by the active {@link org.jboss.cache.Region}
  +    * in the {@link org.jboss.cache.RegionManager}, pertaining to the targetRoot passed in.
       *
  -    * @param in         a serialized byte[][] array where element 0 is the
  -    *                   transient state (or null) , and element 1 is the
  -    *                   persistent state (or null)
  +    * @param in         an input stream containing the state
       * @param targetRoot fqn of the node into which the state should be integrated
  -    * @param cl         classloader to use to unmarshal the state, or
  -    *                   <code>null</code> if the TCCL should be used
  +    * @throws Exception In event of error
       */
  -   public void setState(ObjectInputStream in, Fqn targetRoot, ClassLoader cl) throws Exception
  +   public void setState(ObjectInputStream in, Fqn targetRoot) throws Exception
      {
         CacheImpl cache = getTreeCache();
         NodeSPI target = cache.findNode(targetRoot);
  @@ -171,15 +171,16 @@
            cache.put(targetRoot, null);
            target = cache.findNode(targetRoot);
         }
  -      boolean hasState = in.readBoolean();
  +      Object o = cache.getMarshaller().objectFromObjectStream(in);
  +      Boolean hasState = (Boolean) o;
         if (hasState)
         {
  -         setState(in, target, cl);
  +         setState(in, target);
         }
         else
         {
            throw new CacheException("Cache instance at " + cache.getLocalAddress()
  -                 + " cannot integrate state since state provider could not provide state due to " + in.readObject());
  +                 + " cannot integrate state since state provider could not provide state due to " + cache.getMarshaller().objectFromObjectStream(in));
         }
      }
   
  @@ -196,10 +197,8 @@
       *                   transient state (or null) , and element 1 is the
       *                   persistent state (or null)
       * @param targetRoot node into which the state should be integrated
  -    * @param cl         classloader to use to unmarshal the state, or
  -    *                   <code>null</code> if the TCCL should be used
       */
  -   private void setState(ObjectInputStream state, NodeSPI targetRoot, ClassLoader cl) throws Exception
  +   private void setState(ObjectInputStream state, NodeSPI targetRoot) throws Exception
      {
         Object owner = getOwnerForLock();
         long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout();
  @@ -225,7 +224,7 @@
   
            StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
            log.info("starting state integration at node " + targetRoot);
  -         integrator.integrateState(state, targetRoot, cl);
  +         integrator.integrateState(state, targetRoot);
            log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
         }
         finally
  
  
  
  1.14      +2 -2      JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -b -r1.13 -r1.14
  --- StateTransferFactory.java	30 Dec 2006 17:49:57 -0000	1.13
  +++ StateTransferFactory.java	17 Jan 2007 14:13:06 -0000	1.14
  @@ -18,7 +18,7 @@
    * {@link StateTransferIntegrator} instances.
    *
    * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.13 $
  + * @version $Revision: 1.14 $
    */
   public abstract class StateTransferFactory
   {
  @@ -50,7 +50,7 @@
         short version = 0;
         try
         {
  -         version = in.readShort();
  +         version = (Short) cache.getMarshaller().objectFromObjectStream(in);
         }
         catch (IOException io)
         {
  
  
  
  1.16      +32 -11    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -b -r1.15 -r1.16
  --- DefaultStateTransferGenerator.java	15 Jan 2007 16:19:09 -0000	1.15
  +++ DefaultStateTransferGenerator.java	17 Jan 2007 14:13:06 -0000	1.16
  @@ -17,7 +17,10 @@
   import org.jboss.cache.marshall.NodeData;
   import org.jboss.cache.marshall.NodeDataExceptionMarker;
   
  +import java.io.IOException;
   import java.io.ObjectOutputStream;
  +import java.util.LinkedList;
  +import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
  @@ -44,7 +47,7 @@
         Fqn fqn = rootNode.getFqn();
         try
         {
  -         out.writeShort(STATE_TRANSFER_VERSION);
  +         cache.getMarshaller().objectToObjectStream(STATE_TRANSFER_VERSION, out);
            if (generateTransient)
            {
               //transient + marker
  @@ -52,9 +55,8 @@
               {
                  log.trace("writing transient state for " + fqn);
               }
  -
               marshallTransientState((NodeSPI) rootNode, out);
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  +            delimitStream(out);
   
               if (log.isTraceEnabled())
               {
  @@ -68,7 +70,7 @@
               }
   
               marshallAssociatedState(fqn, out);
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  +            delimitStream(out);
   
               if (log.isTraceEnabled())
               {
  @@ -79,8 +81,8 @@
            else
            {
               //we have to write two markers for transient and associated
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  +            delimitStream(out);
  +            delimitStream(out);
            }
   
            CacheLoader cacheLoader = cache.getCacheLoaderManager() == null ? null : cache.getCacheLoaderManager().getCacheLoader();
  @@ -105,16 +107,27 @@
                  log.trace("persistent state succesfully written");
               }
            }
  -         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  +         delimitStream(out);
         }
         catch (Throwable t)
         {
  -         out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
  +         cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(t, cache.getLocalAddress()), out);
            throw t;
         }
      }
   
      /**
  +    * Places a delimiter marker on the stream
  +    *
  +    * @param out stream
  +    * @throws IOException if there are errs
  +    */
  +   protected void delimitStream(ObjectOutputStream out) throws Exception
  +   {
  +      cache.getMarshaller().objectToObjectStream(StateTransferManager.STREAMING_DELIMITER_NODE, out);
  +   }
  +
  +   /**
       * Do a preorder traversal: visit the node first, then the node's children
       *
       * @param out
  @@ -122,13 +135,19 @@
       */
      protected void marshallTransientState(NodeSPI node, ObjectOutputStream out) throws Exception
      {
  +      List<NodeData> nodeData = new LinkedList<NodeData>();
  +      generateNodeDataList(node, nodeData);
  +      cache.getMarshaller().objectToObjectStream(nodeData, out, node.getFqn());
  +   }
   
  +   protected void generateNodeDataList(NodeSPI node, List<NodeData> list) throws Exception
  +   {
         if (internalFqns.contains(node.getFqn()))
         {
            return;
         }
   
  -      Map attrs;
  +      Map<Object, Object> attrs;
         NodeData nd;
   
         // first handle the current node
  @@ -141,12 +160,14 @@
         {
            nd = new NodeData(node.getFqn(), attrs);
         }
  -      out.writeObject(nd);
  +
  +      list.add(nd);
   
         // then visit the children
         for (NodeSPI child : node.getChildrenDirect())
         {
  -         marshallTransientState(child, out);
  +         //marshallTransientState(child, out);
  +         generateNodeDataList(child, list);
         }
      }
   
  
  
  



More information about the jboss-cvs-commits mailing list