[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Tue Nov 14 14:56:10 EST 2006


  User: vblagojevic
  Date: 06/11/14 14:56:10

  Modified:    src/org/jboss/cache/statetransfer  StateTransferManager.java
  Log:
  [JBCACHE-591] partial state transfer
  
  Revision  Changes    Path
  1.13      +26 -122   JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -b -r1.12 -r1.13
  --- StateTransferManager.java	24 Oct 2006 11:35:31 -0000	1.12
  +++ StateTransferManager.java	14 Nov 2006 19:56:09 -0000	1.13
  @@ -6,9 +6,13 @@
    */
   package org.jboss.cache.statetransfer;
   
  +import java.io.ByteArrayInputStream;
  +import java.io.InputStream;
  +import java.io.ObjectInputStream;
  +import java.io.OutputStream;
  +
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.jboss.cache.CacheException;
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  @@ -16,20 +20,10 @@
   import org.jboss.cache.loader.NodeData;
   import org.jboss.cache.loader.NodeDataMarker;
   import org.jboss.cache.lock.TimeoutException;
  -import org.jboss.cache.marshall.MethodCall;
  -import org.jboss.cache.marshall.MethodCallFactory;
  -import org.jboss.cache.marshall.MethodDeclarations;
   import org.jboss.cache.marshall.VersionAwareMarshaller;
   import org.jboss.cache.util.ExposedByteArrayOutputStream;
  -import org.jboss.util.stream.MarshalledValueOutputStream;
   import org.jboss.util.stream.MarshalledValueInputStream;
  -
  -import java.io.ByteArrayInputStream;
  -import java.io.InputStream;
  -import java.io.ObjectInputStream;
  -import java.io.OutputStream;
  -import java.util.List;
  -import java.util.Vector;
  +import org.jboss.util.stream.MarshalledValueOutputStream;
   
   public class StateTransferManager
   {
  @@ -37,6 +31,9 @@
   
      public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
   
  +   
  +   public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
  +
      private TreeCache treeCache;
      private long[] loadStateTimeouts = {400, 800, 1200};
   
  @@ -98,18 +95,16 @@
         if (marshaller_ != null)
         {
            // can't give state for regions currently being activated/inactivated
  -         if (cache.isActivatingDeactivating(fqn))
  +         if (marshaller_.isInactive(fqn.toString()))
            {
               if (log.isDebugEnabled())
                  log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
  -            return null;
  -         }
  -
  -         // Can't give state for inactive nodes
  -         if (marshaller_.isInactive(fqn.toString()))
  +            if(usingStreamingStateTransfer)
            {
  -            if (log.isDebugEnabled())
  -               log.debug("ignoring _getState() for inactive region " + fqn);
  +               MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
  +               out.writeBoolean(false);
  +               out.close();
  +            }
               return null;
            }
         }
  @@ -139,6 +134,7 @@
            if (usingStreamingStateTransfer)
            {
               out = new MarshalledValueOutputStream(os);          
  +            out.writeBoolean(true);
               generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
            }
            else
  @@ -163,6 +159,9 @@
      }
   
      /**
  +    * 
  +    * TODO change this javadoc
  +    * 
       * Requests state from each of the given source nodes in the cluster
       * until it gets it or no node replies with a timeout exception.  If state
       * is returned, integrates it into the given DataNode.  If no state is
  @@ -181,102 +180,7 @@
                            Object[] sources, ClassLoader cl)
              throws Exception
      {
  -      TreeCache cache = getTreeCache();
  -      // Call each node in the cluster with progressively longer timeouts
  -      // until we get state or no cluster node returns a TimeoutException
  -      long[] timeouts = getLoadStateTimeouts();
  -      Object ourself = cache.getLocalAddress(); // ignore ourself when we call
  -      boolean stateSet = false;
  -      TimeoutException timeoutException = null;
  -      Object timeoutTarget = null;
  -
  -      boolean trace = log.isTraceEnabled();
  -
  -      for (int i = 0; i < timeouts.length; i++)
  -      {
  -         timeoutException = null;
  -
  -         Boolean force = (i == timeouts.length - 1) ? Boolean.TRUE
  -                 : Boolean.FALSE;
  -
  -         MethodCall psmc = MethodCallFactory.create(MethodDeclarations.getPartialStateMethod, subtreeRoot,
  -                 timeouts[i],
  -                 force,
  -                 Boolean.FALSE);
  -
  -         MethodCall replPsmc = MethodCallFactory.create(MethodDeclarations.replicateMethod,
  -                 psmc);
  -
  -         // Iterate over the group members, seeing if anyone
  -         // can give us state for this region
  -         for (int j = 0; j < sources.length; j++)
  -         {
  -            Object target = sources[j];
  -            if (ourself.equals(target))
  -               continue;
  -
  -            Vector targets = new Vector();
  -            targets.add(target);
  -
  -            List responses = cache.callRemoteMethods(targets, replPsmc, true,
  -                    true, cache.getConfiguration().getSyncReplTimeout());
  -            Object rsp = null;
  -            if (responses != null && responses.size() > 0)
  -            {
  -               rsp = responses.get(0);
  -               if (rsp instanceof byte[])
  -               {
  -                  setState((byte[]) rsp, integrationRoot, cl);
  -                  stateSet = true;
  -
  -                  if (log.isDebugEnabled())
  -                  {
  -                     log.debug("loadState(): " + ourself +
  -                             " got state from " + target);
  -                  }
  -
  -                  break;
  -               }
  -               else if (rsp instanceof TimeoutException)
  -               {
  -                  timeoutException = (TimeoutException) rsp;
  -                  timeoutTarget = target;
  -                  if (trace)
  -                  {
  -                     log.trace("TreeCache.activateRegion(): " + ourself +
  -                             " got a TimeoutException from " + target);
  -                  }
  -               }
  -            }
  -
  -            if (trace)
  -            {
  -               log.trace("TreeCache.activateRegion(): " + ourself +
  -                       " No usable response from node " + target +
  -                       (rsp == null ? "" : (" -- received " + rsp)));
  -            }
  -         }
  -
  -         // We've looped through all targets; if we got state or didn't
  -         // but no one sent a timeout (which means no one had state)
  -         // we don't want to try again
  -         if (stateSet || timeoutException == null)
  -            break;
  -      }
  -
  -      if (!stateSet)
  -      {
  -         // If we got a timeout exception on the final try,
  -         // this is a failure condition
  -         if (timeoutException != null)
  -         {
  -            throw new CacheException("Failed getting state due to timeout on " +
  -                    timeoutTarget, timeoutException);
  -         }
  -
  -         if (log.isDebugEnabled())
  -            log.debug("TreeCache.activateRegion(): No nodes able to give state");
  -      }
  +      treeCache.fetchPartialState(sources, subtreeRoot,integrationRoot.getFqn());
      }
   
      /**
  @@ -350,7 +254,7 @@
            MarshalledValueInputStream in =null;
            if (usingStreamTransfer)
            {
  -            in = new MarshalledValueInputStream((InputStream) state);           
  +            in =  (MarshalledValueInputStream) state;           
            }
            else
            {   
  
  
  



More information about the jboss-cvs-commits mailing list