[jboss-cvs] JBossCache/src/org/jboss/cache ...

Brian Stansberry brian.stansberry at jboss.com
Thu Jul 20 17:58:21 EDT 2006


  User: bstansberry
  Date: 06/07/20 17:58:21

  Modified:    src/org/jboss/cache    CacheSPI.java TreeCacheProxyImpl.java
                        TreeCache.java
  Log:
  [JBCACHE-465] Extract the state transfer code out of TreeCache
  
  Revision  Changes    Path
  1.5       +8 -1      JBossCache/src/org/jboss/cache/CacheSPI.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: CacheSPI.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/CacheSPI.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- CacheSPI.java	19 Jul 2006 21:34:45 -0000	1.4
  +++ CacheSPI.java	20 Jul 2006 21:58:21 -0000	1.5
  @@ -8,10 +8,11 @@
   
   import org.jboss.cache.buddyreplication.BuddyManager;
   import org.jboss.cache.config.Configuration;
  +import org.jboss.cache.eviction.RegionManager;
   import org.jboss.cache.interceptors.Interceptor;
   import org.jboss.cache.loader.CacheLoader;
   import org.jboss.cache.loader.CacheLoaderManager;
  -import org.jboss.cache.eviction.RegionManager;
  +import org.jboss.cache.statetransfer.StateTransferManager;
   import org.jgroups.Address;
   import org.jgroups.blocks.MethodCall;
   
  @@ -69,6 +70,12 @@
   
       /**
        *
  +     * @return the current {@link StateTransferManager}
  +     */
  +    StateTransferManager getStateTransferManager();
  +
  +    /**
  +     *
        * @return the local address of this cache in a cluster.  Null if running in local mode.
        */
       Object getLocalAddress();
  
  
  
  1.5       +6 -0      JBossCache/src/org/jboss/cache/TreeCacheProxyImpl.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TreeCacheProxyImpl.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCacheProxyImpl.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- TreeCacheProxyImpl.java	19 Jul 2006 21:34:45 -0000	1.4
  +++ TreeCacheProxyImpl.java	20 Jul 2006 21:58:21 -0000	1.5
  @@ -12,6 +12,7 @@
   import org.jboss.cache.loader.CacheLoaderManager;
   import org.jboss.cache.lock.IdentityLock;
   import org.jboss.cache.eviction.RegionManager;
  +import org.jboss.cache.statetransfer.StateTransferManager;
   import org.jgroups.Address;
   import org.jgroups.blocks.MethodCall;
   
  @@ -73,6 +74,11 @@
           return RPCManager.getInstance(treeCache);
       }
   
  +    public StateTransferManager getStateTransferManager()
  +    {
  +       return treeCache.getStateTransferManager();
  +    }
  +
       public Object getLocalAddress()
       {
           return treeCache.getLocalAddress();
  
  
  
  1.208     +46 -689   JBossCache/src/org/jboss/cache/TreeCache.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TreeCache.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
  retrieving revision 1.207
  retrieving revision 1.208
  diff -u -b -r1.207 -r1.208
  --- TreeCache.java	20 Jul 2006 09:03:51 -0000	1.207
  +++ TreeCache.java	20 Jul 2006 21:58:21 -0000	1.208
  @@ -23,6 +23,7 @@
   import org.jboss.cache.lock.IdentityLock;
   import org.jboss.cache.lock.IsolationLevel;
   import org.jboss.cache.lock.LockStrategyFactory;
  +import org.jboss.cache.lock.LockUtil;
   import org.jboss.cache.lock.LockingException;
   import org.jboss.cache.lock.TimeoutException;
   import org.jboss.cache.marshall.JBCMethodCall;
  @@ -35,9 +36,7 @@
   import org.jboss.cache.marshall.TreeCacheMarshaller;
   import org.jboss.cache.marshall.VersionAwareMarshaller;
   import org.jboss.cache.optimistic.DataVersion;
  -import org.jboss.cache.statetransfer.StateTransferFactory;
  -import org.jboss.cache.statetransfer.StateTransferGenerator;
  -import org.jboss.cache.statetransfer.StateTransferIntegrator;
  +import org.jboss.cache.statetransfer.StateTransferManager;
   import org.jboss.cache.util.MBeanConfigurator;
   import org.jboss.invocation.MarshalledValueOutputStream;
   import org.jboss.system.ServiceMBeanSupport;
  @@ -74,7 +73,7 @@
    * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
    * @author Brian Stansberry
    * @author Daniel Huang (dhuang at jboss.org)
  - * @version $Id: TreeCache.java,v 1.207 2006/07/20 09:03:51 msurtani Exp $
  + * @version $Id: TreeCache.java,v 1.208 2006/07/20 21:58:21 bstansberry Exp $
    *          <p/>
    * @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
    */
  @@ -253,6 +252,23 @@
        */
      protected BuddyManager buddyManager;
   
  +   /** State transfer manager. Do not access this field directly -- use the getter */
  +   private StateTransferManager stateTransferManager;
  +   
  +   public StateTransferManager getStateTransferManager()
  +   {
  +      if (stateTransferManager == null)
  +      {
  +         stateTransferManager = new StateTransferManager(this);
  +      }
  +      return stateTransferManager;
  +   }
  +   
  +   public void setStateTransferManager(StateTransferManager manager)
  +   {
  +      this.stateTransferManager = manager;
  +   }
  +
      private boolean isStandalone = false;
   
       private long stateFetchTimeout;
  @@ -262,7 +278,7 @@
       * activateReqion or inactivateRegion.  Requests for these fqns
       * will be ignored by _getState().
       */
  -   protected final Set activationChangeNodes = new HashSet();
  +   protected final Set activationChangeNodes = Collections.synchronizedSet(new HashSet());
       boolean started;
   
       public Configuration getConfiguration()
  @@ -957,22 +973,6 @@
      // -----------  Marshalling and State Transfer -----------------------
   
      /**
  -    * Returns the state bytes from the message listener.
  -    */
  -   public byte[] getStateBytes()
  -   {
  -      return this.getMessageListener().getState();
  -   }
  -
  -   /**
  -    * Sets the state bytes in the message listener.
  -    */
  -   public void setStateBytes(byte[] state)
  -   {
  -      this.getMessageListener().setState(state);
  -   }
  -
  -   /**
       * Registers a specific classloader for a region defined by a fully
       * qualified name.
       * A instance of {@link TreeCacheMarshaller} is used for marshalling.
  @@ -995,7 +995,7 @@
      }
   
      /**
  -    * Unregisteres a class loader for a region.
  +    * Unregisters a class loader for a region.
       *
       * @param fqn The fqn region.
       * @throws RegionNotFoundException If there is a conflict in fqn specification.
  @@ -1052,10 +1052,7 @@
   
            // Add this fqn to the set of those we are activating
            // so calls to _getState for the fqn can return quickly
  -         synchronized (activationChangeNodes)
  -         {
               activationChangeNodes.add(fqn);
  -         }
   
            // Start accepting messages for the subtree, but
            // queue them for later processing.  We do this early
  @@ -1082,7 +1079,7 @@
               }
   
               Object[] mbrArray = getMembers().toArray();
  -            _loadState(subtreeRoot.getFqn(), subtreeRoot, mbrArray, cl);
  +            getStateTransferManager().loadState(subtreeRoot.getFqn(), subtreeRoot, mbrArray, cl);
            }
            else
            {
  @@ -1101,7 +1098,7 @@
                     // We'll update this node with the state we receive
                     subtreeRoot = createSubtreeRootNode(buddyRoot);
                  }
  -               _loadState(fqn, subtreeRoot, sources, cl);
  +               getStateTransferManager().loadState(fqn, subtreeRoot, sources, cl);
               }
            }
   
  @@ -1143,11 +1140,13 @@
         }
         finally
         {
  -         synchronized (activationChangeNodes)
  -         {
               activationChangeNodes.remove(fqn);
            }
         }
  +   
  +   public boolean isActivatingDeactivating(Fqn fqn)
  +   {
  +      return activationChangeNodes.contains(fqn);
      }
   
      /**
  @@ -1175,123 +1174,6 @@
      }
   
      /**
  -    * Requests state from each of the given source nodes in the cluster
  -    * until it gets it or no node replies with a timeout exception.  If state
  -    * is returned, integrates it into the given DataNode.  If no state is
  -    * returned but a node replies with a timeout exception, the calls will be
  -    * repeated with a longer timeout, until 3 attempts have been made.
  -    *
  -    * @param subtreeRoot     Fqn of the topmost node in the subtree whose
  -    *                        state should be transferred.
  -    * @param integrationRoot the DataNode into which state should be integrated
  -    * @param sources         the cluster nodes to query for state
  -    * @param cl              the classloader to use to unmarshal the state.
  -    *                        Can be <code>null</code>.
  -    * @throws Exception
  -    */
  -   public void _loadState(Fqn subtreeRoot, DataNode integrationRoot,
  -                          Object[] sources, ClassLoader cl)
  -      throws Exception
  -   {
  -      // Call each node in the cluster with progressively longer timeouts
  -      // until we get state or no cluster node returns a TimeoutException
  -      long[] timeouts = {400, 800, 1600};
  -      Object ourself = getLocalAddress(); // ignore ourself when we call
  -      boolean stateSet = false;
  -      TimeoutException timeoutException = null;
  -      Object timeoutTarget = null;
  -
  -      boolean trace = log.isTraceEnabled();
  -
  -      for (int i = 0; i < timeouts.length; i++)
  -      {
  -         timeoutException = null;
  -
  -         Boolean force = (i == timeouts.length - 1) ? Boolean.TRUE
  -            : Boolean.FALSE;
  -
  -         MethodCall psmc = MethodCallFactory.create(MethodDeclarations.getPartialStateMethod,
  -            new Object[]{subtreeRoot,
  -               new Long(timeouts[i]),
  -               force,
  -               Boolean.FALSE});
  -
  -         MethodCall replPsmc = MethodCallFactory.create(MethodDeclarations.replicateMethod,
  -            new Object[]{psmc});
  -
  -         // Iterate over the group members, seeing if anyone
  -         // can give us state for this region
  -         for (int j = 0; j < sources.length; j++)
  -         {
  -            Object target = sources[j];
  -            if (ourself.equals(target))
  -               continue;
  -
  -            Vector targets = new Vector();
  -            targets.add(target);
  -
  -            List responses = callRemoteMethods(targets, replPsmc, true,
  -               true, configuration.getSyncReplTimeout());
  -            Object rsp = null;
  -            if (responses != null && responses.size() > 0)
  -            {
  -               rsp = responses.get(0);
  -               if (rsp instanceof byte[])
  -               {
  -                  _setState((byte[]) rsp, integrationRoot, cl);
  -                  stateSet = true;
  -
  -                  if (log.isDebugEnabled())
  -                  {
  -                     log.debug("TreeCache.activateRegion(): " + ourself +
  -                        " got state from " + target);
  -                  }
  -
  -                  break;
  -               }
  -               else if (rsp instanceof TimeoutException)
  -               {
  -                  timeoutException = (TimeoutException) rsp;
  -                  timeoutTarget = target;
  -                  if (trace)
  -                  {
  -                     log.trace("TreeCache.activateRegion(): " + ourself +
  -                        " got a TimeoutException from " + target);
  -                  }
  -               }
  -            }
  -
  -            if (trace)
  -            {
  -               log.trace("TreeCache.activateRegion(): " + ourself +
  -                  " No usable response from node " + target +
  -                  (rsp == null ? "" : (" -- received " + rsp)));
  -            }
  -         }
  -
  -         // We've looped through all targets; if we got state or didn't
  -         // but no one sent a timeout (which means no one had state)
  -         // we don't want to try again
  -         if (stateSet || timeoutException == null)
  -            break;
  -      }
  -
  -      if (!stateSet)
  -      {
  -         // If we got a timeout exception on the final try,
  -         // this is a failure condition
  -         if (timeoutException != null)
  -         {
  -            throw new CacheException("Failed getting state due to timeout on " +
  -               timeoutTarget, timeoutException);
  -         }
  -
  -         if (log.isDebugEnabled())
  -            log.debug("TreeCache.activateRegion(): No nodes able to give state");
  -      }
  -   }
  -
  -   /**
       * Creates a subtree in the local tree.
       * Returns the DataNode created.
       */
  @@ -1363,7 +1245,7 @@
       *                                     managed (either by activate/inactiveRegion()
       *                                     or by registerClassLoader())
       * @throws CacheException              if there is a problem evicting nodes
  -    * @throws IllegalStateException       if  is <code>false</code>
  +    * @throws IllegalStateException       if {@link Configuration#isUseRegionBasedMarshalling()} is <code>false</code>
       */
      public void inactivateRegion(String subtreeFqn) throws RegionNameConflictException, CacheException
      {
  @@ -1377,11 +1259,8 @@
         boolean subtreeLocked = false;
         try
         {
  -
  -         synchronized (activationChangeNodes)
  -         {
  +         // Record that this fqn is changing activation status, so we can't provide state for it
               activationChangeNodes.add(fqn);
  -         }
   
            boolean inactive = marshaller_.isInactive(subtreeFqn);
            if (!inactive)
  @@ -1478,12 +1357,9 @@
               }
            }
   
  -         synchronized (activationChangeNodes)
  -         {
               activationChangeNodes.remove(fqn);
            }
         }
  -   }
   
      /**
       * Evicts the node at <code>subtree</code> along with all descendant nodes.
  @@ -1648,480 +1524,7 @@
       */
      public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
      {
  -
  -      if (marshaller_ != null)
  -      {
  -         // can't give state for regions currently being activated/inactivated
  -         synchronized (activationChangeNodes)
  -         {
  -            if (activationChangeNodes.contains(fqn))
  -            {
  -               if (log.isDebugEnabled())
  -                  log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
  -               return null;
  -            }
  -         }
  -
  -         // Can't give state for inactive nodes
  -         if (marshaller_.isInactive(fqn.toString()))
  -         {
  -            if (log.isDebugEnabled())
  -               log.debug("ignoring _getState() for inactive region " + fqn);
  -            return null;
  -         }
  -      }
  -
  -      DataNode rootNode = findNode(fqn);
  -      if (rootNode == null)
  -         return null;
  -
  -      boolean fetchTransientState  = configuration.isFetchInMemoryState();
  -      boolean fetchPersistentState = cacheLoaderManager !=null && cacheLoaderManager.isFetchPersistentState();
  -
  -      Object owner = getOwnerForLock();
  -
  -      try
  -      {
  -         if (fetchTransientState || fetchPersistentState)
  -         {
  -            log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
  -            acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
  -         }
  -
  -         StateTransferGenerator generator =
  -            StateTransferFactory.getStateTransferGenerator(this);
  -
  -         return generator.generateStateTransfer(rootNode,
  -            fetchTransientState,
  -            fetchPersistentState,
  -            suppressErrors);
  -      }
  -      finally
  -      {
  -         releaseStateTransferLocks(rootNode, owner, true);
  -      }
  -   }
  -
  -   /**
  -    * Set the portion of the cache rooted in <code>targetRoot</code>
  -    * to match the given state. Updates the contents of <code>targetRoot</code>
  -    * to reflect those in <code>new_state</code>.
  -    * <p/>
  -    * <strong>NOTE:</strong> This method performs no locking of nodes; it
  -    * is up to the caller to lock <code>targetRoot</code> before calling
  -    * this method.
  -    *
  -    * @param new_state  a serialized byte[][] array where element 0 is the
  -    *                   transient state (or null) , and element 1 is the
  -    *                   persistent state (or null)
  -    * @param targetRoot fqn of the node into which the state should be integrated
  -    * @param cl         classloader to use to unmarshal the state, or
  -    *                   <code>null</code> if the TCCL should be used
  -    */
  -   public void _setState(byte[] new_state, Fqn targetRoot, ClassLoader cl)
  -      throws Exception
  -   {
  -      DataNode target = findNode(targetRoot);
  -      if (target == null)
  -      {
  -         // Create the integration root, but do not replicate
  -         Option option = new Option();
  -         option.setCacheModeLocal(true);
  -         this.put(targetRoot, null, option);
  -         target = findNode(targetRoot);
  -      }
  -
  -      _setState(new_state, target, cl);
  -   }
  -
  -   /**
  -    * Set the portion of the cache rooted in <code>targetRoot</code>
  -    * to match the given state. Updates the contents of <code>targetRoot</code>
  -    * to reflect those in <code>new_state</code>.
  -    * <p/>
  -    * <strong>NOTE:</strong> This method performs no locking of nodes; it
  -    * is up to the caller to lock <code>targetRoot</code> before calling
  -    * this method.
  -    *
  -    * @param new_state a serialized byte[][] array where element 0 is the
  -    *                  transient state (or null) , and element 1 is the
  -    *                  persistent state (or null)
  -    * @param targetRoot node into which the state should be integrated
  -    * @param cl         classloader to use to unmarshal the state, or
  -    *                   <code>null</code> if the TCCL should be used
  -    */
  -   private void _setState(byte[] new_state, DataNode targetRoot, ClassLoader cl)
  -      throws Exception
  -   {
  -      if (new_state == null)
  -      {
  -         log.info("new_state is null (may be first member in cluster)");
  -         return;
  -      }
  -
  -      log.info("received the state (size=" + new_state.length + " bytes)");
  -
  -      Object owner = getOwnerForLock();
  -      try
  -      {
  -         // Acquire a lock on the root node
  -         acquireLocksForStateTransfer(targetRoot, owner, stateFetchTimeout,
  -            true, true);
  -
  -         // 1. Unserialize the states into transient and persistent state
  -         StateTransferIntegrator integrator =
  -            StateTransferFactory.getStateTransferIntegrator(new_state,
  -               targetRoot.getFqn(),
  -               this);
  -
  -         // 2. If transient state is available, integrate it
  -         try
  -         {
  -            integrator.integrateTransientState(targetRoot, cl);
  -            notifyAllNodesCreated(targetRoot);
  -         }
  -         catch (Throwable t)
  -         {
  -            log.error("failed setting transient state", t);
  -         }
  -
  -         // 3. Store any persistent state
  -         integrator.integratePersistentState();
  -      }
  -      finally
  -      {
  -         releaseStateTransferLocks(targetRoot, owner, true);
  -      }
  -
  -   }
  -
  -   /**
  -    * Acquires locks on a root node for an owner for state transfer.
  -    */
  -   protected void acquireLocksForStateTransfer(DataNode root,
  -                                               Object lockOwner,
  -                                               long timeout,
  -                                               boolean lockChildren,
  -                                               boolean force)
  -      throws Exception
  -   {
  -      try
  -      {
  -         if (lockChildren)
  -            root.acquireAll(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
  -         else
  -            root.acquire(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
  -      }
  -      catch (TimeoutException te)
  -      {
  -         log.error("Caught TimeoutException acquiring locks on region " +
  -            root.getFqn(), te);
  -         if (force)
  -         {
  -            // Until we have FLUSH in place, don't force locks
  -//            forceAcquireLock(root, lockOwner, lockChildren);
  -            throw te;
  -
  -         }
  -         else
  -         {
  -            throw te;
  -         }
  -      }
  -   }
  -
  -   /**
  -    * Releases all state transfer locks acquired.
  -    *
  -    * @see #acquireLocksForStateTransfer
  -    */
  -   protected void releaseStateTransferLocks(DataNode root,
  -                                            Object lockOwner,
  -                                            boolean childrenLocked)
  -   {
  -      try
  -      {
  -         if (childrenLocked)
  -            root.releaseAll(lockOwner);
  -         else
  -            root.release(lockOwner);
  -      }
  -      catch (Throwable t)
  -      {
  -         log.error("failed releasing locks", t);
  -      }
  -   }
  -
  -   /**
  -    * Forcibly acquire a read lock on the given node for the given owner,
  -    * breaking any existing locks that prevent the read lock.  If the
  -    * existing lock is held by a GlobalTransaction, breaking the lock may
  -    * result in a rollback of the transaction.
  -    *
  -    * @param node         the node
  -    * @param newOwner     the new owner (usually a Thread or GlobalTransaction)
  -    * @param lockChildren <code>true</code> if this method should be recursively
  -    *                     applied to <code>node</code>'s children.
  -    */
  -   protected void forceAcquireLock(DataNode node, Object newOwner, boolean lockChildren)
  -   {
  -      IdentityLock lock = node.getLock();
  -      boolean acquired = lock.isOwner(newOwner);
  -
  -      if (!acquired && log.isDebugEnabled())
  -         log.debug("Force acquiring lock on node " + node.getFqn());
  -
  -      while (!acquired)
  -      {
  -         Object curOwner = null;
  -         boolean attempted = false;
  -
  -         // Keep breaking write locks until we acquire a read lock
  -         // or there are no more write locks
  -         while (!acquired && ((curOwner = lock.getWriterOwner()) != null))
  -         {
  -            acquired = acquireLockFromOwner(node, lock, curOwner, newOwner);
  -            attempted = true;
  -         }
  -
  -         // If no more write locks, but we haven't acquired, see if we
  -         // need to break read locks as well
  -         if (!acquired && configuration.getIsolationLevel() == IsolationLevel.SERIALIZABLE)
  -         {
  -            Iterator it = lock.getReaderOwners().iterator();
  -            if (it.hasNext())
  -            {
  -               curOwner = it.next();
  -               acquired = acquireLockFromOwner(node, lock, it.next(), newOwner);
  -               attempted = true;
  -               // Don't keep iterating due to the risk of
  -               // ConcurrentModificationException if readers are removed
  -               // Just go back through our outer loop to get the next one
  -            }
  -         }
  -
  -         if (!acquired && !attempted)
  -         {
  -            // We only try to acquire above if someone else has the lock.
  -            // Seems no one is holding a lock and it's there for the taking.
  -            try
  -            {
  -               acquired = node.acquire(newOwner, 1, DataNode.LOCK_TYPE_READ);
  -            }
  -            catch (Exception ignored)
  -            {
  -            }
  -         }
  -      }
  -
  -      // Recursively unlock children
  -      if (lockChildren && node.hasChildren())
  -      {
  -         Collection children = node.getChildren().values();
  -         for (Iterator it = children.iterator(); it.hasNext();)
  -         {
  -            forceAcquireLock((DataNode) it.next(), newOwner, true);
  -         }
  -      }
  -   }
  -
  -   /**
  -    * Attempts to acquire a read lock on <code>node</code> for
  -    * <code>newOwner</code>, if necessary breaking locks held by
  -    * <code>curOwner</code>.
  -    *
  -    * @param node     the node
  -    * @param lock     the lock
  -    * @param curOwner the current owner
  -    * @param newOwner the new owner
  -    */
  -   private boolean acquireLockFromOwner(DataNode node,
  -                                        IdentityLock lock,
  -                                        Object curOwner,
  -                                        Object newOwner)
  -   {
  -      if (log.isTraceEnabled())
  -         log.trace("Attempting to acquire lock for node " + node.getFqn() +
  -            " from owner " + curOwner);
  -
  -      boolean acquired = false;
  -      boolean broken = false;
  -      int tryCount = 0;
  -      int lastStatus = TransactionLockStatus.STATUS_BROKEN;
  -
  -      while (!broken && !acquired)
  -      {
  -         if (curOwner instanceof GlobalTransaction)
  -         {
  -            int status = breakTransactionLock((GlobalTransaction) curOwner, lock, lastStatus, tryCount);
  -            if (status == TransactionLockStatus.STATUS_BROKEN)
  -               broken = true;
  -            else if (status != lastStatus)
  -               tryCount = 0;
  -            lastStatus = status;
  -         }
  -         else if (tryCount > 0)
  -         {
  -            lock.release(curOwner);
  -            broken = true;
  -         }
  -
  -         if (broken && log.isTraceEnabled())
  -            log.trace("Broke lock for node " + node.getFqn() +
  -               " held by owner " + curOwner);
  -
  -         try
  -         {
  -            acquired = node.acquire(newOwner, 1, DataNode.LOCK_TYPE_READ);
  -         }
  -         catch (Exception ignore)
  -         {
  -         }
  -
  -         tryCount++;
  -      }
  -
  -      return acquired;
  -   }
  -
  -   /**
  -    * Attempts to release the lock held by <code>gtx</code> by altering the
  -    * underlying transaction.  Different strategies will be employed
  -    * depending on the status of the transaction and param
  -    * <code>tryCount</code>.  Transaction may be rolled back or marked
  -    * rollback-only, or the lock may just be broken, ignoring the tx.  Makes an
  -    * effort to not affect the tx or break the lock if tx appears to be in
  -    * the process of completion; param <code>tryCount</code> is used to help
  -    * make decisions about this.
  -    * <p/>
  -    * This method doesn't guarantee to have broken the lock unless it returns
  -    * {@link TransactionLockStatus#STATUS_BROKEN}.
  -    *
  -    * @param gtx        the gtx holding the lock
  -    * @param lock       the lock
  -    * @param lastStatus the return value from a previous invocation of this
  -    *                   method for the same lock, or Status.STATUS_UNKNOW
  -    *                   for the first invocation.
  -    * @param tryCount   number of times this method has been called with
  -    *                   the same gtx, lock and lastStatus arguments. Should
  -    *                   be reset to 0 anytime lastStatus changes.
  -    * @return the current status of the Transaction associated with
  -    *         <code>gtx</code>, or {@link TransactionLockStatus#STATUS_BROKEN}
  -    *         if the lock held by gtx was forcibly broken.
  -    */
  -   private int breakTransactionLock(GlobalTransaction gtx,
  -                                    IdentityLock lock,
  -                                    int lastStatus,
  -                                    int tryCount)
  -   {
  -      int status = Status.STATUS_UNKNOWN;
  -      Transaction tx = tx_table.getLocalTransaction(gtx);
  -      if (tx != null)
  -      {
  -         try
  -         {
  -            status = tx.getStatus();
  -
  -            if (status != lastStatus)
  -               tryCount = 0;
  -
  -            switch (status)
  -            {
  -               case Status.STATUS_ACTIVE:
  -               case Status.STATUS_MARKED_ROLLBACK:
  -               case Status.STATUS_PREPARING:
  -               case Status.STATUS_UNKNOWN:
  -                  if (tryCount == 0)
  -                  {
  -                     if (log.isTraceEnabled())
  -                        log.trace("Attempting to break transaction lock held " +
  -                           " by " + gtx + " by rolling back local tx");
  -                     // This thread has to join the tx
  -                     tm.resume(tx);
  -                     try
  -                     {
  -                        tx.rollback();
  -                     }
  -                     finally
  -                     {
  -                        tm.suspend();
  -                     }
  -
  -                  }
  -                  else if (tryCount > 100)
  -                  {
  -                     // Something is wrong; our initial rollback call
  -                     // didn't generate a valid state change; just force it
  -                     lock.release(gtx);
  -                     status = TransactionLockStatus.STATUS_BROKEN;
  -                  }
  -                  break;
  -
  -               case Status.STATUS_COMMITTING:
  -               case Status.STATUS_ROLLING_BACK:
  -                  // We'll try up to 10 times before just releasing
  -                  if (tryCount < 10)
  -                     break; // let it finish
  -                  // fall through and release
  -
  -               case Status.STATUS_COMMITTED:
  -               case Status.STATUS_ROLLEDBACK:
  -               case Status.STATUS_NO_TRANSACTION:
  -                  lock.release(gtx);
  -                  status = TransactionLockStatus.STATUS_BROKEN;
  -                  break;
  -
  -               case Status.STATUS_PREPARED:
  -                  // If the tx was started here, we can still abort the commit,
  -                  // otherwise we are in the middle of a remote commit() call
  -                  // and the status is just about to change
  -                  if (tryCount == 0 && gtx.addr.equals(getLocalAddress()))
  -                  {
  -                     // We can still abort the commit
  -                     if (log.isTraceEnabled())
  -                        log.trace("Attempting to break transaction lock held " +
  -                           "by " + gtx + " by marking local tx as " +
  -                           "rollback-only");
  -                     tx.setRollbackOnly();
  -                     break;
  -                  }
  -                  else if (tryCount < 10)
  -                  {
  -                     // EITHER tx was started elsewhere (in which case we'll
  -                     // wait a bit to allow the commit() call to finish;
  -                     // same as STATUS_COMMITTING above)
  -                     // OR we marked the tx rollbackOnly above and are just
  -                     // waiting a bit for the status to change
  -                     break;
  -                  }
  -
  -                  // fall through and release
  -               default:
  -                  lock.release(gtx);
  -                  status = TransactionLockStatus.STATUS_BROKEN;
  -            }
  -         }
  -         catch (Exception e)
  -         {
  -            log.error("Exception breaking locks held by " + gtx, e);
  -            lock.release(gtx);
  -            status = TransactionLockStatus.STATUS_BROKEN;
  -         }
  -      }
  -      else
  -      {
  -         // Race condition; gtx was cleared from tx_table.
  -         // Just double check if gtx still holds a lock
  -         if (gtx == lock.getWriterOwner()
  -            || lock.getReaderOwners().contains(gtx))
  -         {
  -            // TODO should we throw an exception??
  -            lock.release(gtx);
  -            status = TransactionLockStatus.STATUS_BROKEN;
  -         }
  -      }
  -
  -      return status;
  +      return getStateTransferManager().getState(fqn, timeout, force, suppressErrors);
      }
   
      private void removeLocksForDeadMembers(DataNode node,
  @@ -2148,7 +1551,13 @@
   
         for (iter = deadOwners.iterator(); iter.hasNext();)
         {
  -         breakTransactionLock(node, lock, (GlobalTransaction) iter.next());
  +         GlobalTransaction deadOwner = (GlobalTransaction) iter.next();
  +         boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
  +         boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, this);
  +
  +         if (broken && log.isTraceEnabled())
  +            log.trace("Broke lock for node " + node.getFqn() +
  +                      " held by " + deadOwner);
         }
   
         // Recursively unlock children
  @@ -2162,31 +1571,6 @@
         }
      }
   
  -   private void breakTransactionLock(DataNode node,
  -                                     IdentityLock lock,
  -                                     GlobalTransaction gtx)
  -   {
  -      boolean broken = false;
  -      int tryCount = 0;
  -      int lastStatus = TransactionLockStatus.STATUS_BROKEN;
  -
  -      while (!broken && lock.isOwner(gtx))
  -      {
  -         int status = breakTransactionLock(gtx, lock, lastStatus, tryCount);
  -         if (status == TransactionLockStatus.STATUS_BROKEN)
  -            broken = true;
  -         else if (status != lastStatus)
  -            tryCount = 0;
  -         lastStatus = status;
  -
  -         if (broken && log.isTraceEnabled())
  -            log.trace("Broke lock for node " + node.getFqn() +
  -               " held by owner " + gtx);
  -
  -         tryCount++;
  -      }
  -   }
  -
      private boolean isLockOwnerDead(Object owner, Vector deadMembers)
      {
         boolean result = false;
  @@ -4316,7 +3700,7 @@
               // but we have to catch the Throwable declared in the method sig
               my_log.error("Caught " + t.getClass().getName() +
                  " while responding to initial state transfer request;" +
  -               " returning null");
  +               " returning null", t);
               return null;
            }
         }
  @@ -4329,7 +3713,7 @@
               if (new_state == null)
                  my_log.info("transferred state is null (may be first member in cluster)");
               else
  -               TreeCache.this._setState(new_state, Fqn.ROOT, null);
  +               getStateTransferManager().setState(new_state, Fqn.ROOT, null);
   
               isStateSet = true;
            }
  @@ -4599,7 +3983,7 @@
       * @param fqn Fully qualified name for the corresponding node.
       * @return DataNode
       */
  -   private DataNode findNode(Fqn fqn)
  +   public DataNode findNode(Fqn fqn)
      {
         try
         {
  @@ -4911,27 +4295,6 @@
      }
   
      /**
  -    * Generates NodeAdded notifications for all nodes of the tree. This is
  -    * called whenever the tree is initially retrieved (state transfer)
  -    */
  -   protected void notifyAllNodesCreated(DataNode curr)
  -   {
  -      DataNode n;
  -      Map children;
  -
  -      if (curr == null) return;
  -      notifyNodeCreated(curr.getFqn());
  -      if ((children = curr.getChildren()) != null)
  -      {
  -         for (Iterator it = children.values().iterator(); it.hasNext();)
  -         {
  -            n = (DataNode) it.next();
  -            notifyAllNodesCreated(n);
  -         }
  -      }
  -   }
  -
  -   /**
       * Returns the default JGroup properties.
       * Subclasses may wish to override this method.
       */
  @@ -5075,10 +4438,4 @@
        
      }
   
  -   static interface TransactionLockStatus extends Status
  -   {
  -      public static final int STATUS_BROKEN = Integer.MIN_VALUE;
  -   }
  -
  -
   }
  
  
  



More information about the jboss-cvs-commits mailing list