[jboss-cvs] JBossCache/src/org/jboss/cache ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Wed Dec 20 17:28:13 EST 2006


  User: vblagojevic
  Date: 06/12/20 17:28:13

  Modified:    src/org/jboss/cache  TreeCache.java
  Log:
  final state transfer refactoring
  
  Revision  Changes    Path
  1.298     +127 -72   JBossCache/src/org/jboss/cache/TreeCache.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TreeCache.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
  retrieving revision 1.297
  retrieving revision 1.298
  diff -u -b -r1.297 -r1.298
  --- TreeCache.java	20 Dec 2006 16:04:02 -0000	1.297
  +++ TreeCache.java	20 Dec 2006 22:28:13 -0000	1.298
  @@ -39,6 +39,7 @@
   import org.jboss.cache.notifications.Notifier;
   import org.jboss.cache.optimistic.DataVersion;
   import org.jboss.cache.statetransfer.StateTransferManager;
  +import org.jboss.cache.util.ExposedByteArrayOutputStream;
   import org.jboss.cache.util.MapCopy;
   import org.jboss.util.stream.MarshalledValueInputStream;
   import org.jboss.util.stream.MarshalledValueOutputStream;
  @@ -58,6 +59,7 @@
   import org.jgroups.stack.IpAddress;
   import org.jgroups.util.Rsp;
   import org.jgroups.util.RspList;
  +import org.jgroups.util.Util;
   import org.w3c.dom.Element;
   
   import javax.management.MBeanServer;
  @@ -66,6 +68,8 @@
   import javax.transaction.SystemException;
   import javax.transaction.Transaction;
   import javax.transaction.TransactionManager;
  +
  +import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
  @@ -95,7 +99,7 @@
    * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
    * @author Brian Stansberry
    * @author Daniel Huang (dhuang at jboss.org)
  - * @version $Id: TreeCache.java,v 1.297 2006/12/20 16:04:02 msurtani Exp $
  + * @version $Id: TreeCache.java,v 1.298 2006/12/20 22:28:13 vblagojevic Exp $
    *          <p/>
    * @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
    */
  @@ -1136,15 +1140,30 @@
       * @param suppressErrors should any Throwable thrown be suppressed?
       * @return a serialized byte[][], element 0 is the transient state
       *         (or null), and element 1 is the persistent state (or null).
  +    *         
  +    *         
  +    *TODO here only because of BuddyManager state transfer  
  +    *Consider for removal if BuddyManager transfer changes      
  +    *         
       */
  -   public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +   public byte[] generateState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  -      return getStateTransferManager().getState(fqn, timeout, force, suppressErrors);
  -   }
   
  -   public void _getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
  +      MarshalledValueOutputStream out = null;
  +      byte[] result = null;
  +      try
  +      {
  +         ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
  +         out = new MarshalledValueOutputStream(baos);
  +         getStateTransferManager().getState(out, fqn, timeout, force, suppressErrors);
  +         result = baos.getRawBuffer();
  +      }
  +      finally
      {
  -      getStateTransferManager().getState(os, fqn, timeout, force, suppressErrors);
  +         Util.close(out);
  +      }
  +
  +      return result;
      }
   
      private void removeLocksForDeadMembers(Node node,
  @@ -3248,25 +3267,17 @@
            }
         }
   
  -      /**
  -       * Returns a copy of the current cache (tree). It actually returns a 2
  -       * element array of byte[], element 0 being the transient state (or null)
  -       * and element 1 being the persistent state (or null)
  -       */
         public byte[] getState()
         {
  +         MarshalledValueOutputStream out = null;
  +         byte[] result = null;
            try
            {
  -            //            // We use the lock acquisition timeout rather than the
  -            //            // state transfer timeout, otherwise we'd never try
  -            //            // to break locks before the requesting node gives up
  -            //            return cache._getState(Fqn.fromString(SEPARATOR),
  -            //               cache.getLockAcquisitionTimeout(),
  -            //               true,
  -            //               true);
  -            // Until flush is in place, use the old mechanism
  -            // where we wait the full state retrieval timeout
  -            return _getState(Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
  +            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
  +            out = new MarshalledValueOutputStream(baos);
  +
  +            getStateTransferManager().getState(out,Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
  +            result = baos.getRawBuffer();
            }
            catch (Throwable t)
            {
  @@ -3275,8 +3286,12 @@
               my_log.error("Caught " + t.getClass().getName() +
                       " while responding to initial state transfer request;" +
                       " returning null", t);
  -            return null;
            }
  +         finally
  +         {
  +            Util.close(out);
  +         }
  +         return result;
         }
   
         public void setState(byte[] new_state)
  @@ -3286,9 +3301,16 @@
               my_log.debug("transferred state is null (may be first member in cluster)");
               return;
            }
  +         ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
  +         MarshalledValueInputStream in = null;
            try
            {
  -            getStateTransferManager().setState(new_state, Fqn.ROOT, null);
  +            in = new MarshalledValueInputStream(bais);
  +            boolean hasState = in.readBoolean();
  +            if(hasState)
  +            {
  +               getStateTransferManager().setState(in, Fqn.ROOT, null);
  +            }
               isStateSet = true;
            }
            catch (Throwable t)
  @@ -3305,6 +3327,7 @@
            }
            finally
            {
  +            Util.close(in);
               synchronized (stateLock)
               {
                  // Notify wait that state has been set.
  @@ -3315,15 +3338,24 @@
   
         public byte[] getState(String state_id)
         {
  +         MarshalledValueOutputStream out = null;
            String sourceRoot = state_id;
  +         byte[] result = null;
  +         
            boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
            if (hasDifferentSourceAndIntegrationRoots)
            {
               sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0];
            }
  +         
            try
            {
  -            return _getState(Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
  +            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
  +            out = new MarshalledValueOutputStream(baos);
  +
  +            getStateTransferManager().getState(out, Fqn.fromString(sourceRoot),
  +                  configuration.getInitialStateRetrievalTimeout(), true, true);
  +            result = baos.getRawBuffer();
            }
            catch (Throwable t)
            {
  @@ -3332,15 +3364,21 @@
               my_log.error("Caught " + t.getClass().getName() +
                       " while responding to partial state transfer request;" +
                       " returning null", t);
  -            return null;
            }
  +         finally
  +         {
  +            Util.close(out);
  +         }
  +         return result;
         }
   
         public void getState(OutputStream ostream)
         {
  +         MarshalledValueOutputStream out = null;
            try
            {
  -            _getState(ostream, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
  +            out = new MarshalledValueOutputStream(ostream);            
  +            getStateTransferManager().getState(out, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
            }
            catch (Throwable t)
            {
  @@ -3350,11 +3388,16 @@
                       " while responding to initial state transfer request;" +
                       " returning null", t);
            }
  +         finally
  +         {
  +            Util.close(out);
  +         }
         }
   
         public void getState(String state_id, OutputStream ostream)
         {
            String sourceRoot = state_id;
  +         MarshalledValueOutputStream out = null;
            boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
            if (hasDifferentSourceAndIntegrationRoots)
            {
  @@ -3362,7 +3405,8 @@
            }
            try
            {
  -            _getState(ostream, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
  +            out = new MarshalledValueOutputStream(ostream);       
  +            getStateTransferManager().getState(out, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
            }
            catch (Throwable t)
            {
  @@ -3372,6 +3416,10 @@
                       " while responding to partial state transfer request;" +
                       " returning null", t);
            }
  +         finally
  +         {
  +            Util.close(out);
  +         }
         }
   
         public void setState(InputStream istream)
  @@ -3381,19 +3429,16 @@
               my_log.debug("stream is null (may be first member in cluster)");
               return;
            }
  +         MarshalledValueInputStream in = null;
            try
            {
  -            MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
  +            in = new MarshalledValueInputStream(istream);   
               boolean hasState = in.readBoolean();
  -            if (!hasState)
  -            {
  -               in.close();
  -            }
  -            else
  +            if(hasState)
               {
                  getStateTransferManager().setState(in, Fqn.ROOT, null);
  -               isStateSet = true;
               }
  +            isStateSet = true;           
            }
            catch (Throwable t)
            {
  @@ -3409,6 +3454,7 @@
            }
            finally
            {
  +            Util.close(in);
               synchronized (stateLock)
               {
                  // Notify wait that state has been set.
  @@ -3419,6 +3465,13 @@
   
         public void setState(String state_id, byte[] state)
         {
  +         if (state == null)
  +         {
  +            my_log.debug("partial transferred state is null");
  +            return;
  +         }
  +         
  +         MarshalledValueInputStream in = null;
            String targetRoot = state_id;
            boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
            if (hasDifferentSourceAndIntegrationRoots)
  @@ -3428,8 +3481,6 @@
            try
            {
   
  -            if (state != null)
  -            {
                  my_log.debug("Setting received partial state for subroot " + state_id);
                  Fqn subroot = Fqn.fromString(targetRoot);
                  Region region = regionManager.getRegion(subroot, false);
  @@ -3439,9 +3490,14 @@
                     // If a classloader is registered for the node's region, use it
                     cl = region.getClassLoader();
                  }
  -               getStateTransferManager().setState(state, subroot, cl);
  -               isStateSet = true;
  +            ByteArrayInputStream bais = new ByteArrayInputStream(state);
  +            in = new MarshalledValueInputStream(bais);
  +            boolean hasState = in.readBoolean();
  +            if(hasState)
  +            {
  +               getStateTransferManager().setState(in, subroot, cl);
               }
  +            isStateSet = true;
            }
            catch (Throwable t)
            {
  @@ -3457,6 +3513,7 @@
            }
            finally
            {
  +            Util.close(in);
               synchronized (stateLock)
               {
                  // Notify wait that state has been set.
  @@ -3468,6 +3525,7 @@
         public void setState(String state_id, InputStream istream)
         {
            String targetRoot = state_id;
  +         MarshalledValueInputStream in = null;
            boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
            if (hasDifferentSourceAndIntegrationRoots)
            {
  @@ -3481,15 +3539,8 @@
   
            try
            {
  -            MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
  -            boolean hasState = in.readBoolean();
  -            if (!hasState)
  -            {
  -               in.close();
  -            }
  -            else
  -            {
                  my_log.debug("Setting received partial state for subroot " + state_id);
  +            in = new MarshalledValueInputStream(istream);                      
                  Fqn subroot = Fqn.fromString(targetRoot);
                  Region region = regionManager.getRegion(subroot, false);
                  ClassLoader cl = null;
  @@ -3498,9 +3549,12 @@
                     // If a classloader is registered for the node's region, use it
                     cl = region.getClassLoader();
                  }
  -               getStateTransferManager().setState(in, Fqn.fromString(state_id), cl);
  -               isStateSet = true;
  +            boolean hasState = in.readBoolean();
  +            if(hasState)
  +            {
  +               getStateTransferManager().setState(in,subroot, cl);
               }
  +            isStateSet = true;           
            }
            catch (Throwable t)
            {
  @@ -3516,6 +3570,7 @@
            }
            finally
            {
  +            Util.close(in);
               synchronized (stateLock)
               {
                  // Notify wait that state has been set.
  
  
  



More information about the jboss-cvs-commits mailing list