[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladmir Blagojevic vladimir.blagojevic at jboss.com
Thu Dec 21 16:41:10 EST 2006


  User: vblagojevic
  Date: 06/12/21 16:41:10

  Modified:    src/org/jboss/cache/statetransfer   
                        DefaultStateTransferGenerator.java
                        DefaultStateTransferIntegrator.java
                        StateTransferManager.java
  Log:
  final state transfer polishing (all tests pass now)
  
  Revision  Changes    Path
  1.7       +5 -17     JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -b -r1.6 -r1.7
  --- DefaultStateTransferGenerator.java	14 Dec 2006 17:18:48 -0000	1.6
  +++ DefaultStateTransferGenerator.java	21 Dec 2006 21:41:10 -0000	1.7
  @@ -8,7 +8,6 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.Node;
   import org.jboss.cache.TreeCache;
  @@ -43,8 +42,6 @@
                                boolean generatePersistent, boolean suppressErrors) throws Throwable
      {
         Fqn fqn = rootNode.getFqn();
  -      Throwable encouteredException = null;
  -
         try
         {
            out.writeShort(STATE_TRANSFER_VERSION);
  @@ -106,17 +103,8 @@
         }
         catch (Throwable t)
         {
  -         encouteredException = t;
  -         log.error("failed writing state", t);
            out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
  -      }
  -      finally
  -      {
  -         out.close();
  -         if (encouteredException != null && !suppressErrors)
  -         {
  -            throw encouteredException;
  -         }
  +         throw t;
         }
      }
   
  @@ -158,7 +146,7 @@
         for (Iterator it = children.entrySet().iterator(); it.hasNext();)
         {
            Map.Entry entry = (Map.Entry) it.next();
  -         marshallTransientState((DataNode) entry.getValue(), out);
  +         marshallTransientState((Node) entry.getValue(), out);
         }
      }
   
  
  
  
  1.9       +19 -29    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -b -r1.8 -r1.9
  --- DefaultStateTransferIntegrator.java	20 Dec 2006 22:28:13 -0000	1.8
  +++ DefaultStateTransferIntegrator.java	21 Dec 2006 21:41:10 -0000	1.9
  @@ -9,10 +9,10 @@
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.CacheException;
  -import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.Node;
   import org.jboss.cache.TreeCache;
  +import org.jboss.cache.TreeNode;
   import org.jboss.cache.buddyreplication.BuddyManager;
   import org.jboss.cache.factories.NodeFactory;
   import org.jboss.cache.loader.CacheLoader;
  @@ -53,20 +53,10 @@
   
      public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
      {
  -      Throwable cause = null;
  -      try
  -      {
            integrateTransientState(ois, target, cl);
            integrateAssociatedState(ois);
            integratePersistentState(ois);
         }
  -      catch (Throwable t)
  -      {
  -         cause = t;
  -         log.error("Failed integrating state.", t);
  -         throw new Exception("State transfer failed ", cause);
  -      }      
  -   }
   
      protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
      {
  @@ -97,7 +87,7 @@
               // Clear any existing state from the targetRoot
               log.warn("transient state integration failed, removing all children of " + target);
               target.clearData();
  -            ((DataNode) target).removeChildren();
  +            ((TreeNode) target).removeChildren();
            }
   
            resetClassLoader(oldCL);
  @@ -197,7 +187,7 @@
       */
      private void notifyAllNodesCreated(Node curr)
      {
  -      DataNode n;
  +      TreeNode n;
   
         if (curr == null) return;
         getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
  @@ -205,7 +195,7 @@
         Set children = curr.getChildren();
         for (Iterator it = children.iterator(); it.hasNext();)
         {
  -         n = (DataNode) it.next();
  +         n = (TreeNode) it.next();
            notifyAllNodesCreated(n);
         }
      }
  @@ -234,7 +224,7 @@
      {
         Set retainedNodes = retainInternalNodes(target);
   
  -      ((DataNode) target).removeChildren();
  +      ((TreeNode) target).removeChildren();
   
         // Read the first NodeData and integrate into our target     
         NodeData nd = readNode(in);
  @@ -242,7 +232,7 @@
         //are there any transient nodes at all?
         if (nd != null && !nd.isMarker())
         {
  -         target.put(nd.getAttributes(), true);
  +         target.put(nd.getAttributes());
   
            // Check whether this is an integration into the buddy backup
            // subtree
  @@ -301,10 +291,10 @@
   
            name = fqn.get(size - 1);
   
  -         // We handle this NodeData.  Create a DataNode and
  +         // We handle this NodeData.  Create a TreeNode and
            // integrate its data            
            Node target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
  -         ((DataNode) parent).addChild(name, target);
  +         ((TreeNode) parent).addChild(name, target);
   
            // Recursively call, which will walk down the tree
            // and return the next NodeData that's a child of our parent
  @@ -322,7 +312,7 @@
            Fqn internalFqn = (Fqn) it.next();
            if (internalFqn.isChildOf(targetFqn))
            {
  -            DataNode internalNode = getInternalNode(target, internalFqn);
  +            TreeNode internalNode = getInternalNode(target, internalFqn);
               if (internalNode != null)
               {
                  result.add(internalNode);
  @@ -333,10 +323,10 @@
         return result;
      }
   
  -   private DataNode getInternalNode(Node parent, Fqn internalFqn)
  +   private TreeNode getInternalNode(Node parent, Fqn internalFqn)
      {
         Object name = internalFqn.get(parent.getFqn().size());
  -      DataNode result = (DataNode) parent.getChild(new Fqn(name));
  +      TreeNode result = (TreeNode) parent.getChild(new Fqn(name));
         if (result != null)
         {
            if (internalFqn.size() < result.getFqn().size())
  @@ -353,7 +343,7 @@
         Fqn rootFqn = root.getFqn();
         for (Iterator it = retainedNodes.iterator(); it.hasNext();)
         {
  -         DataNode retained = (DataNode) it.next();
  +         TreeNode retained = (TreeNode) it.next();
            if (retained.getFqn().isChildOf(rootFqn))
            {
               integrateRetainedNode(root, retained);
  @@ -361,17 +351,17 @@
         }
      }
   
  -   private void integrateRetainedNode(Node ancestor, DataNode descendant)
  +   private void integrateRetainedNode(Node ancestor, TreeNode descendant)
      {
         Fqn descFqn = descendant.getFqn();
         Fqn ancFqn = ancestor.getFqn();
         Object name = descFqn.get(ancFqn.size());
  -      DataNode child = (DataNode) ancestor.getChild(new Fqn(name));
  +      TreeNode child = (TreeNode) ancestor.getChild(new Fqn(name));
         if (ancFqn.size() == descFqn.size() + 1)
         {
            if (child == null)
            {
  -            ((DataNode) ancestor).addChild(name, descendant);
  +            ((TreeNode) ancestor).addChild(name, descendant);
            }
            else
            {
  @@ -386,7 +376,7 @@
               // This shouldn't really happen -- internal fqns should
               // be immediately under the root
               child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
  -            ((DataNode) ancestor).addChild(name, child);
  +            ((TreeNode) ancestor).addChild(name, child);
            }
   
            // Keep walking down the tree
  
  
  
  1.19      +42 -20    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -b -r1.18 -r1.19
  --- StateTransferManager.java	20 Dec 2006 22:28:13 -0000	1.18
  +++ StateTransferManager.java	21 Dec 2006 21:41:10 -0000	1.19
  @@ -8,6 +8,7 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  +import org.jboss.cache.CacheException;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.Node;
   import org.jboss.cache.TreeCache;
  @@ -67,7 +68,7 @@
         VersionAwareMarshaller marshaller = cache.getMarshaller();
   
         // can't give state for regions currently being activated/inactivated
  -      boolean canProvideState = !(marshaller.isInactive(fqn.toString()) || cache.findNode(fqn) == null);
  +      boolean canProvideState = (!marshaller.isInactive(fqn.toString()) && cache.findNode(fqn) != null);
   
         boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
         CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
  @@ -96,12 +97,31 @@
         else
         {
            out.writeBoolean(false);
  +         Exception e = null;
  +         if(!canProvideState)
  +         {
  +            String exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn +".";
  +            
  +            if(marshaller.isInactive(fqn.toString()))
  +            {
  +               exceptionMessage += " Region for fqn " + fqn + " is inactive.";
  +            }
  +            if(cache.findNode(fqn) == null)
  +            {
  +               exceptionMessage += " There is no cache node at fqn " + fqn;
  +            }
  +            e = new CacheException(exceptionMessage);           
  +         }
  +         if(!fetchPersistentState && !fetchTransientState)
  +         {
  +            e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
  +         }
  +         out.writeObject(e);
  +         throw e;
         }
      }   
   
      /**
  -    * TODO change this javadoc
  -    * <p/>
       * Requests state from each of the given source nodes in the cluster
       * until it gets it or no node replies with a timeout exception.  If state
       * is returned, integrates it into the given DataNode.  If no state is
  @@ -132,14 +152,14 @@
       * is up to the caller to lock <code>targetRoot</code> before calling
       * this method.
       *
  -    * @param state      a serialized byte[][] array where element 0 is the
  +    * @param in      a serialized byte[][] array where element 0 is the
       *                   transient state (or null) , and element 1 is the
       *                   persistent state (or null)
       * @param targetRoot fqn of the node into which the state should be integrated
       * @param cl         classloader to use to unmarshal the state, or
       *                   <code>null</code> if the TCCL should be used
       */
  -   public void setState(ObjectInputStream state, Fqn targetRoot, ClassLoader cl) throws Exception
  +   public void setState(ObjectInputStream in, Fqn targetRoot, ClassLoader cl) throws Exception
      {
         TreeCache cache = getTreeCache();
         Node target = cache.findNode(targetRoot);
  @@ -150,7 +170,16 @@
            cache.put(targetRoot, null);
            target = cache.findNode(targetRoot);
         }
  -      setState(state, target, cl);
  +      boolean hasState = in.readBoolean();
  +      if(hasState)
  +      {
  +         setState(in, target, cl);
  +      }
  +      else
  +      {
  +         throw new CacheException("Cache instance at " + cache.getLocalAddress()
  +               + " cannot integrate state since state provider could not provide state due to " + in.readObject());
  +      }
      }
   
      /**
  @@ -172,13 +201,13 @@
      private void setState(ObjectInputStream state, Node targetRoot, ClassLoader cl) throws Exception
      {
         Object owner = getOwnerForLock();
  +      long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout();
         long startTime = System.currentTimeMillis();
   
         try
         {
            // Acquire a lock on the root node
  -         acquireLocksForStateTransfer(targetRoot, owner, getTreeCache().getConfiguration()
  -               .getInitialStateRetrievalTimeout(), true, true);
  +         acquireLocksForStateTransfer(targetRoot, owner, timeout, true, true);
   
            /*
             * Vladimir/Manik/Brian (Dec 7,2006)
  @@ -193,18 +222,11 @@
            option.setBypassInterceptorChain(true);
            cache.getInvocationContext().setOptionOverrides(option);
   
  -         try
  -         {
               StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
               log.info("starting state integration at node " + targetRoot);
               integrator.integrateState(state, targetRoot, cl);
               log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
            }
  -         catch (Throwable t)
  -         {
  -            log.error("failed integrating state", t);
  -         }
  -      }
         finally
         {
            releaseStateTransferLocks(targetRoot, owner, true);
  
  
  



More information about the jboss-cvs-commits mailing list