[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladmir Blagojevic vladimir.blagojevic at jboss.com
Mon Sep 11 17:53:19 EDT 2006


  User: vblagojevic
  Date: 06/09/11 17:53:19

  Modified:    src/org/jboss/cache/statetransfer          
                        AbstractStateTransferGenerator.java
                        StateTransferIntegrator.java
                        AbstractStateTransferIntegrator.java
                        StateTransferGenerator.java
                        StateTransferFactory.java StateTransferManager.java
  Removed:     src/org/jboss/cache/statetransfer          
                        StateTransferGenerator_200.java
                        StreamingStateTransferGenerator_200.java
                        StateTransferIntegrator_200.java
                        StreamingStateTransferIntegrator_200.java
  Log:
  refactoring after Brian's review
  
  Revision  Changes    Path
  1.3       +2 -2      JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: AbstractStateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- AbstractStateTransferGenerator.java	7 Sep 2006 17:41:52 -0000	1.2
  +++ AbstractStateTransferGenerator.java	11 Sep 2006 21:53:19 -0000	1.3
  @@ -19,7 +19,7 @@
   import org.jboss.cache.Version;
   import org.jboss.cache.loader.NodeData;
   
  -public class AbstractStateTransferGenerator
  +public class AbstractStateTransferGenerator implements StateTransferGenerator
   {
   
      public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");
  @@ -36,7 +36,7 @@
         this.internalFqns = cache.getInternalFqns();
      }
   
  -   protected void streamState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
  +   public void generateState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
            boolean generatePersistent, boolean suppressErrors) throws Throwable
      {
         Fqn fqn = rootNode.getFqn();
  
  
  
  1.4       +3 -1      JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -b -r1.3 -r1.4
  --- StateTransferIntegrator.java	31 Aug 2006 20:30:45 -0000	1.3
  +++ StateTransferIntegrator.java	11 Sep 2006 21:53:19 -0000	1.4
  @@ -6,11 +6,13 @@
    */
   package org.jboss.cache.statetransfer;
   
  +import java.io.ObjectInputStream;
  +
   import org.jboss.cache.DataNode;
   
   public interface StateTransferIntegrator
   {
      
  -   void integrateState(DataNode target,ClassLoader cl)throws Exception;
  +   void integrateState(ObjectInputStream ois, DataNode target,ClassLoader cl)throws Exception;
   
   }
  \ No newline at end of file
  
  
  
  1.5       +2 -2      JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: AbstractStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- AbstractStateTransferIntegrator.java	7 Sep 2006 18:56:39 -0000	1.4
  +++ AbstractStateTransferIntegrator.java	11 Sep 2006 21:53:19 -0000	1.5
  @@ -23,7 +23,7 @@
   import org.jboss.cache.loader.CacheLoader;
   import org.jboss.cache.loader.NodeData;
   
  -public class AbstractStateTransferIntegrator
  +public class AbstractStateTransferIntegrator implements StateTransferIntegrator
   {   
   
      protected Log log = LogFactory.getLog(getClass().getName());
  @@ -343,7 +343,7 @@
         return targetFqn;
      }
        
  -   protected void integrateStateHelper(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
  +   public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
      {
         Throwable cause=null;
         //first try integrating transient state
  
  
  
  1.3       +4 -2      JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- StateTransferGenerator.java	11 Oct 2005 20:15:15 -0000	1.2
  +++ StateTransferGenerator.java	11 Sep 2006 21:53:19 -0000	1.3
  @@ -6,12 +6,14 @@
    */
   package org.jboss.cache.statetransfer;
   
  +import java.io.ObjectOutputStream;
  +
   import org.jboss.cache.DataNode;
   
   public interface StateTransferGenerator
   {
   
  -   public abstract byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient,
  +   public void generateState(ObjectOutputStream stream, DataNode rootNode, boolean generateTransient,
            boolean generatePersistent, boolean suppressErrors) throws Throwable;
   
   }
  \ No newline at end of file
  
  
  
  1.11      +9 -72     JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -b -r1.10 -r1.11
  --- StateTransferFactory.java	31 Aug 2006 14:56:46 -0000	1.10
  +++ StateTransferFactory.java	11 Sep 2006 21:53:19 -0000	1.11
  @@ -6,22 +6,19 @@
    */
   package org.jboss.cache.statetransfer;
   
  +import java.io.IOException;
  +import java.io.ObjectInputStream;
  +
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
   import org.jboss.cache.Version;
  -import org.jboss.invocation.MarshalledValueInputStream;
  -
  -import java.io.ByteArrayInputStream;
  -import java.io.IOException;
  -import java.io.InputStream;
  -import java.io.OutputStream;
   
   /**
    * Factory class able to create {@link StateTransferGenerator} and 
    * {@link StateTransferIntegrator} instances.
    * 
    * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.10 $
  + * @version $Revision: 1.11 $
    */
   public abstract class StateTransferFactory
   {
  @@ -46,73 +43,13 @@
         if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
            throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
         else
  -         return new StateTransferGenerator_200(cache); // current default
  -   }
  -   
  -   public static StateTransferGenerator getStateTransferGenerator(OutputStream os, TreeCache cache)
  -   {
  -      short version = cache.getConfiguration().getReplicationVersion();
  -
  -      // Compiler won't let me use a switch
  -
  -      if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  -         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  -      else
  -         return new StreamingStateTransferGenerator_200(os,cache); // current default
  -   }
  -   
  -   /**
  -    * Gets a StateTransferIntegrator able to handle the given state.
  -    * 
  -    * @param state      the state
  -    * @param targetFqn  Fqn of the node to which the state will be bound
  -    * @param cache      cache in which the state will be stored
  -    * @return           the {@link StateTransferIntegrator}.
  -    * 
  -    * @throws IllegalStateException if the cache's ReplicationVersion is < 2.0.0
  -    * @throws Exception
  -    */
  -   public static StateTransferIntegrator 
  -      getStateTransferIntegrator(byte[] state, Fqn targetFqn, TreeCache cache) 
  -         throws Exception
  -   {
  -      ByteArrayInputStream bais = new ByteArrayInputStream(state);
  -      bais.mark(1024);      
  -      
  -      short version = 0;
  -      MarshalledValueInputStream in = new MarshalledValueInputStream(bais);
  -      try {
  -         try
  -         {
  -            version = in.readShort();
  -         }
  -         catch (IOException io)
  -         {
  -            // No short at the head of the stream means version 123
  -            throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  -         }
  -         
  -         // Compiler won't let me use a switch
  -         
  -         if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  -            throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  -         else
  -            return new StateTransferIntegrator_200(state, targetFqn, cache); // current default
  -                 
  -      }
  -      finally {
  -         try {
  -            in.close();         
  -         }
  -         catch (IOException io) {}
  -      }
  +         return new AbstractStateTransferGenerator(cache); // current default
      }
   
  -   public static StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn, TreeCache treeCache)
  +   public static StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream in, Fqn fqn, TreeCache treeCache)
            throws Exception
      {
         short version = 0;
  -      MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
         try
         {
            version = in.readShort();
  @@ -133,7 +70,7 @@
         if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
            throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
         else
  -         return new StreamingStateTransferIntegrator_200(in, fqn, treeCache); // current default
  +         return new AbstractStateTransferIntegrator(fqn, treeCache); // current default
      }      
              
   }
  
  
  
  1.11      +33 -58    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -b -r1.10 -r1.11
  --- StateTransferManager.java	11 Sep 2006 17:02:43 -0000	1.10
  +++ StateTransferManager.java	11 Sep 2006 21:53:19 -0000	1.11
  @@ -20,12 +20,15 @@
   import org.jboss.cache.marshall.MethodCallFactory;
   import org.jboss.cache.marshall.MethodDeclarations;
   import org.jboss.cache.marshall.VersionAwareMarshaller;
  +import org.jboss.cache.util.ExposedByteArrayOutputStream;
  +import org.jboss.invocation.MarshalledValueInputStream;
  +import org.jboss.invocation.MarshalledValueOutputStream;
   
  +import java.io.ByteArrayInputStream;
   import java.io.InputStream;
  +import java.io.ObjectInputStream;
   import java.io.OutputStream;
  -import java.util.Iterator;
   import java.util.List;
  -import java.util.Map;
   import java.util.Vector;
   
   public class StateTransferManager
  @@ -132,20 +135,24 @@
               acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
            }
   
  -         StateTransferGenerator generator = null;
  +         MarshalledValueOutputStream out = null;
  +         byte resultBuffer [] = new byte[0];
  +         StateTransferGenerator generator = getStateTransferGenerator(); 
  +         long startTime = System.currentTimeMillis();
            if (usingStreamingStateTransfer)
            {
  -            generator = getStateTransferGenerator(os);
  +            out = new MarshalledValueOutputStream(os);          
  +            generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
            }
            else
            {
  -            generator = getStateTransferGenerator();
  +            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16*1024);
  +            out = new MarshalledValueOutputStream(baos);            
  +            generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
  +            resultBuffer = baos.getRawBuffer();
            }
  -
  -         return generator.generateStateTransfer(rootNode,
  -                 fetchTransientState,
  -                 fetchPersistentState,
  -                 suppressErrors);
  +         log.info("Successfully generated state in " + (System.currentTimeMillis()-startTime) + " msec");
  +         return resultBuffer;
         }
         finally
         {
  @@ -343,23 +350,24 @@
                    true, true);
   
            StateTransferIntegrator integrator = null;
  +         MarshalledValueInputStream in =null;
            if (usingStreamTransfer)
            {
  -            integrator = getStateTransferIntegrator((InputStream) state,
  -                    targetRoot.getFqn());
  +            in = new MarshalledValueInputStream((InputStream) state);           
            }
            else
            {
  -            byte [] new_state = (byte[]) state;
  -            log.info("received the state (size=" + new_state.length + " bytes)");
  -            integrator = getStateTransferIntegrator(new_state, targetRoot.getFqn());
  +            ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) state);
  +            in = new MarshalledValueInputStream(bais);                        
            }
   
  +         integrator = getStateTransferIntegrator(in, targetRoot.getFqn());
  +         long startTime = System.currentTimeMillis();
            try
            {
  -            log.info("starting state integration at node " + targetRoot + " using " + integrator);
  -            integrator.integrateState(targetRoot, cl);
  -            log.info("successfully integrated state");
  +            log.info("starting state integration at node " + targetRoot);
  +            integrator.integrateState(in,targetRoot, cl);
  +            log.info("successfully integrated state in " + (System.currentTimeMillis()-startTime) + " msec");
            }
            catch (Throwable t)
            {
  @@ -436,45 +444,12 @@
         return StateTransferFactory.getStateTransferGenerator(getTreeCache());
      }
   
  -   protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
  -   {
  -      return StateTransferFactory.getStateTransferGenerator(os, getTreeCache());
  -   }
  -
  -   protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
  -           throws Exception
  -   {
  -      return StateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
  -   }
  -
  -   private StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn) throws Exception
  +   protected StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream istream, Fqn fqn) throws Exception
      {
         return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
      }
   
      /**
  -    * Generates NodeAdded notifications for all nodes of the tree. This is
  -    * called whenever the tree is initially retrieved (state transfer)
  -    */
  -   private void notifyAllNodesCreated(DataNode curr)
  -   {
  -      DataNode n;
  -      Map children;
  -
  -      if (curr == null) return;
  -      getTreeCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
  -      getTreeCache().getNotifier().notifyNodeCreated(curr.getFqn(), false);
  -      if ((children = curr.getChildren()) != null)
  -      {
  -         for (Iterator it = children.values().iterator(); it.hasNext();)
  -         {
  -            n = (DataNode) it.next();
  -            notifyAllNodesCreated(n);
  -         }
  -      }
  -   }
  -
  -   /**
       * Returns an object suitable for use in node locking, either the current
       * transaction or the current thread if there is no transaction.
       */
  
  
  



More information about the jboss-cvs-commits mailing list