[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Thu Sep 21 11:14:45 EDT 2006


  User: vblagojevic
  Date: 06/09/21 11:14:45

  Modified:    src/org/jboss/cache/statetransfer  
                        DefaultStateTransferGenerator.java
                        DefaultStateTransferIntegrator.java
  Log:
  atomic state transfer, see http://jboss.org/index.html?module=bb&op=viewtopic&t=90586
  
  Revision  Changes    Path
  1.3       +35 -67    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferGenerator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- DefaultStateTransferGenerator.java	13 Sep 2006 15:42:17 -0000	1.2
  +++ DefaultStateTransferGenerator.java	21 Sep 2006 15:14:45 -0000	1.3
  @@ -42,65 +42,37 @@
      {
         Fqn fqn = rootNode.getFqn();
         Throwable encouteredException = null;
  -      try
  -      {
            
  -         //transient + marker
            try
            {
               out.writeShort(STATE_TRANSFER_VERSION);
               if (generateTransient)
               {
  +            //transient + marker
                  if (log.isTraceEnabled())
                     log.trace("writing transient state for " + fqn);
   
                  marshallTransientState(rootNode, out);
  +            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
   
                  if (log.isTraceEnabled())
                     log.trace("transient state succesfully written");              
  -            }
  -         }
  -         catch (Throwable t)
  -         {
  -            encouteredException=t;
  -            log.error("failed getting the in-memory (transient) state", t);
  -            out.writeObject(new NodeDataExceptionMarker(t));            
  -         }
  -         finally
  -         {            
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -         }
            
            //associated + marker
  -         try
  -         {
               if (log.isTraceEnabled())
                  log.trace("writing associated state");
   
               marshallAssociatedState(fqn, out);
  +            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
   
               if (log.isTraceEnabled())
                  log.trace("associated state succesfully written");
  -         }
  -         catch (Throwable t)
  -         {
  -            encouteredException=t;
  -            log.error("failed writing associated state", t);   
  -            out.writeObject(new NodeDataExceptionMarker(t));   
  -         }
  -         finally
  -         {            
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -         }        
            
  -         //persistent + marker
  -         try
  -         {
  +         }
               if (generatePersistent)
               {
                  if (log.isTraceEnabled())
  -                  log.trace("writing persistent state for " + 
  -                        fqn + ",using " + cache.getCacheLoader().getClass());
  +               log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoader().getClass());
   
                  if (fqn.isRoot())
                  {
  @@ -114,22 +86,18 @@
                  if (log.isTraceEnabled())
                     log.trace("persistent state succesfully written");
               }
  +         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
            }
            catch (Throwable t)
            {
  -            encouteredException=t;
  -            log.error("failed getting the persistent state", t);
  +         encouteredException = t;
  +         log.error("failed writing state", t);
               out.writeObject(new NodeDataExceptionMarker(t));            
            }
            finally
            {            
  -            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
  -         }
  -      }
  -      finally
  -      {
            out.close();
  -         if(encouteredException!=null && !suppressErrors)
  +         if (encouteredException != null && !suppressErrors)
            {
               throw encouteredException;
            }
  
  
  
  1.3       +75 -84    JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DefaultStateTransferIntegrator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- DefaultStateTransferIntegrator.java	13 Sep 2006 15:42:17 -0000	1.2
  +++ DefaultStateTransferIntegrator.java	21 Sep 2006 15:14:45 -0000	1.3
  @@ -15,6 +15,7 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  +import org.jboss.cache.CacheException;
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  @@ -22,6 +23,7 @@
   import org.jboss.cache.factories.NodeFactory;
   import org.jboss.cache.loader.CacheLoader;
   import org.jboss.cache.loader.NodeData;
  +import org.jboss.cache.loader.NodeDataExceptionMarker;
   
   public class DefaultStateTransferIntegrator implements StateTransferIntegrator
   {   
  @@ -49,6 +51,36 @@
         this.internalFqns = cache.getInternalFqns();     
      }
   
  +   public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
  +   {
  +      Throwable cause = null;     
  +      try
  +      {
  +         integrateTransientState(ois, target, cl);
  +         integrateAssociatedState(ois);
  +         integratePersistentState(ois);
  +      }
  +      catch (ClassCastException cce)
  +      {
  +         cause = cce;
  +         log.error("Failed integrating persistent state. One of cacheloaders is not"
  +               + " adhering to state stream format [JBCACHE-738].");
  +      }
  +      catch(Throwable t)
  +      {
  +         cause = t;
  +         log.error("Failed integrating state.",t);
  +      }                        
  +      finally
  +      {
  +         ois.close();
  +         if (cause != null)
  +         {
  +            throw new Exception("State transfer failed ");
  +         }
  +      }
  +   }
  +
      protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
      {
         boolean transientSet = false;
  @@ -72,6 +104,7 @@
            if (!transientSet)
            {
               // Clear any existing state from the targetRoot
  +            log.warn("transient state integration failed, removing all children of " + target);
               target.clear();
               target.removeAllChildren();
            }
  @@ -89,7 +122,7 @@
      {
         // no-op in this base class 
         // just read marker
  -      in.readObject();      
  +      readNode(in);   
      }
   
      protected void integratePersistentState(ObjectInputStream in) throws Exception
  @@ -99,7 +132,8 @@
         boolean persistentSet=false;
         if (loader == null)
         {
  -         log.error("cache loader is null, cannot set persistent state");
  +         if (log.isTraceEnabled())
  +            log.trace("cache loader is null, will not attempt to integrate persistent state");
         }
         else
         {
  @@ -119,11 +153,9 @@
            }
            finally
            {
  -            if(!persistentSet)
  +            if (!persistentSet)
               {
  -               if (log.isTraceEnabled())
  -                  log.trace("persistent state integration failed, removing all nodes from loader");
  -               
  +               log.warn("persistent state integration failed, removing all nodes from loader");
                  loader.remove(targetFqn);              
               }
               else
  @@ -135,6 +167,26 @@
         }
      }        
      
  +   protected TreeCache getCache()
  +   {
  +      return cache;
  +   }
  +
  +   protected NodeFactory getFactory()
  +   {
  +      return factory;
  +   }
  +
  +   protected byte getNodeType()
  +   {
  +      return nodeType;
  +   }
  +
  +   protected Fqn getTargetFqn()
  +   {
  +      return targetFqn;
  +   }     
  +   
      /**
       * Generates NodeAdded notifications for all nodes of the tree. This is
       * called whenever the tree is initially retrieved (state transfer)
  @@ -182,10 +234,10 @@
         target.removeAllChildren();
   
         // Read the first NodeData and integrate into our target     
  -      NodeData nd = (NodeData) in.readObject();
  +      NodeData nd = readNode(in);
   
         //are there any transient nodes at all?
  -      if (!nd.isMarker())
  +      if (nd != null && !nd.isMarker())
         {
            Map attrs = nd.getAttributes();
            if (attrs != null)
  @@ -208,6 +260,17 @@
         }
      }
   
  +   private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
  +   {
  +      NodeData nd = (NodeData) in.readObject();
  +      if (nd != null && nd.isExceptionMarker())
  +      {
  +         throw new CacheException("State provider cacheloader threw exception during loadState",
  +               ((NodeDataExceptionMarker) nd).getCause());
  +      }
  +      return nd;
  +   }
  +
      private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
            throws IOException, ClassNotFoundException
      {
  @@ -216,7 +279,7 @@
         Fqn fqn;
         int size;
         Object name;
  -      NodeData nd = (NodeData) in.readObject();
  +      NodeData nd = readNode(in);
         while (nd != null && !nd.isMarker())
         {
            fqn = nd.getFqn();
  @@ -322,76 +385,4 @@
            integrateRetainedNode(child, descendant);
         }
      }
  -
  -   protected TreeCache getCache()
  -   {
  -      return cache;
  -   }
  -
  -   protected NodeFactory getFactory()
  -   {
  -      return factory;
  -   }
  -
  -   protected byte getNodeType()
  -   {
  -      return nodeType;
  -   }
  -
  -   protected Fqn getTargetFqn()
  -   {
  -      return targetFqn;
  -   }
  -     
  -   public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
  -   {
  -      Throwable cause=null;
  -      //first try integrating transient state
  -      try
  -      {
  -         integrateTransientState(ois, target, cl);
  -      }      
  -      catch(Throwable t)
  -      {
  -         cause = t;
  -         log.error("Failed integrating transient state.",t);
  -      }
  -      
  -      //then try integrating associated state
  -      try
  -      {
  -         integrateAssociatedState(ois);
  -      }
  -      catch(Throwable t)
  -      {
  -         cause = t;
  -         log.error("Failed integrating associated state.",t);
  -      }
  -      
  -      //finally try integrating persistent
  -      try
  -      {
  -         integratePersistentState(ois);
  -      }
  -      catch (ClassCastException cce)
  -      {
  -         cause = cce;
  -         log.error("Failed integrating persistent state. One of cacheloaders is not"
  -               + " adhering to state stream format [JBCACHE-738].");
  -      }
  -      catch(Throwable t)
  -      {        
  -         cause = t;
  -         log.error("Failed integrating persistent state.", t);
  -      }      
  -      finally
  -      {
  -         ois.close();
  -         if(cause!=null)
  -         {
  -            throw new Exception("State transfer failed ");
  -         }
  -      }
  -   }
  -
   }
  
  
  



More information about the jboss-cvs-commits mailing list