[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...

Manik Surtani msurtani at belmont.prod.atl2.jboss.com
Tue Aug 29 12:35:52 EDT 2006


  User: msurtani
  Date: 06/08/29 12:35:52

  Modified:    src/org/jboss/cache/interceptors  
                        PessimisticLockInterceptor.java
                        ReplicationInterceptor.java
  Log:
  Fixed basic replication issues
  
  Revision  Changes    Path
  1.29      +212 -159  JBossCache/src/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: PessimisticLockInterceptor.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/PessimisticLockInterceptor.java,v
  retrieving revision 1.28
  retrieving revision 1.29
  diff -u -b -r1.28 -r1.29
  --- PessimisticLockInterceptor.java	25 Aug 2006 14:10:07 -0000	1.28
  +++ PessimisticLockInterceptor.java	29 Aug 2006 16:35:52 -0000	1.29
  @@ -19,8 +19,8 @@
   import org.jboss.cache.lock.IsolationLevel;
   import org.jboss.cache.lock.LockingException;
   import org.jboss.cache.lock.TimeoutException;
  -import org.jboss.cache.marshall.MethodDeclarations;
   import org.jboss.cache.marshall.MethodCall;
  +import org.jboss.cache.marshall.MethodDeclarations;
   
   import javax.transaction.Transaction;
   import java.util.Collections;
  @@ -36,30 +36,34 @@
    * current method and unlock when the method returns.
    *
    * @author Bela Ban
  - * @version $Id: PessimisticLockInterceptor.java,v 1.28 2006/08/25 14:10:07 msurtani Exp $
  + * @version $Id: PessimisticLockInterceptor.java,v 1.29 2006/08/29 16:35:52 msurtani Exp $
    */
  -public class PessimisticLockInterceptor extends Interceptor {
  -   TransactionTable tx_table=null;
  +public class PessimisticLockInterceptor extends Interceptor
  +{
  +   TransactionTable tx_table = null;
   
  -   /** Map<Object, java.util.List>. Keys = threads, values = lists of locks held by that thread */
  +   /**
  +    * Map<Object, java.util.List>. Keys = threads, values = lists of locks held by that thread
  +    */
      Map lock_table;
      private long lock_acquisition_timeout;
      LockManager lockManager = new LockManager();
   
   
  -   public void setCache(CacheSPI cache) {
  +   public void setCache(CacheSPI cache)
  +   {
         super.setCache(cache);   
  -      tx_table=cache.getTransactionTable();
  -      lock_table=cache.getLockTable();
  -      lock_acquisition_timeout=cache.getConfiguration().getLockAcquisitionTimeout();
  +      tx_table = cache.getTransactionTable();
  +      lock_table = cache.getLockTable();
  +      lock_acquisition_timeout = cache.getConfiguration().getLockAcquisitionTimeout();
      }
   
   
  -
  -   public Object invoke(MethodCall m) throws Throwable {
  -      Fqn fqn=null;
  -      DataNode.LockType lock_type=DataNode.LockType.NONE;
  -      Object[] args=m.getArgs();
  +   public Object invoke(MethodCall m) throws Throwable
  +   {
  +      Fqn fqn = null;
  +      DataNode.LockType lock_type = DataNode.LockType.NONE;
  +      Object[] args = m.getArgs();
         InvocationContext ctx = cache.getInvocationContext();
   
          if (log.isTraceEnabled()) log.trace("PessimisticLockInterceptor invoked for method " + m);
  @@ -84,9 +88,8 @@
      * (keyed by TX) */
        // List locks=null;
   
  -      boolean recursive=false;
  -      boolean createIfNotExists=false;
  -
  +      boolean recursive = false;
  +      boolean createIfNotExists = false;
   
         // 1. Determine the type of lock (read, write, or none) depending on the method. If no lock is required, invoke
         //    the method, then return immediately
  @@ -96,24 +99,24 @@
            case MethodDeclarations.putDataMethodLocal_id:
            case MethodDeclarations.putDataEraseMethodLocal_id:
            case MethodDeclarations.putKeyValMethodLocal_id:
  -            createIfNotExists=true;
  -            fqn=(Fqn)args[1];
  -            lock_type=DataNode.LockType.WRITE;
  +            createIfNotExists = true;
  +            fqn = (Fqn) args[1];
  +            lock_type = DataNode.LockType.WRITE;
               break;
            case MethodDeclarations.removeNodeMethodLocal_id:
  -            fqn=(Fqn)args[1];
  -            lock_type=DataNode.LockType.WRITE;
  -            recursive=true; // remove node and *all* child nodes
  +            fqn = (Fqn) args[1];
  +            lock_type = DataNode.LockType.WRITE;
  +            recursive = true; // remove node and *all* child nodes
               break;
            case MethodDeclarations.removeKeyMethodLocal_id:
            case MethodDeclarations.removeDataMethodLocal_id:
            case MethodDeclarations.addChildMethodLocal_id:
  -            fqn=(Fqn)args[1];
  -            lock_type=DataNode.LockType.WRITE;
  +            fqn = (Fqn) args[1];
  +            lock_type = DataNode.LockType.WRITE;
               break;
            case MethodDeclarations.evictNodeMethodLocal_id:
  -            fqn=(Fqn)args[0];
  -            lock_type=DataNode.LockType.WRITE;
  +            fqn = (Fqn) args[0];
  +            lock_type = DataNode.LockType.WRITE;
               break;
            case MethodDeclarations.getKeyValueMethodLocal_id:
            case MethodDeclarations.getNodeMethodLocal_id:
  @@ -121,13 +124,13 @@
            case MethodDeclarations.getChildrenNamesMethodLocal_id:
            case MethodDeclarations.releaseAllLocksMethodLocal_id:
            case MethodDeclarations.printMethodLocal_id:
  -            fqn=(Fqn)args[0];
  -            lock_type=DataNode.LockType.READ;
  +            fqn = (Fqn) args[0];
  +            lock_type = DataNode.LockType.READ;
               break;
            case MethodDeclarations.lockMethodLocal_id:
  -            fqn=(Fqn)args[0];
  -            lock_type=(DataNode.LockType)args[1];
  -            recursive=((Boolean)args[2]).booleanValue();
  +            fqn = (Fqn) args[0];
  +            lock_type = (DataNode.LockType) args[1];
  +            recursive = ((Boolean) args[2]).booleanValue();
               break;
            case MethodDeclarations.commitMethod_id:
               // commit propagated up from the tx interceptor
  @@ -150,132 +153,167 @@
         // If no TX: add each acquired lock to the list of locks for this method (locks)
         // If TX: [merge code from TransactionInterceptor]: register with TxManager, on commit/rollback,
         // release the locks for the given TX
  -      if(fqn != null) {
  -         if(createIfNotExists) {
  -            do {
  +      if (fqn != null)
  +      {
  +         if (createIfNotExists)
  +         {
  +            do
  +            {
                   // TODO: WHat do we do about createIfNotExists flag?
                  lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists);
               }
  -            while(!cache.hasChild(fqn)); // keep trying until we have the lock (fixes concurrent remove())
  +            while (!cache.hasChild(fqn)); // keep trying until we have the lock (fixes concurrent remove())
                                                  // terminates successfully, or with (Timeout)Exception
            }
            else
               // TODO: WHat do we do about createIfNotExists flag?
  +         {
               lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists);
         }
  -      else {
  -         if(log.isTraceEnabled())
  +      }
  +      else
  +      {
  +         if (log.isTraceEnabled())
  +         {
               log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
         }
  -      if(m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
  +      }
  +      if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
  +      {
            return null;
  +      }
         return super.invoke(m);
      }
   
   
  -
      /**
       * Locks a given node.
  +    *
       * @param fqn
       * @param gtx
       * @param lock_type DataNode.LOCK_TYPE_READ, DataNode.LOCK_TYPE_WRITE or DataNode.LOCK_TYPE_NONE
       * @param recursive Lock children recursively
       */
      private void lock(Fqn fqn, GlobalTransaction gtx, DataNode.LockType lock_type, boolean recursive, boolean createIfNotExists)
  -         throws TimeoutException, LockingException, InterruptedException {
  +           throws TimeoutException, LockingException, InterruptedException
  +   {
         Node       n;
         Node       child_node;
         Object         child_name;
         Thread         currentThread = Thread.currentThread();
  -      Object         owner = (gtx != null) ? (Object)gtx : currentThread;
  +      Object owner = (gtx != null) ? (Object) gtx : currentThread;
         int            treeNodeSize;
         boolean        acquired;
   
   
          if (log.isTraceEnabled()) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
   
  -      if(fqn == null) {
  +      if (fqn == null)
  +      {
            log.error("fqn is null - this should not be the case");
            return;
         }
   
  -      if((treeNodeSize=fqn.size()) == 0)
  +      if ((treeNodeSize = fqn.size()) == 0)
  +      {
            return;
  +      }
   
  -      if(configuration.getIsolationLevel() == IsolationLevel.NONE)
  -         lock_type=DataNode.LockType.NONE;
  +      if (configuration.getIsolationLevel() == IsolationLevel.NONE)
  +      {
  +         lock_type = DataNode.LockType.NONE;
  +      }
   
  -      n=cache;
  -      for(int i=0; i < treeNodeSize; i++) {
  -         child_name=fqn.get(i);
  +      n = cache;
  +      for (int i = 0; i < treeNodeSize; i++)
  +      {
  +         child_name = fqn.get(i);
   
            cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
            child_node = n.getChild(new Fqn(child_name));
            if (child_node == null && createIfNotExists)
            {
                cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
  -             child_node= n.addChild(new Fqn(child_name));
  +            child_node = n.addChild(new Fqn(child_name));
            }
   
  -         if(child_node == null) {
  -            if(log.isTraceEnabled())
  +         if (child_node == null)
  +         {
  +            if (log.isTraceEnabled())
  +            {
                  log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
  +            }
               return;
            }
   
  -         if(lock_type == DataNode.LockType.NONE) {
  +         if (lock_type == DataNode.LockType.NONE)
  +         {
               // acquired=false;
  -            n=child_node;
  +            n = child_node;
               continue;
            }
  -         else {
  -            if(lock_type == DataNode.LockType.WRITE && i == (treeNodeSize - 1)) {
  +         else
  +         {
  +            if (lock_type == DataNode.LockType.WRITE && i == (treeNodeSize - 1))
  +            {
                  //acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_WRITE);
                  acquired = lockManager.acquire(child_node, owner, DataNode.LockType.WRITE);
               }
  -            else {
  +            else
  +            {
                  //acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_READ);
                  acquired = lockManager.acquire(child_node, owner, DataNode.LockType.READ);
               }
            }
   
   
  -         if(acquired) {
  -            if(gtx != null) {
  +         if (acquired)
  +         {
  +            if (gtx != null)
  +            {
                  // add the lock to the list of locks maintained for this transaction
                  // (needed for release of locks on commit or rollback)
                  cache.getTransactionTable().addLock(gtx, lockManager.getLock(child_node));
               }
  -            else {
  -               IdentityLock l=lockManager.getLock(child_node);
  +            else
  +            {
  +               IdentityLock l = lockManager.getLock(child_node);
                  List locks = getLocks(currentThread);
  -               if(!locks.contains(l))
  +               if (!locks.contains(l))
  +               {
                     locks.add(l);
               }
            }
  +         }
   
  -         if(recursive && i == (treeNodeSize - 1)) {
  +         if (recursive && i == (treeNodeSize - 1))
  +         {
               //Set acquired_locks=child_node.acquireAll(owner, lock_timeout, lock_type);
               Set acquired_locks = lockManager.acquireAll(child_node, owner, lock_type);
  -            if(acquired_locks.size() > 0) {
  -               if(gtx != null) {
  +            if (acquired_locks.size() > 0)
  +            {
  +               if (gtx != null)
  +               {
                     cache.getTransactionTable().addLocks(gtx, acquired_locks);
                  }
  -               else {
  +               else
  +               {
                     List locks = getLocks(currentThread);
                     locks.addAll(acquired_locks);
                  }
               }
            }
  -         n=child_node;
  +         n = child_node;
         }
      }
   
  -   private List getLocks(Thread currentThread) {
  +   private List getLocks(Thread currentThread)
  +   {
         // This sort of looks like a get/put race condition, but
         // since we key off the Thread, it's not
  -      List locks = (List)lock_table.get(currentThread);
  -      if (locks == null) {
  +      List locks = (List) lock_table.get(currentThread);
  +      if (locks == null)
  +      {
           locks = Collections.synchronizedList(new LinkedList());
           lock_table.put(currentThread, locks);
         }
  @@ -286,16 +324,19 @@
       private void createNodes(Fqn fqn, GlobalTransaction gtx)
       {
           int treeNodeSize;
  -        if ((treeNodeSize=fqn.size()) == 0) return;
  -        Node n=cache;
  -        for(int i=0; i < treeNodeSize; i++)
  +      if ((treeNodeSize = fqn.size()) == 0) return;
  +      Node n = cache;
  +      for (int i = 0; i < treeNodeSize; i++)
           {
  -            Object child_name=fqn.get(i);
  +         Object child_name = fqn.get(i);
               cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
  -            Node child_node= n.addChild(new Fqn(child_name)); //, gtx, true);
  -            if(child_node == null)
  +         Node child_node = n.addChild(new Fqn(child_name)); //, gtx, true);
  +         if (child_node == null)
               {
  -                if(log.isTraceEnabled()) log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
  +            if (log.isTraceEnabled())
  +            {
  +               log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
  +            }
                   return;
               }
               n = child_node;
  @@ -305,14 +346,19 @@
   
      /**
       * Remove all locks held by <tt>tx</tt>, remove the transaction from the transaction table
  +    *
       * @param gtx
       */
  -   private void commit(GlobalTransaction gtx) {
  -      if(log.isTraceEnabled())
  +   private void commit(GlobalTransaction gtx)
  +   {
  +      if (log.isTraceEnabled())
  +      {
            log.trace("committing cache with gtx " + gtx);
  +      }
   
  -      TransactionEntry entry=tx_table.get(gtx);
  -      if(entry == null) {
  +      TransactionEntry entry = tx_table.get(gtx);
  +      if (entry == null)
  +      {
            log.error("entry for transaction " + gtx + " not found (maybe already committed)");
            return;
         }
  @@ -320,9 +366,11 @@
         // Let's do it in stack style, LIFO
         entry.releaseAllLocksLIFO(gtx);
   
  -      Transaction ltx=entry.getTransaction();
  -      if(log.isTraceEnabled())
  +      Transaction ltx = entry.getTransaction();
  +      if (log.isTraceEnabled())
  +      {
            log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
  +      }
         tx_table.remove(ltx);
         tx_table.remove(gtx);
      }
  @@ -339,15 +387,19 @@
        *
        * @param tx
        */
  -   private void rollback(GlobalTransaction tx) {
  +   private void rollback(GlobalTransaction tx)
  +   {
         List undo_ops;
  -      TransactionEntry entry=tx_table.get(tx);
  +      TransactionEntry entry = tx_table.get(tx);
         MethodCall undo_op;
   
  -      if(log.isTraceEnabled())
  +      if (log.isTraceEnabled())
  +      {
            log.trace("called to rollback cache with GlobalTransaction=" + tx);
  +      }
   
  -      if(entry == null) {
  +      if (entry == null)
  +      {
            log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
            return;
         }
  @@ -370,15 +422,16 @@
   //         }
   //      }
   
  -
         // 3. Finally, release all locks held by this TX
         // Let's do it in stack style, LIFO
         // Note that the lock could have been released already so don't panic.
         entry.releaseAllLocksLIFO(tx);
   
  -      Transaction ltx=entry.getTransaction();
  -      if(log.isTraceEnabled())
  +      Transaction ltx = entry.getTransaction();
  +      if (log.isTraceEnabled())
  +      {
            log.trace("removing local transaction " + ltx + " and global transaction " + tx);
  +      }
         tx_table.remove(ltx);
         tx_table.remove(tx);
      }
  
  
  
  1.39      +118 -114  JBossCache/src/org/jboss/cache/interceptors/ReplicationInterceptor.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ReplicationInterceptor.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/ReplicationInterceptor.java,v
  retrieving revision 1.38
  retrieving revision 1.39
  diff -u -b -r1.38 -r1.39
  --- ReplicationInterceptor.java	25 Aug 2006 14:10:07 -0000	1.38
  +++ ReplicationInterceptor.java	29 Aug 2006 16:35:52 -0000	1.39
  @@ -4,8 +4,8 @@
   import org.jboss.cache.InvocationContext;
   import org.jboss.cache.config.Configuration;
   import org.jboss.cache.config.Option;
  -import org.jboss.cache.marshall.MethodDeclarations;
   import org.jboss.cache.marshall.MethodCall;
  +import org.jboss.cache.marshall.MethodDeclarations;
   
   /**
    * Takes care of replicating modifications to other nodes in a cluster. Also
  @@ -13,7 +13,7 @@
    * 'side-ways' (see docs/design/Refactoring.txt).
    *
    * @author Bela Ban
  - * @version $Id: ReplicationInterceptor.java,v 1.38 2006/08/25 14:10:07 msurtani Exp $
  + * @version $Id: ReplicationInterceptor.java,v 1.39 2006/08/29 16:35:52 msurtani Exp $
    */
   public class ReplicationInterceptor extends BaseRpcInterceptor
   {
  @@ -25,9 +25,9 @@
   
           boolean isLocalCommitOrRollback = gtx != null && !gtx.isRemote() && (m.getMethodId() == MethodDeclarations.commitMethod_id || m.getMethodId() == MethodDeclarations.rollbackMethod_id);
   
  -
           // pass up the chain if not a local commit or rollback (in which case replicate first)
           Object o = isLocalCommitOrRollback ? null : super.invoke(m);
  +//       ctx = cache.getInvocationContext();
   
           Option optionOverride = ctx.getOptionOverrides();
   
  @@ -44,7 +44,7 @@
               if (gtx != null && !gtx.isRemote())
               {
                   // lets see what sort of method we've got.
  -               switch(m.getMethodId())
  +            switch (m.getMethodId())
                  {
                     case MethodDeclarations.commitMethod_id:
                        // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
  @@ -53,14 +53,18 @@
                        o = super.invoke(m);
                        break;
                     case MethodDeclarations.prepareMethod_id:
  -                     if (containsModifications(m)) {
  +                  if (containsModifications(m))
  +                  {
                           // this is a prepare method
                           runPreparePhase(m, gtx);
                        }
                        break;
                     case MethodDeclarations.rollbackMethod_id:
                        // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
  -                     if (containsModifications(m) && !ctx.isLocalRollbackOnly()) replicateCall(m, configuration.isSyncRollbackPhase());
  +                  if (containsModifications(m) && !ctx.isLocalRollbackOnly())
  +                  {
  +                     replicateCall(m, configuration.isSyncRollbackPhase());
  +                  }
                        // now pass up the chain
                        o = super.invoke(m);
                        break;
  
  
  



More information about the jboss-cvs-commits mailing list