[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladmir Blagojevic vladimir.blagojevic at jboss.com
Wed Dec 20 17:28:13 EST 2006


  User: vblagojevic
  Date: 06/12/20 17:28:13

  Modified:    src/org/jboss/cache/statetransfer  
                        DefaultStateTransferIntegrator.java
                        StateTransferManager.java
  Log:
  final state transfer refactoring
  
  Revision  Changes    Path
  1.8       +2 -9      JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- DefaultStateTransferIntegrator.java	14 Dec 2006 17:18:48 -0000	1.7
  +++ DefaultStateTransferIntegrator.java	20 Dec 2006 22:28:13 -0000	1.8
  @@ -64,16 +64,9 @@
         {
            cause = t;
            log.error("Failed integrating state.", t);
  -      }
  -      finally
  -      {
  -         ois.close();
  -         if (cause != null)
  -         {
               throw new Exception("State transfer failed ", cause);
            }
         }
  -   }
   
      protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
      {
  
  
  
  1.18      +43 -134   JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -b -r1.17 -r1.18
  --- StateTransferManager.java	14 Dec 2006 17:18:48 -0000	1.17
  +++ StateTransferManager.java	20 Dec 2006 22:28:13 -0000	1.18
  @@ -18,14 +18,10 @@
   import org.jboss.cache.lock.NodeLock;
   import org.jboss.cache.lock.TimeoutException;
   import org.jboss.cache.marshall.VersionAwareMarshaller;
  -import org.jboss.cache.util.ExposedByteArrayOutputStream;
  -import org.jboss.util.stream.MarshalledValueInputStream;
  -import org.jboss.util.stream.MarshalledValueOutputStream;
   
  -import java.io.ByteArrayInputStream;
  -import java.io.InputStream;
   import java.io.ObjectInputStream;
  -import java.io.OutputStream;
  +import java.io.ObjectOutputStream;
  +
   
   public class StateTransferManager
   {
  @@ -33,42 +29,25 @@
   
      public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
   
  -
      public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
   
  -   private TreeCache treeCache;
  -   private long[] loadStateTimeouts = {400, 800, 1200};
  +   private final TreeCache cache;
   
      public StateTransferManager(TreeCache cache)
      {
  -      this.treeCache = cache;
  +      this.cache = cache;
      }
   
      public TreeCache getTreeCache()
      {
  -      return treeCache;
  -   }
  -
  -   public void setTreeCache(TreeCache cache)
  -   {
  -      this.treeCache = cache;
  -   }
  -
  -   public long[] getLoadStateTimeouts()
  -   {
  -      return loadStateTimeouts;
  -   }
  -
  -   public void setLoadStateTimeouts(long[] loadStateTimeouts)
  -   {
  -      this.loadStateTimeouts = loadStateTimeouts;
  +      return cache;
      }
   
      /**
  -    * Returns the state for the portion of the tree named by <code>fqn</code>.
  +    * Writes the state for the portion of the tree named by <code>fqn</code> to 
  +    * the provided OutputStream.
  +    * 
       * <p/>
  -    * State returned is a serialized byte[][], element 0 is the transient state
  -    * (or null), and element 1 is the persistent state (or null).
       *
       * @param fqn            Fqn indicating the uppermost node in the
       *                       portion of the tree whose state should be returned.
  @@ -83,85 +62,41 @@
       * @return a serialized byte[][], element 0 is the transient state
       *         (or null), and element 1 is the persistent state (or null).
       */
  -   public byte[] getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +   public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  -      boolean usingStreamingStateTransfer = os != null;
  -      TreeCache cache = getTreeCache();
  +      VersionAwareMarshaller marshaller = cache.getMarshaller();
   
  -      VersionAwareMarshaller marshaller_ = null;
  -      if (cache.getConfiguration().isUseRegionBasedMarshalling())
  -      {
  -         marshaller_ = cache.getMarshaller();
  -      }
  -
  -      if (marshaller_ != null)
  -      {
            // can't give state for regions currently being activated/inactivated
  -         if (marshaller_.isInactive(fqn.toString()))
  -         {
  -            if (log.isDebugEnabled())
  -            {
  -               log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
  -            }
  -            if (usingStreamingStateTransfer)
  -            {
  -               MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
  -               out.writeBoolean(false);
  -               out.close();
  -            }
  -            return null;
  -         }
  -      }
  -
  -      Node rootNode = cache.findNode(fqn);
  -      if (rootNode == null)
  -      {
  -         return null;
  -      }
  +      boolean canProvideState = !(marshaller.isInactive(fqn.toString()) || cache.findNode(fqn) == null);
   
         boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
         CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
         boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
   
  +      if (canProvideState && (fetchPersistentState || fetchTransientState))
  +      {
  +         out.writeBoolean(true);
  +         StateTransferGenerator generator = getStateTransferGenerator();
         Object owner = getOwnerForLock();
  +         long startTime = System.currentTimeMillis();
  +         Node rootNode = cache.findNode(fqn);
   
         try
         {
  -         if (fetchTransientState || fetchPersistentState)
  -         {
               log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
               acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
  -         }
  -
  -         MarshalledValueOutputStream out = null;
  -         byte resultBuffer[] = new byte[0];
  -         StateTransferGenerator generator = getStateTransferGenerator();
  -         long startTime = System.currentTimeMillis();
  -         if (usingStreamingStateTransfer)
  -         {
  -            out = new MarshalledValueOutputStream(os);
  -            out.writeBoolean(true);
               generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
  -         }
  -         else
  -         {
  -            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
  -            out = new MarshalledValueOutputStream(baos);
  -            generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
  -            resultBuffer = baos.getRawBuffer();
  -         }
            log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
  -         return resultBuffer;
         }
         finally
         {
            releaseStateTransferLocks(rootNode, owner, true);
         }
      }
  -
  -   public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +      else
      {
  -      return getState(null, fqn, timeout, force, suppressErrors);
  +         out.writeBoolean(false);
  +      }
      }
   
      /**
  @@ -185,7 +120,7 @@
                            Object[] sources, ClassLoader cl)
              throws Exception
      {
  -      treeCache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
  +      cache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
      }
   
      /**
  @@ -204,8 +139,7 @@
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used
       */
  -   public void setState(Object state, Fqn targetRoot, ClassLoader cl)
  -           throws Exception
  +   public void setState(ObjectInputStream state, Fqn targetRoot, ClassLoader cl) throws Exception
      {
         TreeCache cache = getTreeCache();
         Node target = cache.findNode(targetRoot);
  @@ -216,7 +150,6 @@
            cache.put(targetRoot, null);
            target = cache.findNode(targetRoot);
         }
  -
         setState(state, target, cl);
      }
   
  @@ -236,39 +169,16 @@
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used
       */
  -   private void setState(Object state, Node targetRoot, ClassLoader cl)
  -           throws Exception
  -   {
  -      if (state == null)
  +   private void setState(ObjectInputStream state, Node targetRoot, ClassLoader cl) throws Exception
         {
  -         log.info("new_state is null (may be first member in cluster)");
  -         return;
  -      }
  -
  -      boolean usingStreamTransfer = (state instanceof InputStream) ? true : false;
  -
         Object owner = getOwnerForLock();
  +      long startTime = System.currentTimeMillis();
  +
         try
         {
            // Acquire a lock on the root node
  -         acquireLocksForStateTransfer(targetRoot, owner,
  -                 getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
  -                 true, true);
  -
  -         StateTransferIntegrator integrator = null;
  -         MarshalledValueInputStream in = null;
  -         if (usingStreamTransfer)
  -         {
  -            in = (MarshalledValueInputStream) state;
  -         }
  -         else
  -         {
  -            ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) state);
  -            in = new MarshalledValueInputStream(bais);
  -         }
  -
  -         integrator = getStateTransferIntegrator(in, targetRoot.getFqn());
  -         long startTime = System.currentTimeMillis();
  +         acquireLocksForStateTransfer(targetRoot, owner, getTreeCache().getConfiguration()
  +               .getInitialStateRetrievalTimeout(), true, true);
   
            /*
            * Vladimir/Manik/Brian (Dec 7,2006)
  @@ -281,12 +191,13 @@
   
            Option option = new Option();
            option.setBypassInterceptorChain(true);
  -         treeCache.getInvocationContext().setOptionOverrides(option);
  +         cache.getInvocationContext().setOptionOverrides(option);
   
            try
            {
  +            StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
               log.info("starting state integration at node " + targetRoot);
  -            integrator.integrateState(in, targetRoot, cl);
  +            integrator.integrateState(state, targetRoot, cl);
               log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
            }
            catch (Throwable t)
  @@ -298,7 +209,6 @@
         {
            releaseStateTransferLocks(targetRoot, owner, true);
         }
  -
      }
   
   
  @@ -388,7 +298,6 @@
         {
            owner = Thread.currentThread();
         }
  -
         return owner;
      }
   }
  
  
  



More information about the jboss-cvs-commits mailing list