[jbosscache-commits] JBoss Cache SVN: r5834 - in core/trunk/src: main/java/org/jboss/cache/factories and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue May 13 10:10:18 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-05-13 10:10:18 -0400 (Tue, 13 May 2008)
New Revision: 5834

Added:
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
   core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
Log:
JBCACHE-1339: Refactor and simplify TxInterceptor, and other enhancements

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -93,9 +93,14 @@
        */
       public boolean isInvalidation()
       {
-         return this.equals(INVALIDATION_SYNC) || this.equals(INVALIDATION_SYNC);
+         return this == INVALIDATION_SYNC || this == INVALIDATION_SYNC;
       }
 
+      public boolean isSynchronous()
+      {
+         return this == REPL_SYNC || this == INVALIDATION_SYNC;
+      }
+
    }
 
    public static CacheMode legacyModeToCacheMode(int legacyMode)

Modified: core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -254,13 +254,6 @@
       return new OptimisticPrepareCommand(gtx, modifications, data, address, onePhaseCommit);
    }
 
-
-   public OptimisticPrepareCommand buildOptimisticPrepareCommand(GlobalTransaction gtx, ReversibleCommand command)
-   {
-      List<ReversibleCommand> list = (List<ReversibleCommand>) (command != null ? Collections.singletonList(command) : Collections.emptyList());
-      return buildOptimisticPrepareCommand(gtx, list, null, null, false);
-   }
-
    public AnnounceBuddyPoolNameCommand buildAnnounceBuddyPoolNameCommand(Address address, String buddyPoolName)
    {
       AnnounceBuddyPoolNameCommand command = new AnnounceBuddyPoolNameCommand(address, buddyPoolName);

Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -34,12 +34,10 @@
 
    private CommandInterceptor createInterceptor(Class<? extends CommandInterceptor> clazz) throws IllegalAccessException, InstantiationException
    {
-//      CommandInterceptor chainedInterceptor = componentRegistry.getComponent(clazz.getName(), clazz);
       CommandInterceptor chainedInterceptor = componentRegistry.getComponent(clazz);
       if (chainedInterceptor == null)
       {
          chainedInterceptor = clazz.newInstance();
-//         componentRegistry.registerComponent(clazz.getName(), chainedInterceptor, clazz);
          componentRegistry.registerComponent(chainedInterceptor, clazz);
       }
       else
@@ -66,7 +64,8 @@
          interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
 
       // load the tx interceptor
-      interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
+      interceptorChain.appendIntereceptor(createInterceptor(optimistic ? OptimisticTxInterceptor.class : TxInterceptor.class));
+
       if (configuration.isUseLazyDeserialization())
          interceptorChain.appendIntereceptor(createInterceptor(MarshalledValueInterceptor.class));
       interceptorChain.appendIntereceptor(createInterceptor(NotificationInterceptor.class));

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -9,7 +9,6 @@
 import org.jboss.cache.cluster.ReplicationQueue;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.VisitableCommand;
-import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.CommandsFactory;
 import org.jboss.cache.factories.annotations.Inject;
@@ -56,8 +55,7 @@
    private void init()
    {
       usingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
-      CacheMode mode = configuration.getCacheMode();
-      defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode == CacheMode.INVALIDATION_SYNC);
+      defaultSynchronous = configuration.getCacheMode().isSynchronous();
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -43,7 +43,7 @@
       }
    }
 
-   protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, InvocationContext ctx)
+   protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, TransactionEntry entry, InvocationContext ctx)
    {
       if (trace)
       {
@@ -52,7 +52,14 @@
       }
       ctx.setTransaction(tx);
       ctx.setGlobalTransaction(gtx);
-      if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+      if (entry == null)
+      {
+         if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+      }
+      else
+      {
+         ctx.setTransactionEntry(entry);
+      }
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -26,10 +26,8 @@
 import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.CommandsFactory;
 import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.marshall.NodeData;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jgroups.Address;
@@ -66,7 +64,6 @@
 public class DataGravitatorInterceptor extends BaseRpcInterceptor
 {
    private BuddyManager buddyManager;
-   private boolean syncCommunications = false;
    /**
     * Map that contains commands that need cleaning up.  This is keyed on global transaction, and contains a list of
     * cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
@@ -85,12 +82,6 @@
       this.cacheSPI = cacheSPI;
    }
 
-   @Start
-   public void startInterceptor()
-   {
-      syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC;
-   }
-
    @Override
    public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
    {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -44,65 +44,65 @@
    @Override
    public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), false);
    }
 
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), false);
    }
 
    @Override
    public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), false);
    }
 
    @Override
    public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), false);
    }
 
    @Override
    public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), false);
    }
 
    @Override
    public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
    }
 
    @Override
    public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
    }
 
    @Override
    public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
    }
 
    @Override
    public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
    {
-      return handleAll(ctx, command, command.getGlobalTransaction());
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
    }
 
    @Override
    public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
    {
-      return handleAll(ctx, command, null);
+      return handleAll(ctx, command, null, false);
    }
 
    @SuppressWarnings("deprecation")
-   public Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx) throws Throwable
+   private Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx, boolean scrubContextOnCompletion) throws Throwable
    {
       Option optionOverride = ctx.getOptionOverrides();
       boolean suppressExceptions = false;
@@ -118,11 +118,11 @@
             Transaction tx = getTransaction();
             GlobalTransaction realGtx = getGlobalTransaction(tx, gtx);
             if (tx == null && realGtx != null && realGtx.isRemote()) tx = txTable.getLocalTransaction(gtx);
-            setTransactionalContext(tx, realGtx, ctx);
+            setTransactionalContext(tx, realGtx, null, ctx);
          }
          else
          {
-            setTransactionalContext(null, null, ctx);
+            setTransactionalContext(null, null, null, ctx);
          }
 
          if (optionOverride != null)
@@ -134,7 +134,7 @@
                if (ctx.getTransaction() != null)
                {
                   suspendedTransaction = txManager.suspend();
-                  setTransactionalContext(null, null, ctx);
+                  setTransactionalContext(null, null, null, ctx);
                   if (trace) log.trace("Suspending transaction " + suspendedTransaction);
                   resumeSuspended = true;
                }
@@ -169,10 +169,19 @@
       }
       finally
       {
+         /*
+          * we should scrub txs after every call to prevent race conditions
+          * basically any other call coming in on the same thread and hijacking any running tx's
+          * was highlighted in JBCACHE-606
+          */
+         if (scrubContextOnCompletion) setTransactionalContext(null, null, null, ctx);
+
          // clean up any invocation-scope options set up
          if (trace) log.trace("Resetting invocation-scope options");
          ctx.getOptionOverrides().reset();
 
+         // if this is a prepare, opt prepare or
+
          if (resumeSuspended)
          {
             txManager.resume(suspendedTransaction);

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -51,6 +51,8 @@
       {
          timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
       }
+
+      boolean succeeded = false;
       try
       {
          TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(ctx);
@@ -75,25 +77,23 @@
             }
 
          }
+
+         // locks have acquired so lets pass on up
+         Object retval = invokeNextInterceptor(ctx, command);
+         succeeded = true;
+         return retval;
       }
       catch (Throwable e)
       {
+         succeeded = false;
          log.debug("Caught exception attempting to lock nodes ", e);
          //we have failed - set to rollback and throw exception
-         try
-         {
-            unlock(ctx, gtx);
-         }
-         catch (Throwable t)
-         {
-            // we have failed to unlock - now what?
-            log.error("Failed to unlock nodes, after failing to lock nodes during a prepare!  Locks are possibly in a very inconsistent state now!", t);
-         }
          throw e;
       }
-
-      // locks have acquired so lets pass on up
-      return invokeNextInterceptor(ctx, command);
+      finally
+      {
+         if (!succeeded || command.isOnePhaseCommit()) unlock(ctx, gtx);
+      }
    }
 
    @Override
@@ -119,15 +119,7 @@
       }
       finally
       {
-         try
-         {
-            unlock(ctx, getGlobalTransaction(ctx));
-         }
-         catch (Exception e)
-         {
-            // we have failed to unlock - now what?
-            log.error("Failed to unlock nodes after a commit or rollback!  Locks are possibly in a very inconsistent state now!", e);
-         }
+         unlock(ctx, getGlobalTransaction(ctx));
       }
       return retval;
    }
@@ -139,8 +131,16 @@
     */
    private void unlock(InvocationContext ctx, GlobalTransaction gtx)
    {
-      TransactionEntry entry = ctx.getTransactionEntry();
-      entry.releaseAllLocksFIFO(gtx);
+      try
+      {
+         TransactionEntry entry = ctx.getTransactionEntry();
+         entry.releaseAllLocksFIFO(gtx);
+      }
+      catch (Exception e)
+      {
+         // we have failed to unlock - now what?
+         log.error("Failed to unlock nodes after a commit or rollback!  Locks are possibly in a very inconsistent state now!", e);
+      }
    }
 
 }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -26,7 +26,6 @@
 import org.jboss.cache.commands.write.RemoveDataCommand;
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.CommandsFactory;
 import org.jboss.cache.factories.annotations.Inject;
@@ -187,8 +186,6 @@
 
    protected void broadcastPrepare(OptimisticPrepareCommand command, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
-      boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-
       // this method will return immediately if we're the only member
       if (rpcManager.getMembers() != null && rpcManager.getMembers().size() > 1)
       {
@@ -210,7 +207,7 @@
          {
             log.debug("(" + rpcManager.getLocalAddress() + "): broadcasting prepare for " + gtx + " (" + command.getModificationsCount() + " modifications");
          }
-         replicateCall(ctx, toBroadcast, remoteCallSync, ctx.getOptionOverrides());
+         replicateCall(ctx, toBroadcast, defaultSynchronous, ctx.getOptionOverrides());
       }
       else
       {

Added: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -0,0 +1,222 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.AbstractVisitor;
+import org.jboss.cache.commands.VersionedDataCommand;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveDataCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.OptimisticTransactionEntry;
+import org.jboss.cache.transaction.TransactionEntry;
+
+import javax.transaction.Transaction;
+import java.util.List;
+
+/**
+ * A new interceptor to simplify functionality in the {@link org.jboss.cache.interceptors.TxInterceptor}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.2.0
+ */
+public class OptimisticTxInterceptor extends TxInterceptor
+{
+   protected final ModificationsReplayVisitor replayVisitor = new ModificationsReplayVisitor();
+
+   @Override
+   public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
+   {
+      // nothing really different from a pessimistic prepare command.
+      return visitPrepareCommand(ctx, command);
+   }
+
+   @Override
+   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      try
+      {
+         Transaction tx = ctx.getTransaction();
+         boolean implicitTransaction = tx == null;
+         if (implicitTransaction)
+         {
+            tx = createLocalTx();
+            // we need to attach this tx to the InvocationContext.
+            ctx.setTransaction(tx);
+         }
+
+         try
+         {
+            Object retval = attachGtxAndPassUpChain(ctx, command);
+            if (implicitTransaction)
+            {
+               copyInvocationScopeOptionsToTxScope(ctx);
+               copyForcedCacheModeToTxScope(ctx);
+               txManager.commit();
+            }
+            return retval;
+         }
+         catch (Throwable t)
+         {
+            if (implicitTransaction)
+            {
+               log.warn("Rolling back, exception encountered", t);
+               try
+               {
+                  copyInvocationScopeOptionsToTxScope(ctx);
+                  copyForcedCacheModeToTxScope(ctx);
+                  txManager.rollback();
+               }
+               catch (Throwable th)
+               {
+                  log.warn("Roll back failed encountered", th);
+               }
+               throw t;
+            }
+         }
+      }
+      catch (Throwable th)
+      {
+         ctx.throwIfNeeded(th);
+      }
+
+      return null;
+   }
+
+   private void copyForcedCacheModeToTxScope(InvocationContext ctx)
+   {
+      Option optionOverride = ctx.getOptionOverrides();
+      if (optionOverride != null
+            && (optionOverride.isForceAsynchronous() || optionOverride.isForceSynchronous()))
+      {
+         TransactionEntry entry = ctx.getTransactionEntry();
+         if (entry != null)
+         {
+            if (optionOverride.isForceAsynchronous())
+               entry.setForceAsyncReplication(true);
+            else
+               entry.setForceSyncReplication(true);
+         }
+      }
+   }
+
+   @Override
+   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+   {
+      return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), onePhaseCommit);
+   }
+
+   /**
+    * Replays modifications by passing them up the interceptor chain.
+    *
+    * @throws Throwable
+    */
+   @Override
+   protected boolean replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
+   {
+      if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + ctx.getGlobalTransaction());
+
+      // invoke all modifications by passing them up the chain, setting data versions first.
+      try
+      {
+         replayVisitor.visitCollection(ctx, command.getModifications());
+      }
+      catch (Throwable t)
+      {
+         log.error("Prepare failed!", t);
+         return false;
+      }
+      return true;
+   }
+
+   @Override
+   protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+   {
+      TransactionEntry entry = ctx.getTransactionEntry();
+      if (entry != null)
+      {
+         entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
+         ((OptimisticTransactionEntry) entry).getTransactionWorkSpace().clearNodes();
+      }
+   }
+
+   @Override
+   protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
+   {
+      return new OptimisticTransactionEntry(tx);
+   }
+
+   private class ModificationsReplayVisitor extends AbstractVisitor
+   {
+      @Override
+      public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+      {
+         Object result = invokeNextInterceptor(ctx, command);
+         assertTxIsStillValid(ctx.getTransaction());
+         return result;
+      }
+
+      @Override
+      public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand command) throws Throwable
+      {
+         Option originalOption = ctx.getOptionOverrides();
+         if (command.isVersioned())
+         {
+            Option option = new Option();
+            option.setDataVersion(command.getDataVersion());
+            ctx.setOptionOverrides(option);
+         }
+         Object retval;
+         try
+         {
+            retval = invokeNextInterceptor(ctx, command);
+            assertTxIsStillValid(ctx.getTransaction());
+         }
+         catch (Throwable t)
+         {
+            log.error("method invocation failed", t);
+            throw t;
+         }
+         finally
+         {
+            ctx.setOptionOverrides(originalOption);
+         }
+         return retval;
+      }
+   }
+
+}

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -136,23 +136,21 @@
          }
       }
       log.debug("Successfully validated nodes");
-      return invokeNextInterceptor(ctx, command);
+      Object retval = invokeNextInterceptor(ctx, command);
+      if (command.isOnePhaseCommit())
+      {
+         // do a comit-phase
+         commitTransaction(ctx);
+      }
+      return retval;
    }
 
-   @Override
-   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   private void commitTransaction(InvocationContext ctx)
    {
       GlobalTransaction gtx = getGlobalTransaction(ctx);
       TransactionWorkspace workspace;
-      try
-      {
-         workspace = getTransactionWorkspace(ctx);
-      }
-      catch (CacheException e)
-      {
-         log.warn("we can't rollback", e);
-         return invokeNextInterceptor(ctx, command);
-      }
+      workspace = getTransactionWorkspace(ctx);
+
       if (log.isDebugEnabled()) log.debug("Commiting successfully validated changes for GlobalTransaction " + gtx);
       Collection<WorkspaceNode> workspaceNodes = workspace.getNodes().values();
       for (WorkspaceNode workspaceNode : workspaceNodes)
@@ -253,6 +251,12 @@
             }
          }
       }
+   }
+
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      commitTransaction(ctx);
       return invokeNextInterceptor(ctx, command);
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -13,7 +13,6 @@
 import org.jboss.cache.commands.AbstractVisitor;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.ReversibleCommand;
-import org.jboss.cache.commands.VersionedDataCommand;
 import org.jboss.cache.commands.VisitableCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
@@ -26,7 +25,6 @@
 import org.jboss.cache.commands.write.RemoveDataCommand;
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.CommandsFactory;
 import org.jboss.cache.factories.ComponentRegistry;
@@ -34,9 +32,9 @@
 import org.jboss.cache.invocation.InvocationContextContainer;
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.OptimisticTransactionEntry;
 import org.jboss.cache.transaction.TransactionEntry;
 import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 
 import javax.transaction.InvalidTransactionException;
 import javax.transaction.Status;
@@ -48,6 +46,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -61,22 +60,17 @@
  */
 public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean
 {
-   private final static Object NULL = new Object();
-
-   private CommandsFactory commandsFactory;
-   private RPCManager rpcManager;
+   protected CommandsFactory commandsFactory;
+   protected RPCManager rpcManager;
    private Notifier notifier;
    private InvocationContextContainer invocationContextContainer;
    private ComponentRegistry componentRegistry;
 
-   private final ModificationsReplayVisitor replayVisitorNoInject = new ModificationsReplayVisitor(false);
-   private final ModificationsReplayVisitor replayVisitorWithInject = new ModificationsReplayVisitor(true);
-
    /**
     * List <Transaction>that we have registered for
     */
-   private final Map transactions = new ConcurrentHashMap(16);
-   private final Map rollbackTransactions = new ConcurrentHashMap(16);
+   private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
+   private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
    private long prepares = 0;
    private long commits = 0;
    private long rollbacks = 0;
@@ -94,17 +88,9 @@
    }
 
    @Override
-   @SuppressWarnings("unchecked")
-   public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
-   {
-      return visitPrepareCommand(ctx, command);
-   }
-
-   @Override
    public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
    {
       Object result = null;
-      boolean scrubTxsOnExit = false;
 
       // this is a prepare, commit, or rollback.
       if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
@@ -113,8 +99,7 @@
          if (ctx.getGlobalTransaction().isRemote())
          {
             result = handleRemotePrepare(ctx, command);
-            scrubTxsOnExit = true;
-            increasePrepares();
+            if (getStatisticsEnabled()) prepares++;
          }
          else
          {
@@ -126,18 +111,10 @@
       {
          ctx.throwIfNeeded(e);
       }
-      finally
-      {
-         scrubOnExist(ctx, scrubTxsOnExit);
-      }
+
       return result;
    }
 
-   private void increasePrepares()
-   {
-      if (getStatisticsEnabled()) prepares++;
-   }
-
    @Override
    @SuppressWarnings("unchecked")
    public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
@@ -167,7 +144,7 @@
             }
             if (log.isDebugEnabled()) log.debug(" executing commit() with local TX " + ltx + " under global tx " + gtx);
             txManager.commit();
-            increaseCommits();
+            if (getStatisticsEnabled()) commits++;
          }
          finally
          {
@@ -187,10 +164,7 @@
       {
          ctx.throwIfNeeded(throwable);
       }
-      finally
-      {
-         scrubOnExist(ctx, true);
-      }
+
       return null;
    }
 
@@ -211,7 +185,6 @@
          {
             log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
             txTable.remove(gtx);
-            scrubOnExist(ctx, true);
             return null;
          }
          // disconnect if we have a current tx associated
@@ -229,7 +202,7 @@
             }
             if (log.isDebugEnabled()) log.debug("executing with local TX " + ltx + " under global tx " + gtx);
             txManager.rollback();
-            increaseRollbacks();
+            if (getStatisticsEnabled()) rollbacks++;
          }
          finally
          {
@@ -251,26 +224,48 @@
       {
          ctx.throwIfNeeded(throwable);
       }
-      finally
-      {
-         scrubOnExist(ctx, true);
-      }
+
       return null;
    }
 
+   @Override
+   public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable
+   {
+      return invokeNextInterceptor(ctx, command);
+   }
+
    /**
-    * we should scrub txs after every call to prevent race conditions
-    * basically any other call coming in on the same thread and hijacking any running tx's
-    * was highlighted in JBCACHE-606
+    * Tests if we already have a tx running.  If so, register a sync handler for this method invocation.
+    * if not, create a local tx if we're using opt locking.
+    *
+    * @return
+    * @throws Throwable
     */
-   private void scrubOnExist(InvocationContext ctx, boolean scrubTxsOnExit)
+   @Override
+   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
    {
-      if (scrubTxsOnExit)
+      try
       {
-         setTransactionalContext(null, null, ctx);
+         return attachGtxAndPassUpChain(ctx, command);
       }
+      catch (Throwable throwable)
+      {
+         ctx.throwIfNeeded(throwable);
+         return null;
+      }
    }
 
+   protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      Transaction tx = ctx.getTransaction();
+      if (tx != null) attachGlobalTransaction(ctx, tx, command);
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   // ------------------------------------------------------------------------
+   // JMX statistics
+   // ------------------------------------------------------------------------
+
    public long getPrepares()
    {
       return prepares;
@@ -306,42 +301,56 @@
 
    // --------------------------------------------------------------
 
+   /**
+    * Handles a remotely originating prepare call, by creating a local transaction for the remote global transaction
+    * and replaying modifications in this new local transaction.
+    *
+    * @param ctx     invocation context
+    * @param command prepare command
+    * @return result of the prepare, typically a null.
+    * @throws Throwable in the event of problems.
+    */
    private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command) throws Throwable
    {
+      // the InvocationContextInterceptor would have set this for us
       GlobalTransaction gtx = ctx.getGlobalTransaction();
-      // Is there a local transaction associated with GTX ?
+
+      // Is there a local transaction associated with GTX?  (not the current tx associated with the thread, which may be
+      // in the invocation context
       Transaction ltx = txTable.getLocalTransaction(gtx);
       Transaction currentTx = txManager.getTransaction();
-      Object retval = null;
 
+      Object retval = null;
+      boolean success = false;
       try
       {
          if (ltx == null)
          {
             if (currentTx != null) txManager.suspend();
-            ltx = createLocalTxForGlobalTx(gtx, ctx);// creates new LTX and associates it with a GTX
+            // create a new local transaction
+            ltx = createLocalTx();
+            // associate this with a global tx
+            txTable.put(ltx, gtx);
+            if (trace) log.trace("Created new tx for gtx " + gtx);
+
             if (log.isDebugEnabled())
-            {
                log.debug("Started new local tx as result of remote prepare: local tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
-            }
          }
          else
          {
             //this should be valid
-            if (!ctx.isValidTransaction())
+            if (!TransactionTable.isValid(ltx))
                throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
 
-            //associate this thread with this ltx if this ltx is NOT the current tx.
+            //associate this thread with the local transaction associated with the global transaction, IF the localTx is NOT the current tx.
             if (currentTx == null || !ltx.equals(currentTx))
             {
+               if (trace) log.trace("Suspending current tx " + currentTx);
                txManager.suspend();
                txManager.resume(ltx);
             }
          }
-         if (trace)
-         {
-            log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);
-         }
+         if (trace) log.trace("Resuming existing tx " + ltx + ", global tx=" + gtx);
 
          // at this point we have a non-null ltx
 
@@ -353,239 +362,42 @@
          if (entry == null)
          {
             // create a new transaction entry
-            entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry(ltx) : new TransactionEntry(ltx);
-            log.debug("creating new tx entry");
+            if (log.isDebugEnabled()) log.debug("creating new tx entry");
+            entry = createNewTransactionEntry(ltx);
             txTable.put(gtx, entry);
-            ctx.setTransactionEntry(entry);
-            if (trace) log.trace("TxTable contents: " + txTable);
          }
 
-         setTransactionalContext(ltx, gtx, ctx);
+         setTransactionalContext(ltx, gtx, entry, ctx);
+
          // register a sync handler for this tx.
-         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
+         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, entry), ctx);
 
-         if (configuration.isNodeLockingOptimistic())
+         // replay modifications
+         success = replayModifications(ctx, ltx, command);
+
+         // now pass the prepare command up the chain as well.
+         if (command.isOnePhaseCommit())
          {
-            retval = handleOptimisticPrepare(ctx, gtx, ltx, (OptimisticPrepareCommand) command);
+            if (trace)
+               log.trace("Using one-phase prepare.  Not propagating the prepare call up the stack until called to do so by the sync handler.");
          }
          else
          {
-            retval = handlePessimisticPrepare(ctx, ltx, command);
+            // now pass up the prepare method itself.
+            invokeNextInterceptor(ctx, command);
          }
+         // JBCACHE-361 Confirm that the transaction is ACTIVE
+         assertTxIsStillValid(ltx);
       }
       finally
       {
-         txManager.suspend();// suspends ltx - could be null
-         // resume whatever else we had going.
-         if (currentTx != null) txManager.resume(currentTx);
-         if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
-      }
+         // if we are running a one-phase commit, perform a commit or rollback now.
+         if (trace) log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
 
-      return retval;
-   }
-   //   handler methods.
-   // --------------------------------------------------------------
-
-
-   @Override
-   public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable
-   {
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   /**
-    * Tests if we already have a tx running.  If so, register a sync handler for this method invocation.
-    * if not, create a local tx if we're using opt locking.
-    *
-    * @return
-    * @throws Throwable
-    */
-   @Override
-   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      Object result;
-      try
-      {
-         Transaction tx = ctx.getTransaction();
-         // if there is no current tx and we're using opt locking, we need to use an implicit tx.
-         boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null;
-         if (implicitTransaction)
-         {
-            tx = createLocalTx();
-            // we need to attach this tx to the InvocationContext.
-            ctx.setTransaction(tx);
-         }
-         if (tx != null)
-            attachGlobalTransaction(ctx, tx, command);
-
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-
-         try
-         {
-            result = invokeNextInterceptor(ctx, command);
-            if (implicitTransaction)
-            {
-               copyInvocationScopeOptionsToTxScope(ctx);
-               copyForcedCacheModeToTxScope(ctx);
-               txManager.commit();
-            }
-         }
-         catch (Throwable t)
-         {
-            if (implicitTransaction)
-            {
-               log.warn("Rolling back, exception encountered", t);
-               try
-               {
-                  setTransactionalContext(tx, gtx, ctx);
-                  txManager.rollback();
-               }
-               catch (Throwable th)
-               {
-                  log.warn("Roll back failed encountered", th);
-               }
-               throw t;
-            }
-            else
-            {
-               throw t;
-            }
-         }
-         return result;
-      }
-      catch (Throwable throwable)
-      {
-         ctx.throwIfNeeded(throwable);
-         return null;
-      }
-   }
-
-   private void copyForcedCacheModeToTxScope(InvocationContext ctx)
-   {
-      Option optionOverride = ctx.getOptionOverrides();
-      if (optionOverride != null
-            && (optionOverride.isForceAsynchronous() || optionOverride.isForceSynchronous()))
-      {
-         TransactionEntry entry = ctx.getTransactionEntry();
-         if (entry != null)
-         {
-            if (optionOverride.isForceAsynchronous())
-               entry.setForceAsyncReplication(true);
-            else
-               entry.setForceSyncReplication(true);
-         }
-      }
-   }
-
-   private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
-   {
-      if (trace)
-      {
-         log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
-      }
-      if (trace)
-      {
-         GlobalTransaction tempGtx = txTable.get(tx);
-         log.trace("Associated gtx in txTable is " + tempGtx);
-      }
-
-      // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
-      GlobalTransaction gtx = registerTransaction(tx, ctx);
-      if (gtx != null)
-      {
-         command = replaceGtx(command, gtx);
-      }
-      else
-      {
-         // get the current globalTransaction from the txTable.
-         gtx = txTable.get(tx);
-      }
-
-      // make sure we attach this globalTransaction to the invocation context.
-      ctx.setGlobalTransaction(gtx);
-
-      return command;
-   }
-
-   /**
-    * This is called by invoke() if we are in a remote gtx's prepare() phase.
-    * Finds the appropriate tx, suspends any existing txs, registers a sync handler
-    * and passes up the chain.
-    * <p/>
-    * Resumes any existing txs before returning.
-    *
-    * @throws Throwable
-    */
-   private Object handleOptimisticPrepare(InvocationContext ctx, GlobalTransaction gtx, Transaction ltx, OptimisticPrepareCommand command) throws Throwable
-   {
-      Object retval;
-      if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
-      replayVisitorWithInject.visitCollection(ctx, command.getModifications());
-      retval = invokeNextInterceptor(ctx, command);
-      // JBCACHE-361 Confirm that the transaction is ACTIVE
-      if (!TransactionTable.isActive(ltx))
-      {
-         throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE;" +
-               " is " + ltx.getStatus());
-      }
-      return retval;
-   }
-
-   private Object handlePessimisticPrepare(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Exception
-   {
-      boolean success = true;
-      Object retval = null;
-      try
-      {
-         // now pass up the prepare method itself.
-         try
-         {
-            replayVisitorNoInject.visitCollection(ctx, command.getModifications());
-            if (command.isOnePhaseCommit())
-            {
-               if (trace)
-                  log.trace("Using one-phase prepare.  Not propagating the prepare call up the stack until called to do so by the sync handler.");
-            }
-            else
-            {
-               retval = invokeNextInterceptor(ctx, command);
-            }
-
-            // JBCACHE-361 Confirm that the transaction is ACTIVE
-            if (!ctx.isValidTransaction())
-            {
-               throw new ReplicationException("prepare() failed -- " +
-                     "local transaction status is not valid;" +
-                     " is " + ltx.getStatus());
-            }
-         }
-         catch (Throwable th)
-         {
-            log.error("prepare method invocation failed", th);
-            retval = th;
-            success = false;
-            if (retval instanceof Exception)
-            {
-               throw (Exception) retval;
-            }
-         }
-      }
-      finally
-      {
-
-         if (trace)
-         {
-            log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
-         }
-         // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
-         //    {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
-         //    will then commit/rollback against the cache
-
          if (command.isOnePhaseCommit())
          {
             try
             {
-               //                    invokeOnePhaseCommitMethod(globalTransaction, modifications.size() > 0, success);
                if (success)
                {
                   ltx.commit();
@@ -618,92 +430,76 @@
                transactions.remove(ltx);// JBAS-298
             }
          }
+
+         txManager.suspend();// suspends ltx - could be null
+         // resume whatever else we had going.
+         if (currentTx != null) txManager.resume(currentTx);
+         if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
       }
+
       return retval;
    }
 
-   public class ModificationsReplayVisitor extends AbstractVisitor
+   protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
    {
-      private final boolean injectDataVersions;
+      return new TransactionEntry(tx);
+   }
 
-      public ModificationsReplayVisitor(boolean injectDataVersions)
+   private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
+   {
+      if (trace)
       {
-         this.injectDataVersions = injectDataVersions;
+         log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
       }
-
-      @Override
-      public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+      if (trace)
       {
-         Object result = invokeNextInterceptor(ctx, command);
-         assertTxIsActive(ctx);
-         return result;
+         GlobalTransaction tempGtx = txTable.get(tx);
+         log.trace("Associated gtx in txTable is " + tempGtx);
       }
 
-      @Override
-      public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+      // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
+      GlobalTransaction gtx = registerTransaction(tx, ctx);
+      if (gtx != null)
       {
-         return handleDataVersionCommand(ctx, command);
+         command = replaceGtx(command, gtx);
       }
-
-      @Override
-      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      else
       {
-         return handleDataVersionCommand(ctx, command);
+         // get the current globalTransaction from the txTable.
+         gtx = txTable.get(tx);
       }
 
-      @Override
-      public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
-      {
-         return handleDataVersionCommand(ctx, command);
-      }
+      // make sure we attach this globalTransaction to the invocation context.
+      ctx.setGlobalTransaction(gtx);
 
-      @Override
-      public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
-      {
-         return handleDataVersionCommand(ctx, command);
-      }
+      return command;
+   }
 
-      @Override
-      public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+   /**
+    * Replays modifications
+    *
+    * @param ctx
+    * @param ltx
+    * @param command
+    * @return
+    * @throws Exception
+    */
+   protected boolean replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
+   {
+      try
       {
-         return handleDataVersionCommand(ctx, command);
-      }
-
-      private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand command) throws Throwable
-      {
-         if (!injectDataVersions) return handleDefault(ctx, command);
-         Option originalOption = ctx.getOptionOverrides();
-         if (command.isVersioned())
+         // replay modifications
+         for (ReversibleCommand modification : command.getModifications())
          {
-            Option option = new Option();
-            option.setDataVersion(command.getDataVersion());
-            ctx.setOptionOverrides(option);
+            invokeNextInterceptor(ctx, modification);
+            assertTxIsStillValid(ltx);
          }
-         Object retval;
-         try
-         {
-            retval = invokeNextInterceptor(ctx, command);
-            assertTxIsActive(ctx);
-         }
-         catch (Throwable t)
-         {
-            log.error("method invocation failed", t);
-            throw t;
-         }
-         finally
-         {
-            ctx.setOptionOverrides(originalOption);
-         }
-         return retval;
+         return true;
       }
-
-      private void assertTxIsActive(InvocationContext ctx)
-            throws SystemException
+      catch (Throwable th)
       {
-         if (!TransactionTable.isActive(ctx.getTransaction()))
-         {
-            throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + ctx.getTransaction().getStatus());
-         }
+         log.error("prepare failed!", th);
+         return false;
       }
    }
 
@@ -719,16 +515,6 @@
       }
    }
 
-   private void increaseCommits()
-   {
-      if (getStatisticsEnabled()) commits++;
-   }
-
-   private void increaseRollbacks()
-   {
-      if (getStatisticsEnabled()) rollbacks++;
-   }
-
    /**
     * Handles a commit or a rollback.  Called by the synch handler.  Simply tests that we are in the correct tx and
     * passes the meth call up the interceptor chain.
@@ -760,38 +546,24 @@
    //   Transaction phase runners
    // --------------------------------------------------------------
 
+   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+   {
+      return commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), onePhaseCommit);
+   }
+
    /**
-    * creates a commit() MethodCall and feeds it to handleCommitRollback();
+    * creates a commit()
     */
-   protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications, List clModifications, boolean onePhaseCommit)
+   protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, List clModifications, boolean onePhaseCommit)
    {
       // set the hasMods flag in the invocation ctx.  This should not be replicated, just used locally by the interceptors.
       ctx.setTxHasMods(modifications != null && modifications.size() > 0);
       ctx.setCacheLoaderHasMods(clModifications != null && clModifications.size() > 0);
       try
       {
-         VisitableCommand commitCommand;
-         if (onePhaseCommit)
-         {
-            // running a 1-phase commit.
-            if (configuration.isNodeLockingOptimistic())
-            {
-               commitCommand = commandsFactory.buildOptimisticPrepareCommand(gtx, null);
-            }
-            else
-            {
-               commitCommand = commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), true);
-            }
-         }
-         else
-         {
-            commitCommand = commandsFactory.buildCommitCommand(gtx);
-         }
+         VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
 
-         if (trace)
-         {
-            log.trace(" running commit for " + gtx);
-         }
+         if (trace) log.trace("Running commit for " + gtx);
 
          handleCommitRollback(ctx, commitCommand);
       }
@@ -819,38 +591,27 @@
       }
    }
 
-
-   private void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+   protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
    {
       TransactionEntry entry = ctx.getTransactionEntry();
-      if (entry != null)
-      {
-         entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
-      }
+      if (entry != null) entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
    }
 
    /**
-    * creates a rollback() MethodCall and feeds it to handleCommitRollback();
-    *
-    * @param gtx
+    * creates a rollback()
     */
    protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List<ReversibleCommand> modifications)
    {
-      //Transaction ltx = null;
       try
       {
          ctx.setTxHasMods(modifications != null && modifications.size() > 0);
 
          // JBCACHE-457
          VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
-         if (trace)
-         {
-            log.trace(" running rollback for " + gtx);
-         }
+         if (trace) log.trace(" running rollback for " + gtx);
 
          //JBCACHE-359 Store a lookup for the globalTransaction so a listener
          // callback can find it
-         //ltx = getLocalTxForGlobalTx(globalTransaction);
          rollbackTransactions.put(tx, gtx);
 
          handleCommitRollback(ctx, rollbackCommand);
@@ -861,7 +622,7 @@
       }
       finally
       {
-         if (tx != null) rollbackTransactions.remove(tx);
+         rollbackTransactions.remove(tx);
       }
    }
 
@@ -884,48 +645,35 @@
       return newList;
    }
 
+   private boolean isOnePhaseCommit()
+   {
+      if (!configuration.getCacheMode().isSynchronous())
+      {
+         // this is a REPL_ASYNC call - do 1-phase commit.  break!
+         if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
+         return true;
+      }
+      return false;
+   }
+
    /**
     * Handles a local prepare - invoked by the sync handler.  Tests if the current tx matches the gtx passed in to the
     * method call and passes the prepare() call up the chain.
-    *
-    * @return
-    * @throws Throwable
     */
    @SuppressWarnings("deprecation")
    public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<ReversibleCommand> modifications) throws Throwable
    {
-      // build the method call
-      VisitableCommand prepareCommand;
-      //        if (cache.getCacheModeInternal() != CacheImpl.REPL_ASYNC)
-      //        {
       // running a 2-phase commit.
-      if (configuration.isNodeLockingOptimistic())
-      {
-         prepareCommand = commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), false);
-      }
-      else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
-      {
-         prepareCommand = commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(),
-               false);// don't commit or rollback - wait for call
-      }
-      //}
-      else
-      {
-         // this is a REPL_ASYNC call - do 1-phase commit.  break!
-         if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
-         return null;
-      }
+      VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
 
-      // passes a prepare call up the local interceptor chain.  The replication interceptor
-      // will do the broadcasting if needed.  This is so all requests (local/remote) are
-      // treated the same
       Object result;
 
       // Is there a local transaction associated with GTX ?
       Transaction ltx = ctx.getTransaction();
 
       //if ltx is not null and it is already running
-      if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx))
+      Transaction currentTransaction = txManager.getTransaction();
+      if (currentTransaction != null && ltx != null && currentTransaction.equals(ltx))
       {
          VisitableCommand originalCommand = ctx.getCommand();
          ctx.setCommand(prepareCommand);
@@ -953,6 +701,20 @@
    //   Private helper methods
    // --------------------------------------------------------------
 
+   protected void assertTxIsStillValid(Transaction tx)
+   {
+      if (!TransactionTable.isActive(tx))
+      {
+         try
+         {
+            throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
+         }
+         catch (SystemException e)
+         {
+            throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
+         }
+      }
+   }
 
    /**
     * Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
@@ -964,41 +726,42 @@
    private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
    {
       GlobalTransaction gtx;
-      if (TransactionTable.isValid(tx) && transactions.put(tx, NULL) == null)
+
+      if (TransactionTable.isValid(tx) && transactions.add(tx))
       {
          gtx = txTable.getCurrentTransaction(tx, true);
+         TransactionEntry entry;
          if (ctx.getGlobalTransaction() == null)
          {
             ctx.setGlobalTransaction(gtx);
-            ctx.setTransactionEntry(txTable.get(gtx));
+            entry = txTable.get(gtx);
+            ctx.setTransactionEntry(entry);
          }
+         else
+         {
+            entry = ctx.getTransactionEntry();
+         }
          if (gtx.isRemote())
          {
             // should be no need to register a handler since this a remotely initiated globalTransaction
-            if (trace)
-            {
-               log.trace("is a remotely initiated gtx so no need to register a tx for it");
-            }
+            if (trace) log.trace("is a remotely initiated gtx so no need to register a tx for it");
          }
          else
          {
-            if (trace)
-            {
-               log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
-            }
+            if (trace) log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
+
             // see the comment in the LocalSyncHandler for the last isOriginLocal param.
-            LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, !ctx.isOriginLocal());
+            LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, entry, !ctx.isOriginLocal());
             registerHandler(tx, myHandler, ctx);
          }
       }
-      else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
+      else if ((gtx = rollbackTransactions.get(tx)) != null)
       {
          if (trace) log.trace("Transaction " + tx + " is already registered and is rolling back.");
       }
       else
       {
          if (trace) log.trace("Transaction " + tx + " is already registered.");
-
       }
       return gtx;
    }
@@ -1022,11 +785,11 @@
    }
 
    /**
-    * Replaces the global transaction in a method call with a new global transaction passed in.
+    * Replaces the global transaction in a VisitableCommand with a new global transaction passed in.
     */
-   private VisitableCommand replaceGtx(VisitableCommand m, final GlobalTransaction gtx) throws Throwable
+   private VisitableCommand replaceGtx(VisitableCommand command, final GlobalTransaction gtx) throws Throwable
    {
-      m.acceptVisitor(null, new AbstractVisitor()
+      command.acceptVisitor(null, new AbstractVisitor()
       {
          @Override
          public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
@@ -1091,7 +854,7 @@
             return null;
          }
       });
-      return m;
+      return command;
    }
 
    /**
@@ -1100,7 +863,7 @@
     * @return
     * @throws Exception
     */
-   private Transaction createLocalTx() throws Exception
+   protected Transaction createLocalTx() throws Exception
    {
       if (trace)
       {
@@ -1113,23 +876,6 @@
       return localTx;
    }
 
-   /**
-    * Creates a new local transaction for a given global transaction.
-    *
-    * @param gtx
-    * @return
-    * @throws Exception
-    */
-   private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx, InvocationContext ctx) throws Exception
-   {
-      Transaction localTx = createLocalTx();
-      txTable.put(localTx, gtx);
-      // attach this to the context
-      ctx.setTransaction(localTx);
-      if (trace) log.trace("Created new tx for gtx " + gtx);
-      return localTx;
-   }
-
    // ------------------------------------------------------------------------
    // Synchronization classes
    // ------------------------------------------------------------------------
@@ -1144,26 +890,27 @@
       TransactionEntry entry = null;
       protected InvocationContext ctx; // the context for this call.
 
-      RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
+      RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionEntry entry)
       {
          this.gtx = gtx;
          this.tx = tx;
+         this.entry = entry;
       }
 
       public void beforeCompletion()
       {
          if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
-         entry = txTable.get(gtx);
+
          if (entry == null)
          {
             log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
-            log.error("TxTable contents: " + txTable);
             throw new IllegalStateException("cannot find transaction entry for " + gtx);
          }
 
          modifications = entry.getModifications();
          ctx = invocationContextContainer.get();
-         ctx.setTransactionEntry(entry);
+         setTransactionalContext(tx, gtx, entry, ctx);
+
          if (ctx.isOptionsUninitialised() && entry.getOption() != null) ctx.setOptionOverrides(entry.getOption());
 
          assertCanContinue();
@@ -1179,21 +926,18 @@
          if (ctx == null)
          {
             ctx = invocationContextContainer.get();
-         }
+            setTransactionalContext(tx, gtx, entry, ctx);
 
-         entry = txTable.get(gtx);
-         ctx.setTransactionEntry(entry);
-
-         if (ctx.isOptionsUninitialised() && entry != null && entry.getOption() != null)
-         {
-            // use the options from the transaction entry instead
-            ctx.setOptionOverrides(entry.getOption());
+            if (ctx.isOptionsUninitialised() && entry != null && entry.getOption() != null)
+            {
+               // use the options from the transaction entry instead
+               ctx.setOptionOverrides(entry.getOption());
+            }
          }
 
          try
          {
             assertCanContinue();
-            setTransactionalContext(tx, gtx, ctx);
 
             try
             {
@@ -1220,11 +964,9 @@
             switch (status)
             {
                case Status.STATUS_COMMITTED:
-
-                  // if this is optimistic or sync repl
-                  boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+                  boolean onePhaseCommit = !configuration.getCacheMode().isSynchronous();
                   if (log.isDebugEnabled()) log.debug("Running commit phase.  One phase? " + onePhaseCommit);
-                  runCommitPhase(ctx, gtx, tx, modifications, cacheLoaderModifications, onePhaseCommit);
+                  runCommitPhase(ctx, gtx, modifications, cacheLoaderModifications, onePhaseCommit);
                   log.debug("Finished commit phase");
                   break;
                case Status.STATUS_UNKNOWN:
@@ -1240,12 +982,17 @@
                   throw new IllegalStateException("illegal status: " + status);
             }
          }
+         catch (Exception th)
+         {
+            log.trace("Caught exception ", th);
+
+         }
          finally
          {
             // clean up the tx table
             txTable.remove(gtx);
             txTable.remove(tx);
-            setTransactionalContext(null, null, ctx);
+            setTransactionalContext(null, null, null, ctx);
             cleanupInternalState();
          }
       }
@@ -1308,9 +1055,9 @@
        * @param tx
        * @param remoteLocal
        */
-      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, boolean remoteLocal)
+      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionEntry entry, boolean remoteLocal)
       {
-         super(gtx, tx);
+         super(gtx, tx, entry);
          this.remoteLocal = remoteLocal;
       }
 
@@ -1321,7 +1068,7 @@
          ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
          // fetch the modifications before the transaction is committed
          // (and thus removed from the txTable)
-         setTransactionalContext(tx, gtx, ctx);
+         setTransactionalContext(tx, gtx, entry, ctx);
          if (!entry.hasModifications())
          {
             if (trace) log.trace("No modifications in this tx.  Skipping beforeCompletion()");
@@ -1329,7 +1076,7 @@
             return;
          }
 
-         // set any transaction wide options as current for this thread.
+         // set any transaction wide options as current for this thread, caching original options that would then be reset
          originalOptions = ctx.getOptionOverrides();
          transactionalOptions = entry.getOption();
          ctx.setOptionOverrides(transactionalOptions);
@@ -1343,8 +1090,9 @@
                case Status.STATUS_PREPARING:
                   // run a prepare call.
                   modifications = compact(modifications);
-                  Object result = runPreparePhase(ctx, gtx, modifications);
 
+                  Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx, modifications);
+
                   if (result instanceof Throwable)
                   {
                      if (log.isDebugEnabled())
@@ -1376,7 +1124,7 @@
          finally
          {
             localRollbackOnly = false;
-            setTransactionalContext(null, null, ctx);
+            setTransactionalContext(null, null, null, ctx);
             ctx.setOptionOverrides(originalOptions);
          }
       }
@@ -1387,9 +1135,8 @@
          // could happen if a rollback is called and beforeCompletion() doesn't get called.
          if (ctx == null) ctx = invocationContextContainer.get();
          ctx.setLocalRollbackOnly(localRollbackOnly);
+         setTransactionalContext(tx, gtx, entry, ctx);
          ctx.setOptionOverrides(transactionalOptions);
-         ctx.setTransaction(tx);
-         ctx.setGlobalTransaction(gtx);
          try
          {
             super.afterCompletion(status);

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-05-13 14:10:18 UTC (rev 5834)
@@ -146,27 +146,18 @@
 
       mgr.resume(tx);
 
-      boolean fail = false;
-      try
-      {
-         mgr.commit();
-      }
-      catch (Exception e)
-      {
-         fail = true;
+      // one-phase-commits wont throw an exception on failure.
+      mgr.commit();
 
-      }
       assertNull(mgr.getTransaction());
       assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());
 
-      assertEquals(true, fail);
-
       assertTrue(cache.exists(Fqn.fromString("/one/two")));
       assertNotNull(cache.getNode("/one"));
       assertEquals(false, cache.getRoot().getLock().isLocked());
-      assertEquals(false, ((NodeSPI<Object, Object>) cache.getNode("/one")).getLock().isLocked());
-      assertEquals(false, ((NodeSPI<Object, Object>) cache.getNode("/one/two")).getLock().isLocked());
+      assertEquals(false, cache.getNode("/one").getLock().isLocked());
+      assertEquals(false, cache.getNode("/one/two").getLock().isLocked());
       assertNotNull(cache.getNode("/one").getChild("two"));
       assertEquals(pojo2, cache.get(Fqn.fromString("/one/two"), "key1"));
 
@@ -350,18 +341,9 @@
 
       mgr.resume(tx);
 
-      boolean fail = false;
-      try
-      {
-         mgr.commit();
-      }
-      catch (Exception e)
-      {
-         fail = true;
+      // 1-pc commits wont throw an exception if there is a problem.
+      mgr.commit();
 
-      }
-
-      assertEquals(true, fail);
       assertNull(mgr.getTransaction());
       assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());




More information about the jbosscache-commits mailing list