[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Thu Aug 31 10:56:46 EDT 2006


  User: vblagojevic
  Date: 06/08/31 10:56:46

  Modified:    src/org/jboss/cache/statetransfer      
                        StateTransferIntegrator_200.java
                        StateTransferFactory.java
                        StateTransferGenerator_200.java
                        StreamingStateTransferGenerator_200.java
                        StateTransferManager.java
                        StreamingStateTransferIntegrator_200.java
  Log:
  CacheLoader API change (remove byte-based load/store)
  
  Revision  Changes    Path
  1.5       +20 -15    JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- StateTransferIntegrator_200.java	20 Jul 2006 21:58:21 -0000	1.4
  +++ StateTransferIntegrator_200.java	31 Aug 2006 14:56:46 -0000	1.5
  @@ -127,25 +127,30 @@
      
      public void integratePersistentState() throws Exception
      {
  -      if(persistentSize > 0) {
  +      if (persistentSize > 0)
  +      {
            CacheLoader loader = cache.getCacheLoader();
  -         if(loader == null) {
  +         if (loader == null)
  +         {
               log.error("cache loader is null, cannot set persistent state");
            }
  -         else if (targetFqn.size() == 0){
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state");
  -            byte[] persistentState = getPersistentState();
  -            loader.storeEntireState(persistentState);
  +         else
  +         {
  +            ByteArrayInputStream in_stream = new ByteArrayInputStream(getPersistentState());
  +            MarshalledValueInputStream in = new MarshalledValueInputStream(in_stream);
  +
               if (log.isTraceEnabled())
  -               log.trace("setting the persistent state was successful");
  +               log.trace("setting the persistent state with " + loader.getClass());
  +
  +            if (targetFqn.isRoot())
  +            {
  +               loader.storeEntireState(in);
            }
  -         else {
  -            if (log.isTraceEnabled())
  -               log.trace("setting the persistent state");
  -            // cache_loader.remove(Fqn.fromString("/"));
  -            byte[] persistentState = getPersistentState();
  -            loader.storeState(persistentState, targetFqn);
  +            else
  +            {
  +               loader.storeState(targetFqn, in);
  +            }
  +
               if (log.isTraceEnabled())
                  log.trace("setting the persistent state was successful");
            }            
  
  
  
  1.10      +8 -3      JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -b -r1.9 -r1.10
  --- StateTransferFactory.java	24 Aug 2006 22:05:34 -0000	1.9
  +++ StateTransferFactory.java	31 Aug 2006 14:56:46 -0000	1.10
  @@ -21,7 +21,7 @@
    * {@link StateTransferIntegrator} instances.
    * 
    * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.9 $
  + * @version $Revision: 1.10 $
    */
   public abstract class StateTransferFactory
   {
  @@ -119,8 +119,13 @@
         }
         catch (IOException io)
         {
  -         // No short at the head of the stream means version 123
  -         throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
  +         // something is wrong with this stream, close it
  +         try
  +         {
  +            in.close();
  +         }
  +         catch(IOException ignored){}
  +         throw new IllegalStateException("Stream corrupted ",io);                         
         }
   
         // Compiler won't let me use a switch
  
  
  
  1.5       +43 -19    JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- StateTransferGenerator_200.java	22 Aug 2006 20:37:53 -0000	1.4
  +++ StateTransferGenerator_200.java	31 Aug 2006 14:56:46 -0000	1.5
  @@ -16,6 +16,7 @@
   import org.jboss.cache.util.ExposedByteArrayOutputStream;
   import org.jboss.invocation.MarshalledValueOutputStream;
   
  +import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.OutputStream;
  @@ -28,6 +29,10 @@
      public static final short STATE_TRANSFER_VERSION = 
         Version.getVersionShort("2.0.0.GA");
      
  +   //whenever we wrap stream A with object based stream B,B writes a few bytes 
  +   //of a stream header to underlying stream A   
  +   public static final int OBJECT_STREAM_MARKER_LENGTH =4;
  +   
      private Log log = LogFactory.getLog(getClass().getName());
      
      private TreeCache cache;
  @@ -109,32 +114,51 @@
            }
         }
         
  -      if (generatePersistent) {
  -         try {
  -            if (debug)
  -               log.debug("getting the persistent state");
  +      if (generatePersistent)
  +      {
  +         ByteArrayOutputStream out_stream = new ByteArrayOutputStream(1024);
  +         ObjectOutputStream out = new MarshalledValueOutputStream(out_stream);
               byte[] persState = null;
  -            if (fqn.size() == 0)
  -               persState = cache.getCacheLoader().loadEntireState();
  -            else
  -               persState = cache.getCacheLoader().loadState(fqn);
  -            
  -            if (persState != null) {
  -               sizes[2] = persState.length;
  -               baos.write(persState);
  +         boolean persistentStateProvidedOk = false;
  +         try
  +         {
  +            if (debug)
  +               log.debug("getting the persistent state from cacheloader " + cache.getCacheLoader().getClass());
  +            if (fqn.isRoot())
  +            {
  +               cache.getCacheLoader().loadEntireState(out);
               }
  -            
  -            if (debug) {
  -               log.debug("generated the persistent state (" + sizes[2] + 
  -                         " bytes)");
  +            else
  +            {
  +               cache.getCacheLoader().loadState(fqn, out);
               }
  +            persistentStateProvidedOk=true;            
            }
  -         catch(Throwable t) {
  -            log.error("failed getting the persistent state", t);
  +         catch (Throwable t)
  +         {
  +            log.error("cacheloader failed while getting the persistent state", t);
               if (!suppressErrors)
                  throw t;
  +         }
  +         finally
  +         {
  +            out.close();
  +            persState = out_stream.toByteArray();
  +            if (persistentStateProvidedOk && persState.length > OBJECT_STREAM_MARKER_LENGTH)
  +            {
  +               sizes[2] = persState.length;
  +               baos.write(persState);
  +            }
  +            else
  +            {
               sizes[2] = 0;
            }
  +
  +            if (debug)
  +            {
  +               log.debug("generated the persistent state (" + sizes[2] + " bytes)");
  +            }
  +         }
         }
      
         // Overwrite the placeholders used for the sizes of the state transfer
  
  
  
  1.2       +45 -15    JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StreamingStateTransferGenerator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- StreamingStateTransferGenerator_200.java	24 Aug 2006 22:05:34 -0000	1.1
  +++ StreamingStateTransferGenerator_200.java	31 Aug 2006 14:56:46 -0000	1.2
  @@ -48,17 +48,26 @@
   
         try
         {
  -         out.writeShort(STATE_TRANSFER_VERSION);
            try
            {
  +            out.writeShort(STATE_TRANSFER_VERSION);
               if (generateTransient)
               {
  +               if (log.isTraceEnabled())
  +                  log.trace("writing transient state for " + fqn);
  +
                  marshallTransientState(rootNode, out);
  -               log.debug("generated the in-memory state");
   
  -               // Return any state associated with the subtree but not stored in it
  +               if (log.isTraceEnabled())
  +                  log.trace("transient state succesfully written");
  +
  +               if (log.isTraceEnabled())
  +                  log.trace("writing associated state");
  +
                  marshallAssociatedState(fqn, os);
  -               log.debug("returning the associated state bytes)");
  +
  +               if (log.isTraceEnabled())
  +                  log.trace("associated state succesfully written");
               }
            }
            catch (Throwable t)
  @@ -69,15 +78,30 @@
            }
            finally
            {
  -            log.debug("writing delimeter after transient state");
  +            if (log.isTraceEnabled())
  +               log.trace("writing delimeter after transient state");
  +
               out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
            }
  -
  +         try
  +         {
            if (generatePersistent)
            {
  -            try
  +               if (log.isTraceEnabled())
  +                  log.trace("writing persistent state for " + fqn);
  +
  +               if (fqn.isRoot())
  +               {
  +                  cache.getCacheLoader().loadEntireState(out);
  +               }
  +               else
               {
  -               //TODO
  +                  cache.getCacheLoader().loadState(fqn, out);
  +               }
  +
  +               if (log.isTraceEnabled())
  +                  log.trace("persistent state succesfully written");
  +            }
               }
               catch (Throwable t)
               {
  @@ -85,6 +109,12 @@
                  if (!suppressErrors)
                     throw t;
               }
  +         finally
  +         {
  +            if (log.isTraceEnabled())
  +               log.trace("writing delimeter after persistent state");
  +
  +            out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
            }
         }
         finally
  
  
  
  1.7       +74 -66    JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -b -r1.6 -r1.7
  --- StateTransferManager.java	30 Aug 2006 17:08:18 -0000	1.6
  +++ StateTransferManager.java	31 Aug 2006 14:56:46 -0000	1.7
  @@ -1,5 +1,12 @@
   package org.jboss.cache.statetransfer;
   
  +import java.io.InputStream;
  +import java.io.OutputStream;
  +import java.util.Iterator;
  +import java.util.List;
  +import java.util.Map;
  +import java.util.Vector;
  +
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.CacheException;
  @@ -9,27 +16,21 @@
   import org.jboss.cache.config.Option;
   import org.jboss.cache.loader.CacheLoaderManager;
   import org.jboss.cache.loader.NodeData;
  +import org.jboss.cache.loader.NodeDataMarker;
   import org.jboss.cache.lock.TimeoutException;
  -import org.jboss.cache.marshall.MethodCall;
   import org.jboss.cache.marshall.MethodCallFactory;
   import org.jboss.cache.marshall.MethodDeclarations;
   import org.jboss.cache.marshall.VersionAwareMarshaller;
  -
  -import java.io.InputStream;
  -import java.io.OutputStream;
  -import java.util.Iterator;
  -import java.util.List;
  -import java.util.Map;
  -import java.util.Vector;
  +import org.jboss.cache.marshall.MethodCall;
   
   public class StateTransferManager
   {
      protected final static Log log = LogFactory.getLog(StateTransferManager.class);
   
  -   public static final NodeData STREAMING_DELIMETER_NODE = new NodeData(Fqn.fromString("STREAMING_DELIMETER_NODE"), null);
  +   public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
   
      private TreeCache treeCache;
  -   private long[] loadStateTimeouts = {400, 800, 1200};
  +   private long[] loadStateTimeouts =  { 400, 800, 1200 };
   
      public StateTransferManager(TreeCache cache)
      {
  @@ -78,9 +79,9 @@
       *                                       enabled, the requested Fqn is not the root node, and the
       *                                       cache loader does not implement {@link ExtendedCacheLoader}.
       */
  -   public byte[] getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +   public byte[] getState(OutputStream os,Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  -      boolean usingStreamingStateTransfer = os != null;
  +      boolean usingStreamingStateTransfer = os!=null;
         TreeCache cache = getTreeCache();
   
         VersionAwareMarshaller marshaller_ = null;
  @@ -149,7 +150,7 @@
   
      public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  -      return getState(null, fqn, timeout, force, suppressErrors);
  +      return getState(null,fqn,timeout,force,suppressErrors);
      }
   
      /**
  @@ -329,7 +330,7 @@
   
         byte [] new_state = null;
         InputStream istream = null;
  -      if (state instanceof byte[])
  +      if(state instanceof byte[])
         {
            new_state = (byte[]) state;
            log.info("received the state (size=" + new_state.length + " bytes)");
  @@ -347,10 +348,10 @@
                    getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
                    true, true);
   
  -         StateTransferIntegrator integrator = null;
  -         if (new_state != null)
  +         StateTransferIntegrator integrator =null;
  +         if(new_state!=null)
            {
  -            integrator = getStateTransferIntegrator(new_state, targetRoot.getFqn());
  +            integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
            }
            else
            {
  @@ -374,6 +375,13 @@
         }
         finally
         {
  +         if(istream!=null)
  +         {
  +            try
  +            {
  +               istream.close();
  +            }catch(Exception ignored){}
  +         }
            releaseStateTransferLocks(targetRoot, owner, true);
         }
   
  @@ -444,7 +452,7 @@
   
      protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
      {
  -      return StateTransferFactory.getStateTransferGenerator(os, getTreeCache());
  +      return StateTransferFactory.getStateTransferGenerator(os,getTreeCache());
      }
   
      protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
  
  
  
  1.2       +24 -3     JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StreamingStateTransferIntegrator_200.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -b -r1.1 -r1.2
  --- StreamingStateTransferIntegrator_200.java	24 Aug 2006 22:05:34 -0000	1.1
  +++ StreamingStateTransferIntegrator_200.java	31 Aug 2006 14:56:46 -0000	1.2
  @@ -20,6 +20,7 @@
   import org.jboss.cache.TreeCache;
   import org.jboss.cache.buddyreplication.BuddyManager;
   import org.jboss.cache.factories.NodeFactory;
  +import org.jboss.cache.loader.CacheLoader;
   import org.jboss.cache.loader.NodeData;
   
   public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
  @@ -104,7 +105,27 @@
   
         try
         {
  -         //TODO
  +         CacheLoader loader = cache.getCacheLoader();
  +         if (loader == null)
  +         {
  +            log.error("cache loader is null, cannot set persistent state");
  +         }
  +         else if (targetFqn.isRoot())
  +         {
  +            if (log.isTraceEnabled())
  +               log.trace("setting the persistent state");
  +            loader.storeEntireState(in);
  +            if (log.isTraceEnabled())
  +               log.trace("setting the persistent state was successful");
  +         }
  +         else
  +         {
  +            if (log.isTraceEnabled())
  +               log.trace("setting partial persistent state at " + targetFqn);
  +            loader.storeState(targetFqn, in);
  +            if (log.isTraceEnabled())
  +               log.trace("setting partial persistent state was successful");
  +         }
         }
         finally
         {
  @@ -140,7 +161,7 @@
         NodeData nd = (NodeData) in.readObject();
   
         //are there any transient nodes at all?
  -      if (!StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn()))
  +      if (!nd.isMarker())
         {
            Map attrs = nd.getAttributes();
            if (attrs != null)
  @@ -172,7 +193,7 @@
         int size;
         Object name;
         NodeData nd = (NodeData) in.readObject();
  -      while (nd != null && !StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn()))
  +      while (nd != null && !nd.isMarker())
         {
            fqn = nd.getFqn();
            // If we need to integrate into the buddy backup subtree,
  
  
  



More information about the jboss-cvs-commits mailing list