[jboss-cvs] JBossCache/src/org/jboss/cache ...

Vladmir Blagojevic vladimir.blagojevic at jboss.com
Thu Dec 21 16:41:10 EST 2006


  User: vblagojevic
  Date: 06/12/21 16:41:10

  Modified:    src/org/jboss/cache  TreeCache.java
  Log:
  final state transfer polishing (all tests pass now)
  
  Revision  Changes    Path
  1.300     +86 -107   JBossCache/src/org/jboss/cache/TreeCache.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TreeCache.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
  retrieving revision 1.299
  retrieving revision 1.300
  diff -u -b -r1.299 -r1.300
  --- TreeCache.java	21 Dec 2006 05:34:03 -0000	1.299
  +++ TreeCache.java	21 Dec 2006 21:41:10 -0000	1.300
  @@ -99,7 +99,7 @@
    * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
    * @author Brian Stansberry
    * @author Daniel Huang (dhuang at jboss.org)
  - * @version $Id: TreeCache.java,v 1.299 2006/12/21 05:34:03 bstansberry Exp $
  + * @version $Id: TreeCache.java,v 1.300 2006/12/21 21:41:10 vblagojevic Exp $
    *          <p/>
    * @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
    */
  @@ -151,7 +151,7 @@
      /**
       * JGroups message listener.
       */
  -   protected MessageListenerAdaptor ml = new MessageListenerAdaptor(log);
  +   protected MessageListenerAdaptor ml = new MessageListenerAdaptor();
   
      /**
       * Maintains mapping of transactions (keys) and Modifications/Undo-Operations
  @@ -172,7 +172,7 @@
      /**
       * True if state was initialized during start-up.
       */
  -   protected boolean isStateSet = false;
  +   protected volatile boolean isStateSet = false;
   
      protected String evictionInterceptorClass = "org.jboss.cache.interceptors.EvictionInterceptor";
   
  @@ -580,7 +580,19 @@
         {
            Address target = (Address) iter.next();
            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
  +         isStateSet = false;
            successfulTransfer = channel.getState(target, stateId, stateFetchTimeout);
  +         if (successfulTransfer)
  +         {
  +            try
  +            {
  +               ml.waitForState();
  +            }
  +            catch(Exception transferFailed)
  +            {
  +               successfulTransfer = false;
  +            }
  +         }
            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
         }
   
  @@ -3217,22 +3229,15 @@
         node.getNodeSPI().setFqn(newFqn);
      }
   
  -   class MessageListenerAdaptor implements ExtendedMessageListener
  +   protected class MessageListenerAdaptor implements ExtendedMessageListener
      {
  -      final Log my_log;// Need this to run under jdk1.3
  -      final boolean trace;
  -
         /**
          * An exception occuring upon fetch state.
          */
  -      protected Exception setStateException;
  +      protected volatile Exception setStateException;
         private final Object stateLock = new Object();
   
  -      MessageListenerAdaptor(Log log)
  -      {
  -         this.my_log = log;
  -         this.trace = my_log.isTraceEnabled();
  -      }
  +      protected MessageListenerAdaptor(){}
   
         public void waitForState() throws Exception
         {
  @@ -3256,14 +3261,53 @@
            }
         }
   
  +      protected void stateReceivedSuccess()
  +      {         
  +         isStateSet = true;
  +         setStateException = null;        
  +      }
  +      
  +      protected void stateReceivingFailed(Throwable t)
  +      {         
  +         if(t instanceof CacheException)
  +         {
  +            log.debug(t);
  +         }
  +         else
  +         {
  +            log.error("failed setting state", t);
  +         }
  +         if (t instanceof Exception)
  +         {
  +            setStateException = (Exception) t;
  +         }
  +         else
  +         {
  +            setStateException = new Exception(t);
  +         }
  +      }
  +      
  +      protected void stateProducingFailed(Throwable t)
  +      {
  +         if (t instanceof CacheException)
  +         {
  +            log.debug(t);
  +         }
  +         else
  +         {
  +            log.error("Caught " + t.getClass().getName()
  +                  + " while responding to partial byte based state transfer request", t);
  +         }
  +      }
  +
         /**
          * Callback, does nothing.
          */
         public void receive(Message msg)
         {
  -         if (trace)
  +         if (log.isTraceEnabled())
            {
  -            my_log.trace("Received message " + msg);
  +            log.trace("Received message " + msg);
            }
         }
         
  @@ -3281,11 +3325,7 @@
            }
            catch (Throwable t)
            {
  -            // This shouldn't happen as we set "suppressErrors" to true,
  -            // but we have to catch the Throwable declared in the method sig
  -            my_log.error("Caught " + t.getClass().getName() +
  -                    " while responding to initial state transfer request;" +
  -                    " returning null", t);            
  +            stateProducingFailed(t); 
            }
            finally
            {
  @@ -3298,7 +3338,7 @@
         {
            if (new_state == null)
            {
  -            my_log.debug("transferred state is null (may be first member in cluster)");
  +            log.debug("transferred state is null (may be first member in cluster)");
               return;
            }
            ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
  @@ -3306,24 +3346,12 @@
            try
            {                               
               in = new MarshalledValueInputStream(bais);
  -            boolean hasState = in.readBoolean();
  -            if(hasState)
  -            {
                  getStateTransferManager().setState(in, Fqn.ROOT, null);
  -            }
  -            isStateSet = true;           
  +            stateReceivedSuccess();           
            }
            catch (Throwable t)
            {
  -            my_log.error("failed setting state", t);
  -            if (t instanceof Exception)
  -            {
  -               setStateException = (Exception) t;
  -            }
  -            else
  -            {
  -               setStateException = new Exception(t);
  -            }
  +            stateReceivingFailed(t);
            }
            finally
            {
  @@ -3359,11 +3387,7 @@
            }
            catch (Throwable t)
            {
  -            // This shouldn't happen as we set "suppressErrors" to true,
  -            // but we have to catch the Throwable declared in the method sig
  -            my_log.error("Caught " + t.getClass().getName() +
  -                    " while responding to partial state transfer request;" +
  -                    " returning null", t);            
  +            stateProducingFailed(t); 
            }
            finally
            {
  @@ -3382,11 +3406,7 @@
            }
            catch (Throwable t)
            {
  -            // This shouldn't happen as we set "suppressErrors" to true,
  -            // but we have to catch the Throwable declared in the method sig
  -            my_log.error("Caught " + t.getClass().getName() +
  -                    " while responding to initial state transfer request;" +
  -                    " returning null", t);
  +            stateProducingFailed(t); 
            }
            finally
            {
  @@ -3410,11 +3430,7 @@
            }
            catch (Throwable t)
            {
  -            // This shouldn't happen as we set "suppressErrors" to true,
  -            // but we have to catch the Throwable declared in the method sig
  -            my_log.error("Caught " + t.getClass().getName() +
  -                    " while responding to partial state transfer request;" +
  -                    " returning null", t);
  +            stateProducingFailed(t); 
            }
            finally
            {
  @@ -3426,31 +3442,19 @@
         {
            if (istream == null)
            {
  -            my_log.debug("stream is null (may be first member in cluster)");
  +            log.debug("stream is null (may be first member in cluster)");
               return;
            }
            MarshalledValueInputStream in = null;
            try
            {
               in = new MarshalledValueInputStream(istream);   
  -            boolean hasState = in.readBoolean();
  -            if(hasState)
  -            {
                  getStateTransferManager().setState(in, Fqn.ROOT, null);
  -            }
  -            isStateSet = true;           
  +            stateReceivedSuccess();          
            }
            catch (Throwable t)
            {
  -            my_log.error("failed setting state", t);
  -            if (t instanceof Exception)
  -            {
  -               setStateException = (Exception) t;
  -            }
  -            else
  -            {
  -               setStateException = new Exception(t);
  -            }
  +            stateReceivingFailed(t);
            }
            finally
            {
  @@ -3467,7 +3471,7 @@
         {
            if (state == null)
            {
  -            my_log.debug("partial transferred state is null");
  +            log.debug("partial transferred state is null");
               return;
            }
            
  @@ -3480,8 +3484,7 @@
            }
            try
            {
  -
  -            my_log.debug("Setting received partial state for subroot " + state_id);
  +            log.debug("Setting received partial state for subroot " + state_id);
               Fqn subroot = Fqn.fromString(targetRoot);
               Region region = regionManager.getRegion(subroot, false);
               ClassLoader cl = null;
  @@ -3492,24 +3495,12 @@
               }
               ByteArrayInputStream bais = new ByteArrayInputStream(state);
               in = new MarshalledValueInputStream(bais);
  -            boolean hasState = in.readBoolean();
  -            if(hasState)
  -            {
                  getStateTransferManager().setState(in, subroot, cl);
  -            }
  -            isStateSet = true;
  +            stateReceivedSuccess();
            }
            catch (Throwable t)
            {
  -            my_log.error("failed setting state", t);
  -            if (t instanceof Exception)
  -            {
  -               setStateException = (Exception) t;
  -            }
  -            else
  -            {
  -               setStateException = new Exception(t);
  -            }
  +            stateReceivingFailed(t);
            }
            finally
            {
  @@ -3533,13 +3524,13 @@
            }
            if (istream == null)
            {
  -            my_log.debug("stream is null (may be first member in cluster). State is not set");
  +            log.debug("stream is null (may be first member in cluster). State is not set");
               return;
            }
   
            try
            {
  -            my_log.debug("Setting received partial state for subroot " + state_id);
  +            log.debug("Setting received partial state for subroot " + state_id);
               in = new MarshalledValueInputStream(istream);                      
               Fqn subroot = Fqn.fromString(targetRoot);
               Region region = regionManager.getRegion(subroot, false);
  @@ -3549,24 +3540,12 @@
                  // If a classloader is registered for the node's region, use it
                  cl = region.getClassLoader();
               }
  -            boolean hasState = in.readBoolean();
  -            if(hasState)
  -            {
                  getStateTransferManager().setState(in,subroot, cl);
  -            }
  -            isStateSet = true;           
  +            stateReceivedSuccess();          
            }
            catch (Throwable t)
            {
  -            my_log.error("failed setting partial state", t);
  -            if (t instanceof Exception)
  -            {
  -               setStateException = (Exception) t;
  -            }
  -            else
  -            {
  -               setStateException = new Exception(t);
  -            }
  +            stateReceivingFailed(t);
            }
            finally
            {
  
  
  



More information about the jboss-cvs-commits mailing list