[jboss-cvs] JBossCache/src/org/jboss/cache/aop/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Thu Aug 31 16:30:45 EDT 2006


  User: vblagojevic
  Date: 06/08/31 16:30:44

  Modified:    src/org/jboss/cache/aop/statetransfer      
                        PojoStateTransferIntegrator_200.java
                        PojoStateTransferFactory.java
                        PojoStateTransferManager.java
                        PojoStateTransferGenerator_200.java
  Added:       src/org/jboss/cache/aop/statetransfer      
                        StreamingPojoStateTransferIntegrator_200.java
                        StreamingPojoStateTransferGenerator_200.java
  Log:
  state transfer refactoring
  
  Revision  Changes    Path
  1.2       +28 -40    JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: PojoStateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferIntegrator_200.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- PojoStateTransferIntegrator_200.java	20 Jul 2006 21:58:22 -0000	1.1
  +++ PojoStateTransferIntegrator_200.java	31 Aug 2006 20:30:44 -0000	1.2
  @@ -6,8 +6,8 @@
    */
   package org.jboss.cache.aop.statetransfer;
   
  -import java.io.ByteArrayInputStream;
   import java.io.EOFException;
  +import java.io.ObjectInputStream;
   
   import org.jboss.cache.DataNode;
   import org.jboss.cache.Fqn;
  @@ -16,7 +16,6 @@
   import org.jboss.cache.aop.InternalDelegate;
   import org.jboss.cache.factories.NodeFactory;
   import org.jboss.cache.statetransfer.StateTransferIntegrator_200;
  -import org.jboss.invocation.MarshalledValueInputStream;
   
   class PojoStateTransferIntegrator_200 extends StateTransferIntegrator_200
   {
  @@ -28,47 +27,36 @@
         super(state, targetFqn, cache);
      }
      
  -   protected void integrateAssociatedState() throws Exception
  +   protected void integrateAssociatedState(ObjectInputStream in) throws Exception
      {
  -      if (getAssociatedSize() > 0) {
            
            TreeCache cache = getCache();
            DataNode refMapNode = cache.get(InternalDelegate.JBOSS_INTERNAL_MAP);
  -
  -         ByteArrayInputStream in_stream=new ByteArrayInputStream(getState(), HEADER_LENGTH + getTransientSize(), getAssociatedSize());
  -         MarshalledValueInputStream in=new MarshalledValueInputStream(in_stream);
  -         
  -         try {
  +      try
  +      {
               Object[] nameValue;
               NodeFactory factory = getFactory();
  -            while ((nameValue = (Object[]) in.readObject()) != null) {
  +         while ((nameValue = (Object[]) in.readObject()) != null)
  +         {
                  TreeNode target = refMapNode.getChild(nameValue[0]);
                  
  -               if (target == null) {
  +            if (target == null)
  +            {
                     // Create the node
                     Fqn fqn = new Fqn(InternalDelegate.JBOSS_INTERNAL_MAP, nameValue[0]);
  -                  target = factory.createDataNode(getNodeType(), 
  -                                                  nameValue[0], 
  -                                                  fqn, 
  -                                                  refMapNode, 
  -                                                  null, 
  -                                                  true,
  -                                                  cache);
  +               target = factory.createDataNode(getNodeType(), nameValue[0], fqn, refMapNode, null, true, cache);
                     refMapNode.addChild(nameValue[0], target);
                  }
                  
                  target.put(nameValue[0], nameValue[1]);
               }
            }
  -         catch (EOFException eof) {
  +      catch (EOFException eof)
  +      {
               // all done
            }
            
            if (log.isTraceEnabled())
               log.trace("associated state successfully integrated for " + getTargetFqn());
         }
  -      else if (log.isTraceEnabled()) {
  -         log.trace("No need to integrate associated state for " + getTargetFqn());
  -      }
  -   }
   }
  
  
  
  1.2       +44 -0     JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: PojoStateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferFactory.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- PojoStateTransferFactory.java	20 Jul 2006 21:58:22 -0000	1.1
  +++ PojoStateTransferFactory.java	31 Aug 2006 20:30:44 -0000	1.2
  @@ -8,6 +8,8 @@
   
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
  +import java.io.InputStream;
  +import java.io.OutputStream;
   
   import org.jboss.cache.Fqn;
   import org.jboss.cache.TreeCache;
  @@ -21,7 +23,7 @@
    * {@link StateTransferIntegrator} instances.
    * 
    * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public abstract class PojoStateTransferFactory
   {
  @@ -95,4 +97,46 @@
            catch (IOException io) {}
         }
      }
  +   
  +   public static StateTransferGenerator getStateTransferGenerator(OutputStream os, TreeCache cache)
  +   {
  +      short version = cache.getConfiguration().getReplicationVersion();
  +
  +      // Compiler won't let me use a switch
  +
  +      if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  +         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +      else
  +         return new StreamingPojoStateTransferGenerator_200(os,cache); // current default
  +   }
  +   
  +   public static StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn, TreeCache treeCache)
  +         throws Exception
  +   {
  +      short version = 0;
  +      MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
  +      try
  +      {
  +         version = in.readShort();
  +      }
  +      catch (IOException io)
  +      {
  +         // something is wrong with this stream, close it
  +         try
  +         {
  +            in.close();
  +         }
  +         catch (IOException ignored)
  +         {
  +         }
  +         throw new IllegalStateException("Stream corrupted ", io);
  +      }
  +
  +      // Compiler won't let me use a switch
  +
  +      if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
  +         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +      else
  +         return new StreamingPojoStateTransferIntegrator_200(in, fqn, treeCache); // current default
  +   }      
   }
  
  
  
  1.2       +12 -0     JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: PojoStateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- PojoStateTransferManager.java	20 Jul 2006 21:58:22 -0000	1.1
  +++ PojoStateTransferManager.java	31 Aug 2006 20:30:44 -0000	1.2
  @@ -1,5 +1,7 @@
   package org.jboss.cache.aop.statetransfer;
   
  +import java.io.InputStream;
  +import java.io.OutputStream;
   import java.util.Iterator;
   import java.util.Map;
   
  @@ -31,6 +33,16 @@
         return PojoStateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
      }
   
  +   protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
  +   {
  +      return PojoStateTransferFactory.getStateTransferGenerator(os,getTreeCache());
  +   } 
  +   
  +   private StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn) throws Exception
  +   {
  +      return PojoStateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
  +   }
  +
      /**
       * Overrides the superclass version by additionally acquiring locks
       * on the internal reference map nodes used for tracking shared objects.
  
  
  
  1.2       +4 -10     JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: PojoStateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/aop/statetransfer/PojoStateTransferGenerator_200.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- PojoStateTransferGenerator_200.java	20 Jul 2006 21:58:22 -0000	1.1
  +++ PojoStateTransferGenerator_200.java	31 Aug 2006 20:30:44 -0000	1.2
  @@ -6,7 +6,7 @@
    */
   package org.jboss.cache.aop.statetransfer;
   
  -import java.io.OutputStream;
  +import java.io.ObjectOutputStream;
   import java.util.Iterator;
   import java.util.Map;
   
  @@ -16,7 +16,6 @@
   import org.jboss.cache.aop.InternalDelegate;
   import org.jboss.cache.aop.util.ObjectUtil;
   import org.jboss.cache.statetransfer.StateTransferGenerator_200;
  -import org.jboss.invocation.MarshalledValueOutputStream;
   
   class PojoStateTransferGenerator_200 extends StateTransferGenerator_200
   {
  @@ -31,7 +30,7 @@
       * name and the value of its sole attribute.  Does nothing if the Fqn is the 
       * root node (i.e. "/") or if it is in the internal reference area itself.
       */
  -   protected void marshallAssociatedState(Fqn fqn, OutputStream baos) 
  +   protected void marshallAssociatedState(Fqn fqn, ObjectOutputStream out) 
            throws Exception
      {
         if (fqn == null 
  @@ -39,8 +38,6 @@
               || fqn.isChildOf(InternalDelegate.JBOSS_INTERNAL))
            return;
   
  -      MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
  -      
         DataNode refMapNode = getTreeCache().get(InternalDelegate.JBOSS_INTERNAL_MAP);
         
         Map children = null;
  @@ -60,8 +57,5 @@
               }
            }
         }
  -      
  -      out.close();
  -      
      }
   }
  
  
  
  1.1      date: 2006/08/31 20:30:44;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/aop/statetransfer/StreamingPojoStateTransferIntegrator_200.java
  
  Index: StreamingPojoStateTransferIntegrator_200.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.aop.statetransfer;
  
  import java.io.EOFException;
  import java.io.ObjectInputStream;
  
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.TreeNode;
  import org.jboss.cache.aop.InternalDelegate;
  import org.jboss.cache.factories.NodeFactory;
  import org.jboss.cache.statetransfer.StreamingStateTransferIntegrator_200;
  
  /**
   * Streaming state transfer integrator that additionally integrates the
   * "associated state" entries found under
   * {@link InternalDelegate#JBOSS_INTERNAL_MAP}.
   */
  class StreamingPojoStateTransferIntegrator_200 extends StreamingStateTransferIntegrator_200
  {

     /**
      * Passes all arguments straight through to the superclass constructor.
      *
      * @param inputStream stream handed to the superclass
      * @param targetFqn   Fqn handed to the superclass
      * @param cache       cache handed to the superclass
      */
     protected StreamingPojoStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
     {
        super(inputStream, targetFqn, cache);
     }

     /**
      * Reads <code>Object[]</code> entries from the stream until either a
      * <code>null</code> entry is read or the stream signals end-of-file.
      * For every entry, element 0 is used both as the name of a child of the
      * internal reference map node and as the attribute key, and element 1 is
      * stored as the attribute value. Children that do not exist yet are
      * created through the node factory and attached to the map node.
      *
      * @param in stream the entries are read from
      * @throws Exception any failure other than an <code>EOFException</code>
      *                   raised while reading or integrating
      */
     protected void integrateAssociatedState(ObjectInputStream in) throws Exception
     {
        TreeCache treeCache = getCache();
        DataNode internalMap = treeCache.get(InternalDelegate.JBOSS_INTERNAL_MAP);

        try
        {
           NodeFactory nodeFactory = getFactory();

           while (true)
           {
              Object[] pair = (Object[]) in.readObject();
              if (pair == null)
              {
                 // a null entry terminates the associated state
                 break;
              }

              Object name = pair[0];
              TreeNode node = internalMap.getChild(name);
              if (node == null)
              {
                 // no child with this name yet: build one and attach it
                 Fqn nodeFqn = new Fqn(InternalDelegate.JBOSS_INTERNAL_MAP, name);
                 node = nodeFactory.createDataNode(getNodeType(), name, nodeFqn, internalMap, null, true, treeCache);
                 internalMap.addChild(name, node);
              }

              node.put(name, pair[1]);
           }
        }
        catch (EOFException endOfStream)
        {
           // end of stream is treated as normal completion
        }

        if (log.isTraceEnabled())
        {
           log.trace("associated state successfully integrated for " + getTargetFqn());
        }
     }
  }
  
  
  
  1.1      date: 2006/08/31 20:30:44;  author: vblagojevic;  state: Exp;JBossCache/src/org/jboss/cache/aop/statetransfer/StreamingPojoStateTransferGenerator_200.java
  
  Index: StreamingPojoStateTransferGenerator_200.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   * 
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.cache.aop.statetransfer;
  
  import java.io.ObjectOutputStream;
  import java.io.OutputStream;
  import java.util.Iterator;
  import java.util.Map;
  
  import org.jboss.cache.DataNode;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.aop.InternalDelegate;
  import org.jboss.cache.aop.util.ObjectUtil;
  import org.jboss.cache.statetransfer.StreamingStateTransferGenerator_200;
  
  /**
   * Streaming state transfer generator that additionally marshalls the
   * internal reference map entries associated with a given Fqn.
   */
  class StreamingPojoStateTransferGenerator_200 extends StreamingStateTransferGenerator_200
  {

     /**
      * Passes both arguments straight through to the superclass constructor.
      *
      * @param os    stream handed to the superclass
      * @param cache cache handed to the superclass
      */
     protected StreamingPojoStateTransferGenerator_200(OutputStream os, TreeCache cache)
     {
        super(os, cache);
     }

     /**
      * Writes one <code>Object[] { name, value }</code> to the stream for every
      * child of the internal reference map node whose name starts with the
      * indirect form of the given Fqn; <code>value</code> is the attribute the
      * child stores under its own name.
      * <p>
      * Nothing is written when the Fqn is <code>null</code>, has size zero, or
      * is a child of {@link InternalDelegate#JBOSS_INTERNAL}, nor when the
      * reference map node or its children map is absent.
      *
      * @param fqn the Fqn whose associated entries are written
      * @param out stream the entries are written to
      * @throws Exception if looking up or writing an entry fails
      */
     protected void marshallAssociatedState(Fqn fqn, ObjectOutputStream out)
           throws Exception
     {
        if (fqn == null || fqn.size() == 0)
        {
           return;
        }
        if (fqn.isChildOf(InternalDelegate.JBOSS_INTERNAL))
        {
           return;
        }

        DataNode internalMap = getTreeCache().get(InternalDelegate.JBOSS_INTERNAL_MAP);
        if (internalMap == null)
        {
           return;
        }

        Map refNodes = internalMap.getChildren();
        if (refNodes == null)
        {
           return;
        }

        String prefix = ObjectUtil.getIndirectFqn(fqn.toString());

        Iterator it = refNodes.entrySet().iterator();
        while (it.hasNext())
        {
           Map.Entry child = (Map.Entry) it.next();
           String name = (String) child.getKey();
           if (!name.startsWith(prefix))
           {
              continue;
           }

           DataNode refNode = (DataNode) child.getValue();
           out.writeObject(new Object[] { name, refNode.get(name) });
        }
     }
  }
  
  
  



More information about the jboss-cvs-commits mailing list