[infinispan-commits] Infinispan SVN: r2323 - in branches/4.2.x/core/src: main/java/org/infinispan/context/impl and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sun Sep 5 07:04:48 EDT 2010


Author: mircea.markus
Date: 2010-09-05 07:04:47 -0400 (Sun, 05 Sep 2010)
New Revision: 2323

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/context/InvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/NonTxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/RemoteTxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/CallInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
   branches/4.2.x/core/src/test/java/org/infinispan/api/mvcc/LockTestBase.java
   branches/4.2.x/core/src/test/java/org/infinispan/test/TestingUtil.java
Log:
[ISPN-629] - document transaction's state if TimeoutException occurs

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/InvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/InvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/InvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -60,11 +60,4 @@
 
    InvocationContext clone();
 
-   /**
-    * Retrieves a set of keys added to the context within the scope of the current ivocation up to the current point
-    * in time.  This is usually all of the keys added to the context, unless transactions are used in which case it is
-    * a subset of all the keys added to the context.
-    * @return a Set of keys, which may be an empty set.
-    */
-   Set<Object> getKeysAddedInCurrentInvocation();
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -3,14 +3,11 @@
 import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
-import org.infinispan.util.BidirectionalLinkedHashMap;
 import org.infinispan.util.BidirectionalMap;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -23,7 +20,6 @@
 public abstract class AbstractInvocationContext implements InvocationContext {
 
    protected volatile EnumSet<Flag> flags;
-   protected Set<Object> keysAddedInCurrentInvocation;
 
    // since this is finite, small, and strictly an internal API, it is cheaper/quicker to use bitmasking rather than
    // an EnumSet.
@@ -112,7 +108,6 @@
    public void reset() {
       flags = null;
       contextFlags = 0;
-      keysAddedInCurrentInvocation = null;
    }
 
    public boolean isFlagsUninitialized() {
@@ -148,28 +143,11 @@
       setContextFlag(ContextFlag.USE_FUTURE_RETURN_TYPE, useFutureReturnType);
    }
 
-   /**
-    * Records that a key has been added.  This method should be called by all implementations of {@link org.infinispan.context.InvocationContext#putLookedUpEntry(Object, org.infinispan.container.entries.CacheEntry)}
-    * and {@link org.infinispan.context.InvocationContext#putLookedUpEntries(java.util.Map)} for each key added.
-    *
-    * @param key key to record
-    */
-   protected void keyAddedInCurrentInvocation(Object key) {
-      if (keysAddedInCurrentInvocation == null) keysAddedInCurrentInvocation = new HashSet<Object>(4);
-      keysAddedInCurrentInvocation.add(key);
-   }
-
-   public Set<Object> getKeysAddedInCurrentInvocation() {
-      if (keysAddedInCurrentInvocation == null) return Collections.emptySet();
-      return keysAddedInCurrentInvocation;
-   }
-
    @Override
    public AbstractInvocationContext clone() {
       try {
          AbstractInvocationContext dolly = (AbstractInvocationContext) super.clone();
          if (flags != null) dolly.flags = flags.clone();
-         dolly.keysAddedInCurrentInvocation = null;
          return dolly;
       } catch (CloneNotSupportedException e) {
          throw new IllegalStateException("Impossible!");

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -28,10 +28,6 @@
       return affectedKeys == null ? Collections.EMPTY_SET : affectedKeys;
    }
 
-   public boolean isValidRunningTx() {
-      return isValid(getRunningTransaction());
-   }
-
    public void addAffectedKeys(Object... keys) {
       if (affectedKeys == null) {
          affectedKeys = new HashSet<Object>();
@@ -39,60 +35,6 @@
       affectedKeys.addAll(Arrays.asList(keys));
    }
 
-
-   /**
-    * Return s true of tx's status is ACTIVE or PREPARING
-    *
-    * @param tx
-    * @return true if the tx is active or preparing
-    */
-   public static boolean isValid(Transaction tx) {
-      return isActive(tx) || isPreparing(tx);
-   }
-
-   /**
-    * Returns true if transaction is ACTIVE, false otherwise
-    */
-   public static boolean isActive(Transaction tx) {
-      if (tx == null) return false;
-      int status;
-      try {
-         status = tx.getStatus();
-         return status == Status.STATUS_ACTIVE;
-      }
-      catch (SystemException e) {
-         return false;
-      }
-   }
-
-   /**
-    * Returns true if transaction is PREPARING, false otherwise
-    */
-   public static boolean isPreparing(Transaction tx) {
-      if (tx == null) return false;
-      int status;
-      try {
-         status = tx.getStatus();
-         return status == Status.STATUS_PREPARING;
-      }
-      catch (SystemException e) {
-         return false;
-      }
-   }
-
-   /**
-    * Tests whether the caller is in a valid transaction.  If not, will throw a CacheException.
-    */
-   public static void assertTransactionValid(TxInvocationContext ctx) {
-      Transaction tx = ctx.getRunningTransaction();
-      if (!isValid(tx)) try {
-         throw new CacheException("Invalid transaction " + tx + ", status = " + (tx == null ? null : tx.getStatus()));
-      }
-      catch (SystemException e) {
-         throw new CacheException("Exception trying to analyse status of transaction " + tx, e);
-      }
-   }
-
    @Override
    public AbstractTxInvocationContext clone() {
       AbstractTxInvocationContext dolly = (AbstractTxInvocationContext) super.clone();

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -57,13 +57,11 @@
    }
 
    public void putLookedUpEntry(Object key, CacheEntry e) {
-      keyAddedInCurrentInvocation(key);
       xaAdapter.putLookedUpEntry(key, e);
    }
 
    public void putLookedUpEntries(Map<Object, CacheEntry> lookedUpEntries) {
       for (Map.Entry<Object, CacheEntry> ce: lookedUpEntries.entrySet()) {
-         keyAddedInCurrentInvocation(ce.getKey());
          xaAdapter.putLookedUpEntry(ce.getKey(), ce.getValue());
       }
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/NonTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/NonTxInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/NonTxInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -27,14 +27,12 @@
 
    public void putLookedUpEntry(Object key, CacheEntry e) {
       initLookedUpEntries();
-      keyAddedInCurrentInvocation(key);
       lookedUpEntries.put(key, e);
    }
 
    public void putLookedUpEntries(Map<Object, CacheEntry> lookedUpEntries) {
       initLookedUpEntries();
       for (Map.Entry<Object, CacheEntry> ce: lookedUpEntries.entrySet()) {
-         keyAddedInCurrentInvocation(ce.getKey());
          lookedUpEntries.put(ce.getKey(), ce.getValue());
       }
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/RemoteTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/RemoteTxInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/RemoteTxInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -61,7 +61,6 @@
    }
 
    public void putLookedUpEntry(Object key, CacheEntry e) {
-      keyAddedInCurrentInvocation(key);
       remoteTransaction.putLookedUpEntry(key, e);
    }
 
@@ -75,7 +74,6 @@
 
    public void putLookedUpEntries(Map<Object, CacheEntry> lookedUpEntries) {
       for (Map.Entry<Object, CacheEntry> ce: lookedUpEntries.entrySet()) {
-         keyAddedInCurrentInvocation(ce.getKey());
          remoteTransaction.putLookedUpEntry(ce.getKey(), ce.getValue());
       }
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -46,11 +46,6 @@
    Transaction getRunningTransaction();
 
    /**
-    * Returns true if the current tx is either ACTIVE or PREPARING, false otherwise.
-    */
-   boolean isValidRunningTx();
-
-   /**
     * Registers a new participant with the transaction.
     */
    void addAffectedKeys(Object... keys);

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/CallInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/CallInterceptor.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/CallInterceptor.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -37,6 +37,7 @@
  * transaction.
  *
  * @author Bela Ban
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class CallInterceptor extends CommandInterceptor {
@@ -67,19 +68,6 @@
    @Override
    final public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
       if (trace) log.trace("Executing command: " + command + ".");
-      Object retval;
-      try {
-         retval = command.perform(ctx);
-      }
-      catch (Throwable t) {
-         if (ctx.isInTxScope()) {
-            TxInvocationContext txContext = (TxInvocationContext) ctx;
-            if (txContext.isValidRunningTx()) {
-               txContext.getRunningTransaction().setRollbackOnly();
-            }
-         }
-         throw t;
-      }
-      return retval;
+      return command.perform(ctx);
    }
 }
\ No newline at end of file

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -52,6 +52,7 @@
 
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
+import java.sql.ResultSet;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -70,6 +71,7 @@
  *
  * @author Manik Surtani
  * @author Galder Zamarreño
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 @MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.")
@@ -164,12 +166,6 @@
             }
             catch (Throwable t) {
                log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
-               try {
-                  tx.setRollbackOnly();
-               }
-               catch (SystemException se) {
-                  throw new RuntimeException("setting tx rollback failed ", se);
-               }
                if (t instanceof RuntimeException)
                   throw (RuntimeException) t;
                else
@@ -198,6 +194,12 @@
          result.add(command.getKey());
          return null;
       }
+
+      @Override
+      public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+         result.addAll(command.getAffectedKeys());
+         return null;
+      }
    }
 
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -22,28 +22,21 @@
 package org.infinispan.interceptors;
 
 
-import org.infinispan.CacheException;
 import org.infinispan.commands.VisitableCommand;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
-import org.infinispan.factories.annotations.Inject;
 import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.util.concurrent.locks.LockManager;
 
+/**
+ * @author Mircea.Markus at jboss.com
+ */
 public class InvocationContextInterceptor extends CommandInterceptor {
 
-   LockManager lockManager;
-
    @Override
    public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
       return handleAll(ctx, command);
    }
 
-   @Inject
-   public void injectLockManager(LockManager lockManager) {
-      this.lockManager = lockManager;
-   }
-
    private Object handleAll(InvocationContext ctx, VisitableCommand command) throws Throwable {
       boolean suppressExceptions = false;
 
@@ -58,26 +51,11 @@
          return invokeNextInterceptor(ctx, command);
       }
       catch (Throwable th) {
-         // make sure we release locks for all keys locked in this invocation!
-         for (Object key: ctx.getKeysAddedInCurrentInvocation()) {
-            if (ctx.hasLockedKey(key)) {
-               if (suppressExceptions) {
-                  if (log.isDebugEnabled()) log.debug("Caught exception, Releasing lock on key " + key + " acquired during the current invocation!");
-               } else {
-                  if (log.isInfoEnabled()) log.info("Caught exception, Releasing lock on key " + key + " acquired during the current invocation!");
-               }
-               // unlock key!
-               lockManager.unlock(key);
-               ctx.removeLookedUpEntry(key);
-            }
-         }
-
          if (suppressExceptions) {
             log.trace("Exception while executing code, failing silently...", th);
             return null;
          } else {
             log.error("Execution error: ", th);
-
             throw th;
          }
       } finally {

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -58,6 +58,7 @@
  * Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
  *
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @author Mircea.Markus at jboss.com
  * @see <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC designs</a>
  * @since 4.0
  */
@@ -138,17 +139,21 @@
          if (localTxScope) {
             c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner());
          }
-         for (Object key : c.getKeys()) {
-            if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) {
-               //if even one key is unlocked we need to invoke this lock command cluster wide... 
-               shouldInvokeOnCluster = true;
+         try {
+            for (Object key : c.getKeys()) {
+               if (c.isImplicit() && localTxScope && !lockManager.ownsLock(key, ctx.getLockOwner())) {
+                  //if even one key is unlocked we need to invoke this lock command cluster wide...
+                  shouldInvokeOnCluster = true;
+               }
+               entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
             }
-            entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+            if (shouldInvokeOnCluster || c.isExplicit())
+               return invokeNextInterceptor(ctx, c);
+            else
+               return null;
+         } catch (Throwable te) {
+            return cleanLocksAndRethrow(ctx, te);
          }
-         if (shouldInvokeOnCluster || c.isExplicit())
-            return invokeNextInterceptor(ctx, c);
-         else
-            return null;
       } finally {
          if (ctx.isInTxScope()) {
             doAfterCall(ctx);
@@ -167,6 +172,8 @@
          for (InternalCacheEntry entry : dataContainer.entrySet())
             entryFactory.wrapEntryForWriting(ctx, entry, false, false, false, false, false);
          return invokeNextInterceptor(ctx, command);
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
       } finally {
          doAfterCall(ctx);
       }
@@ -183,10 +190,14 @@
    public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
       try {
          if (command.getKeys() != null) {
-            for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx, key, false, true, false, false, false);
+            for (Object key : command.getKeys())
+               entryFactory.wrapEntryForWriting(ctx, key, false, true, false, false, false);
          }
          return invokeNextInterceptor(ctx, command);
-      } finally {
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
+      }
+      finally {
          doAfterCall(ctx);
       }
    }
@@ -196,7 +207,10 @@
       try {
          entryFactory.wrapEntryForWriting(ctx, command.getKey(), true, false, false, false, !command.isPutIfAbsent());
          return invokeNextInterceptor(ctx, command);
-      } finally {
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
+      }
+      finally {
          doAfterCall(ctx);
       }
    }
@@ -208,6 +222,8 @@
             entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, true);
          }
          return invokeNextInterceptor(ctx, command);
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
       }
       finally {
          doAfterCall(ctx);
@@ -219,6 +235,8 @@
       try {
          entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true, false, true, false);
          return invokeNextInterceptor(ctx, command);
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
       }
       finally {
          doAfterCall(ctx);
@@ -230,6 +248,8 @@
       try {
          entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true, false, false, false);
          return invokeNextInterceptor(ctx, command);
+      } catch (Throwable te) {
+         return cleanLocksAndRethrow(ctx, te);
       }
       finally {
          doAfterCall(ctx);
@@ -294,6 +314,11 @@
       }
    }
 
+   private Object cleanLocksAndRethrow(InvocationContext ctx, Throwable te) throws Throwable {
+      cleanupLocks(ctx, false);
+      throw te;
+   }
+
    protected void commitEntry(CacheEntry entry) {
       entry.commit(dataContainer);
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -1,5 +1,6 @@
 package org.infinispan.interceptors;
 
+import org.infinispan.CacheException;
 import org.infinispan.commands.VisitableCommand;
 import org.infinispan.commands.control.LockControlCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
@@ -112,21 +113,13 @@
 
    @Override
    public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
-      return enlistReadAndInvokeNext(ctx, command);
+      try {
+         return enlistReadAndInvokeNext(ctx, command);
+      } catch (Throwable t) {
+         return markTxForRollbackAndRethrow(ctx, t);
+      }
    }
 
-   /**
-    * Designed to be overridden.  Returns a VisitableCommand fit for replaying locally, based on the modification passed
-    * in.  If a null value is returned, this means that the command should not be replayed.
-    *
-    * @param modification modification in a prepare call
-    * @return a VisitableCommand representing this modification, fit for replaying, or null if the command should not be
-    *         replayed.
-    */
-   protected VisitableCommand getCommandToReplay(VisitableCommand modification) {
-      return modification;
-   }
-
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
       return enlistWriteAndInvokeNext(ctx, command);
@@ -180,7 +173,8 @@
          if (localModeNotForced(ctx)) shouldAddMod = true;
          localTxContext.setXaCache(xaAdapter);
       }
-      Object rv = invokeNextInterceptor(ctx, command);
+      Object rv;
+      rv = invokeNextAndRollbackTxOnFailure(ctx, command);
       if (!ctx.isInTxScope())
          transactionLog.logNoTxWrite(command);
       if (command.isSuccessful() && shouldAddMod) xaAdapter.addModification(command);
@@ -247,4 +241,48 @@
    public long getRollbacks() {
       return rollbacks.get();
    }
+
+   /**
+    * Designed to be overridden.  Returns a VisitableCommand fit for replaying locally, based on the modification passed
+    * in.  If a null value is returned, this means that the command should not be replayed.
+    *
+    * @param modification modification in a prepare call
+    * @return a VisitableCommand representing this modification, fit for replaying, or null if the command should not be
+    *         replayed.
+    */
+   protected VisitableCommand getCommandToReplay(VisitableCommand modification) {
+      return modification;
+   }
+
+   private Object markTxForRollbackAndRethrow(InvocationContext ctx, Throwable te) throws Throwable {
+      if (ctx.isOriginLocal() && ctx.isInTxScope()) {
+         Transaction transaction = tm.getTransaction();
+         if (transaction != null && isValidRunningTx(transaction)) {
+            transaction.setRollbackOnly();
+         }
+      }
+      throw te;
+   }
+
+   private Object invokeNextAndRollbackTxOnFailure(InvocationContext ctx, WriteCommand command) throws Throwable {
+      Object rv;
+      try {
+         rv = invokeNextInterceptor(ctx, command);
+      } catch (Throwable te) {
+         markTxForRollbackAndRethrow(ctx, te);
+         throw new IllegalStateException("This should not be reached");
+      }
+      return rv;
+   }
+
+   public boolean isValidRunningTx(Transaction tx) throws Exception {
+      int status;
+      try {
+         status = tx.getStatus();
+      }
+      catch (SystemException e) {
+         throw new CacheException("Unexpected!", e);
+      }
+      return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING;
+   }
 }

Modified: branches/4.2.x/core/src/test/java/org/infinispan/api/mvcc/LockTestBase.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/api/mvcc/LockTestBase.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/test/java/org/infinispan/api/mvcc/LockTestBase.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -270,7 +270,7 @@
       catch (TimeoutException expected) {
 //         expected.printStackTrace();  // for debugging
       }
-      tm.commit();
+      tm.rollback();
       tm.resume(t1);
       tm.commit();
       assertNoLocks();

Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/TestingUtil.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-09-02 16:01:52 UTC (rev 2322)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -30,6 +30,7 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
+import org.infinispan.transaction.xa.TransactionTable;
 import org.infinispan.util.concurrent.locks.LockManager;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -827,4 +828,8 @@
    public static Object v(Method method) {
       return v(method, "");
    }
+
+   public static TransactionTable getTransactionTable(Cache<Object, Object> cache) {
+      return cache.getAdvancedCache().getComponentRegistry().getComponent(TransactionTable.class);
+   }
 }

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -0,0 +1,139 @@
+package org.infinispan.tx.timeout;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.control.LockControlCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test(groups = "functional", testName = "tx.timeout.EagerLockingAndTimeoutTest")
+public class ExplicitLockingAndTimeoutTest extends MultipleCacheManagersTest {
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+   }
+
+   public void testExplicitLockingRemoteTimeout() throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      tm.begin();
+      cache(0).getAdvancedCache().lock("k2");
+      assert lm0.isLocked("k2");
+      assert lm1.isLocked("k2");
+
+      try {
+         cache(0).getAdvancedCache().lock("k1");
+         assert false;
+      } catch (TimeoutException e) {
+         //expected
+      }
+
+      assert txStatus.teReceived;
+      assert txStatus.isTxInTableAfterTeOnEagerLocking;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(lm1.isLocked("k2"), false, "Even though rollback was not received yet lock on k2, which was acquired, is no longer held");
+      assert tm.getStatus() == Status.STATUS_MARKED_ROLLBACK;
+
+      assertEquals(1, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(1, txTable1.getRemoteTxCount());
+
+      tm.rollback();
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(0, txTable1.getRemoteTxCount());
+
+
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals("v1", cache(0).get("k1"));
+      assertEquals("v1", cache(1).get("k1"));
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(0, lm0.getNumberOfLocksHeld());
+      assertEquals(0, lm1.getNumberOfLocksHeld());
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private boolean isTxInTableAfterTeOnEagerLocking;
+
+      private int numLocksAfterTeOnEagerLocking;
+
+      @Override
+      public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, command);
+         } catch (TimeoutException te) {
+            teReceived = true;
+            isTxInTableAfterTeOnEagerLocking = txTable1.containRemoteTx(ctx.getGlobalTransaction());
+            numLocksAfterTeOnEagerLocking = lm1.getNumberOfLocksHeld();
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.isTxInTableAfterTeOnEagerLocking = false;
+         this.numLocksAfterTeOnEagerLocking = -1;
+      }
+   }
+}
\ No newline at end of file

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -0,0 +1,174 @@
+package org.infinispan.tx.timeout;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test (testName = "tx.timeout.TxAndInvalidationTimeoutTest", groups = "functional")
+public class TxAndInvalidationTimeoutTest extends MultipleCacheManagersTest {
+
+   private static Log log = LogFactory.getLog(TxAndInvalidationTimeoutTest.class);
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.INVALIDATION_SYNC, true);
+   }
+
+   public void testPutTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).remove("k1");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache(0).putAll(toAdd);
+         }
+      });
+   }
+
+
+   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      tm.begin();
+      cache(0).put("k2", "v2");
+      assert lm0.isLocked("k2");
+      assert !lm1.isLocked("k2");
+
+      operation.execute();
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(1, txTable0.getLocalTxCount());
+
+
+      log.trace("Before commit");
+
+      try {
+         tm.commit();
+         assert false;
+      } catch (RollbackException re) {
+         //expected
+      }
+      assert txStatus.teReceived;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(txStatus.numLocksAfterTeOnInvalidate, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
+
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+
+      log.trace("Right before second commit");
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals(null, cache(0).get("k1"));
+      assertEquals("v1", cache(1).get("k1"));
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, lm0.getNumberOfLocksHeld());
+      assertEquals(0, lm1.getNumberOfLocksHeld());
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private int numLocksAfterTeOnInvalidate;
+
+      @Override
+      public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, invalidateCommand);
+         } catch (TimeoutException te) {
+            numLocksAfterTeOnInvalidate = lm1.getNumberOfLocksHeld();
+            teReceived = true;
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.numLocksAfterTeOnInvalidate = -1;
+      }
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -0,0 +1,202 @@
+package org.infinispan.tx.timeout;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-629.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test(groups = "functional", testName = "tx.TestTxAndRemoteTimeoutException")
+public class TxAndRemoteTimeoutExceptionTest extends MultipleCacheManagersTest {
+
+   private static Log log = LogFactory.getLog(TxAndRemoteTimeoutExceptionTest.class);
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+   }
+
+   public void testClearTimeoutsInTx() throws Exception {
+      cache(0).put("k1", "value");
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).clear();
+         }
+      });
+   }
+
+   public void testPutTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).remove("k1");
+         }
+      });
+   }
+
+   public void testReplaceTimeoutsInTx() throws Exception {
+      cache(1).put("k1", "value");
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).replace("k1", "newValue");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache(0).putAll(toAdd);
+         }
+      });
+   }
+
+
+   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      tm.begin();
+      cache(0).put("k2", "v2");
+      assert lm0.isLocked("k2");
+      assert !lm1.isLocked("k2");
+
+      operation.execute();
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(1, txTable0.getLocalTxCount());
+
+
+      try {
+         tm.commit();
+         assert false;
+      } catch (RollbackException re) {
+         //expected
+      }
+      assert txStatus.teReceived;
+      assert txStatus.isTxInTableAfterTeOnPrepare;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(txStatus.numLocksAfterTeOnPrepare, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
+
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+
+      log.trace("Right before second commit");
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals("v1", cache(0).get("k1"));
+      assertEquals("v1", cache(1).get("k1"));
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, lm0.getNumberOfLocksHeld());
+      assertEquals(0, lm1.getNumberOfLocksHeld());
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private boolean isTxInTableAfterTeOnPrepare;
+
+      private int numLocksAfterTeOnPrepare;
+
+      @Override
+      public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, command);
+         } catch (TimeoutException te) {
+            numLocksAfterTeOnPrepare = lm1.getNumberOfLocksHeld();
+            isTxInTableAfterTeOnPrepare = txTable1.containRemoteTx(ctx.getGlobalTransaction());
+            teReceived = true;
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.isTxInTableAfterTeOnPrepare = false;
+         this.numLocksAfterTeOnPrepare = -1;
+      }
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java	2010-09-05 11:04:47 UTC (rev 2323)
@@ -0,0 +1,143 @@
+package org.infinispan.tx.timeout;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-629.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test(testName = "tx.TxAndTimeoutExceptionTest", groups = "functional")
+public class TxAndTimeoutExceptionTest extends SingleCacheManagerTest {
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration config = getDefaultStandaloneConfig(true);
+      config.setUseLockStriping(false);
+      config.setLockAcquisitionTimeout(1000);
+      EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(config, true);
+      cache = cm.getCache();
+      return cm;
+   }
+
+
+   public void testPutTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.remove("k1");
+         }
+      });
+   }
+
+   public void testClearTimeoutsInTx() throws Exception {
+      cache.put("k1", "value");
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.clear();
+         }
+      });
+   }
+
+   public void testReplaceTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.replace("k1", "newValue");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache.putAll(toAdd);
+         }
+      });
+   }
+
+   private void assertExpectedBehavior(CacheOperation op) throws Exception {
+      LockManager lm = TestingUtil.extractLockManager(cache);
+      TransactionTable txTable = TestingUtil.getTransactionTable(cache);
+      TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
+      tm.begin();
+      cache.put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm.isLocked("k1");
+
+      assertEquals(1, txTable.getLocalTxCount());
+      tm.begin();
+      cache.put("k2", "v2");
+      assert lm.isLocked("k2");
+      assertEquals(2, txTable.getLocalTxCount());
+      try {
+         op.execute();
+         assert false : "Timeout exception expected";
+      } catch (TimeoutException e) {
+         //expected
+      }
+
+      //make sure that locks acquired by that tx were released even before the transaction is rolled back, the tx object
+      //was marked for rollback 
+      Transaction transaction = tm.getTransaction();
+      assert transaction != null;
+      assert transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK;
+      assert !lm.isLocked("k2");
+      assert lm.isLocked("k1");
+      try {
+         cache.put("k3", "v3");
+         assert false;
+      } catch (IllegalStateException e) {
+         //expected
+      }
+      assertEquals(txTable.getLocalTxCount(), 2);
+
+      //now the TM is expected to rollback the tx
+      tm.rollback();
+      assertEquals(txTable.getLocalTxCount(), 1);
+
+      tm.resume(k1LockOwner);
+      tm.commit();
+
+      //now test that the other tx works as expected
+      assertEquals(0, txTable.getLocalTxCount());
+      assertEquals(cache.get("k1"), "v1");
+      assert !lm.isLocked("k1");
+      assertEquals(txTable.getLocalTxCount(), 0);
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}



More information about the infinispan-commits mailing list