[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...

Manik Surtani msurtani at jboss.com
Sat Nov 11 14:55:19 EST 2006


  User: msurtani
  Date: 06/11/11 14:55:19

  Modified:    src/org/jboss/cache/interceptors    TxInterceptor.java
                        OptimisticValidatorInterceptor.java
                        OptimisticReplicationInterceptor.java
  Log:
  JBCACHE-842 and JBCACHE-843
  
  Revision  Changes    Path
  1.64      +30 -4     JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TxInterceptor.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java,v
  retrieving revision 1.63
  retrieving revision 1.64
  diff -u -b -r1.63 -r1.64
  --- TxInterceptor.java	10 Nov 2006 02:48:46 -0000	1.63
  +++ TxInterceptor.java	11 Nov 2006 19:55:19 -0000	1.64
  @@ -20,6 +20,7 @@
   import org.jboss.cache.marshall.MethodCall;
   import org.jboss.cache.marshall.MethodCallFactory;
   import org.jboss.cache.marshall.MethodDeclarations;
  +import org.jboss.cache.optimistic.DataVersion;
   import org.jgroups.Address;
   
   import javax.transaction.Status;
  @@ -355,7 +356,7 @@
         // TODO: Manik: Refactor this to pass across entire workspace?
         Object retval;
         if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
  -      replayModifications(modifications, ltx);
  +      replayModifications(modifications, ltx, true);
         retval = super.invoke(m);
         // JBCACHE-361 Confirm that the transaction is ACTIVE
         if (!isActive(ltx))
  @@ -376,7 +377,7 @@
            // now pass up the prepare method itself.
            try
            {
  -            replayModifications(modifications, ltx);
  +            replayModifications(modifications, ltx, false);
               if (isOnePhaseCommitPrepareMehod(m))
               {
                  log.trace("Using one-phase prepare.  Not propagating the prepare call up the stack until called to do so by the sync handler.");
  @@ -449,7 +450,7 @@
         return null;
      }
   
  -   private Object replayModifications(List<MethodCall> modifications, Transaction tx)
  +   private Object replayModifications(List<MethodCall> modifications, Transaction tx, boolean injectDataVersions)
      {
         Object retval = null;
   
  @@ -459,7 +460,20 @@
            {
               try
               {
  +               if (injectDataVersions)
  +               {
  +                  Object[] origArgs = modification.getArgs();
  +                  injectDataVersion((DataVersion) origArgs[origArgs.length - 1]);
  +                  // modify the call to the non-dataversioned counterpart since we've popped out the data version
  +                  Object[] args = new Object[origArgs.length - 1];
  +                  System.arraycopy(origArgs, 0, args, 0, args.length);
  +
  +                  retval = super.invoke(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args));
  +               }
  +               else
  +               {
                  retval = super.invoke(modification);
  +               }
                  if (!isActive(tx))
                  {
                     throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
  @@ -470,6 +484,11 @@
                  log.error("method invocation failed", t);
                  retval = t;
               }
  +            finally
  +            {
  +               // reset any options
  +               if (injectDataVersions) cache.getInvocationContext().setOptionOverrides(null);
  +            }
               if (retval != null && retval instanceof Exception)
               {
                  throw new RuntimeException((Exception) retval);
  @@ -480,6 +499,13 @@
         return retval;
      }
   
  +   public void injectDataVersion(DataVersion v)
  +   {
  +      Option o = new Option();
  +      o.setDataVersion(v);
  +      cache.getInvocationContext().setOptionOverrides(o);
  +   }
  +
      /**
       * Handles a commit or a rollback for a remote gtx.  Called by invoke().
       *
  
  
  
  1.38      +13 -3     JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: OptimisticValidatorInterceptor.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java,v
  retrieving revision 1.37
  retrieving revision 1.38
  diff -u -b -r1.37 -r1.38
  --- OptimisticValidatorInterceptor.java	10 Nov 2006 20:32:51 -0000	1.37
  +++ OptimisticValidatorInterceptor.java	11 Nov 2006 19:55:19 -0000	1.38
  @@ -15,6 +15,7 @@
   import org.jboss.cache.TreeCacheProxyImpl;
   import org.jboss.cache.marshall.MethodCall;
   import org.jboss.cache.marshall.MethodDeclarations;
  +import org.jboss.cache.optimistic.DataVersioningException;
   import org.jboss.cache.optimistic.DefaultDataVersion;
   import org.jboss.cache.optimistic.TransactionWorkspace;
   import org.jboss.cache.optimistic.WorkspaceNode;
  @@ -132,10 +133,19 @@
            {
               throw new CacheException("Real node for " + fqn + " is null, and this wasn't newly created in this tx!");
            }
  -         if (!workspaceNode.isCreated() && realNode.getVersion().newerThan(workspaceNode.getVersion()))
  +
  +         if (!workspaceNode.isCreated())
  +         {
  +            // test that the 2 DataVersion types match up
  +            if (!realNode.getVersion().getClass().equals(workspaceNode.getVersion().getClass()))
  +            {
  +               throw new DataVersioningException("Attempting to apply data version of type " + workspaceNode.getVersion().getClass() + " to a node that already contains version of type " + realNode.getVersion().getClass());
  +            }
  +            if ((workspaceNode.isDeleted() || workspaceNode.isDirty()) && realNode.getVersion().newerThan(workspaceNode.getVersion()))
            {
               // we have an out of date node here
  -            throw new CacheException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode) workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
  +               throw new DataVersioningException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode) workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
  +            }
            }
         }
      }
  
  
  
  1.28      +83 -1     JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: OptimisticReplicationInterceptor.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java,v
  retrieving revision 1.27
  retrieving revision 1.28
  diff -u -b -r1.27 -r1.28
  --- OptimisticReplicationInterceptor.java	10 Nov 2006 20:32:51 -0000	1.27
  +++ OptimisticReplicationInterceptor.java	11 Nov 2006 19:55:19 -0000	1.28
  @@ -8,14 +8,22 @@
   
   import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
   import org.jboss.cache.CacheException;
  +import org.jboss.cache.Fqn;
   import org.jboss.cache.GlobalTransaction;
   import org.jboss.cache.InvocationContext;
  +import org.jboss.cache.OptimisticTransactionEntry;
   import org.jboss.cache.config.Configuration;
   import org.jboss.cache.config.Option;
   import org.jboss.cache.marshall.MethodCall;
   import org.jboss.cache.marshall.MethodCallFactory;
   import org.jboss.cache.marshall.MethodDeclarations;
  +import org.jboss.cache.optimistic.DataVersion;
  +import org.jboss.cache.optimistic.DefaultDataVersion;
  +import org.jboss.cache.optimistic.TransactionWorkspace;
  +import org.jboss.cache.optimistic.WorkspaceNode;
   
  +import java.util.ArrayList;
  +import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   
  @@ -153,13 +161,16 @@
         if (cache.getMembers() != null && cache.getMembers().size() > 1)
         {
   
  +         // See JBCACHE-843 and docs/design/DataVersioning.txt
  +         MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
  +
            //record the things we have possibly sent
            broadcastTxs.put(gtx, gtx);
            if (log.isDebugEnabled()) log.debug("(" + cache.getLocalAddress()
                    + "): broadcasting prepare for " + gtx
                    + " (" + num_mods + " modifications");
   
  -         replicateCall(methodCall, remoteCallSync);
  +         replicateCall(toBroadcast, remoteCallSync);
         }
         else
         {
  @@ -225,4 +236,75 @@
            }
         }
      }
  +
  +   private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
  +   {
  +      Object[] origArgs = m.getArgs();
  +      return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
  +   }
  +
  +   /**
  +    * Translates a list of MethodCalls from non-versioned calls to versioned calls.
  +    */
  +   private List translate(List l, TransactionWorkspace w)
  +   {
  +      List newList = new ArrayList();
  +      Iterator origCalls = l.iterator();
  +      while (origCalls.hasNext())
  +      {
  +         MethodCall origCall = (MethodCall) origCalls.next();
  +         Object[] origArgs = origCall.getArgs();
  +         // get the data version associated with this orig call.
  +
  +         // since these are all crud methods the Fqn is at arg subscript 1.
  +         Fqn fqn = (Fqn) origArgs[1];
  +         // now get a hold of the data version for this specific modification
  +         DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
  +
  +         // build up the new arguments list for the new call.  Identical to the original lis except that it has the
  +         // data version tacked on to the end.
  +         Object[] newArgs = new Object[origArgs.length + 1];
  +         for (int i = 0; i < origArgs.length; i++) newArgs[i] = origArgs[i];
  +         newArgs[origArgs.length] = versionToBroadcast;
  +
  +         // now create a new method call which contains this data version
  +         MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
  +
  +         // and add it to the new list.
  +         newList.add(newCall);
  +      }
  +      return newList;
  +   }
  +
  +   /**
  +    * Digs out the DataVersion for a given Fqn.  If the versioning is explicit, it is passed as-is.  If implicit, it is
  +    * cloned and then incremented, and the clone is returned.
  +    */
  +   private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
  +   {
  +      WorkspaceNode n = w.getNode(f);
  +      if (w.isVersioningImplicit())
  +      {
  +         DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
  +         return v.increment();
  +      }
  +      else
  +      {
  +         return n.getVersion();
  +      }
  +   }
  +
  +   protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
  +   {
  +      OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
  +
  +      if (transactionEntry == null)
  +      {
  +         throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
  +      }
  +
  +      // try and get the workspace from the transaction
  +      return transactionEntry.getTransactionWorkSpace();
  +   }
  +
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list