[infinispan-commits] Infinispan SVN: r2383 - in branches/4.2.x/core/src: main/java/org/infinispan/interceptors and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Sep 15 11:59:39 EDT 2010


Author: mircea.markus
Date: 2010-09-15 11:59:38 -0400 (Wed, 15 Sep 2010)
New Revision: 2383

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/CustomInterceptorException.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ExplicitLockingAndTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ReplicationExceptionTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndInvalidationTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndRemoteTimeoutExceptionTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndTimeoutExceptionTest.java
Removed:
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/CacheException.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/TimeoutException.java
   branches/4.2.x/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
   branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
Log:
[ISPN-629] - document transaction's state if an Exception occurs
- added more test

Modified: branches/4.2.x/core/src/main/java/org/infinispan/CacheException.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/CacheException.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/main/java/org/infinispan/CacheException.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -26,9 +26,13 @@
  * <p/>
  * Specific subclasses such as {@link org.infinispan.util.concurrent.TimeoutException} and {@link
  * org.infinispan.config.ConfigurationException} have more specific uses.
+ * <p/>
+ * Transactions: if a CacheException (including any subclasses) is thrown for an operation on a JTA transaction, then
+ * the transaction is marked for rollback. 
  *
  * @author <a href="mailto:bela at jboss.org">Bela Ban</a>
  * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class CacheException extends RuntimeException {

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvocationContextInterceptor.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -22,16 +22,30 @@
 package org.infinispan.interceptors;
 
 
+import org.infinispan.CacheException;
 import org.infinispan.commands.VisitableCommand;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.annotations.Inject;
 import org.infinispan.interceptors.base.CommandInterceptor;
 
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
 /**
  * @author Mircea.Markus at jboss.com
  */
 public class InvocationContextInterceptor extends CommandInterceptor {
 
+   private TransactionManager tm;
+
+   @Inject
+   public void init(TransactionManager tm) {
+      this.tm = tm;
+   }
+
    @Override
    public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
       return handleAll(ctx, command);
@@ -55,6 +69,11 @@
             log.trace("Exception while executing code, failing silently...", th);
             return null;
          } else {
+            if (ctx.isInTxScope() && ctx.isOriginLocal()) {
+               if (trace) log.trace("Transaction marked for rollback as exception was received.");
+               markTxForRollbackAndRethrow(ctx, th);
+               throw new IllegalStateException("This should not be reached");
+            }
             log.error("Execution error: ", th);
             throw th;
          }
@@ -62,4 +81,25 @@
          ctx.reset();
       }
    }
+
+   private Object markTxForRollbackAndRethrow(InvocationContext ctx, Throwable te) throws Throwable {
+      if (ctx.isOriginLocal() && ctx.isInTxScope()) {
+         Transaction transaction = tm.getTransaction();
+         if (transaction != null && isValidRunningTx(transaction)) {
+            transaction.setRollbackOnly();
+         }
+      }
+      throw te;
+   }   
+
+   public boolean isValidRunningTx(Transaction tx) throws Exception {
+      int status;
+      try {
+         status = tx.getStatus();
+      }
+      catch (SystemException e) {
+         throw new CacheException("Unexpected!", e);
+      }
+      return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING;
+   }
 }
\ No newline at end of file

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -1,6 +1,5 @@
 package org.infinispan.interceptors;
 
-import org.infinispan.CacheException;
 import org.infinispan.commands.VisitableCommand;
 import org.infinispan.commands.control.LockControlCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
@@ -113,11 +112,7 @@
 
    @Override
    public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
-      try {
-         return enlistReadAndInvokeNext(ctx, command);
-      } catch (Throwable t) {
-         return markTxForRollbackAndRethrow(ctx, t);
-      }
+       return enlistReadAndInvokeNext(ctx, command);
    }
 
    @Override
@@ -174,7 +169,7 @@
          localTxContext.setXaCache(xaAdapter);
       }
       Object rv;
-      rv = invokeNextAndRollbackTxOnFailure(ctx, command);
+      rv = invokeNextInterceptor(ctx, command);
       if (!ctx.isInTxScope())
          transactionLog.logNoTxWrite(command);
       if (command.isSuccessful() && shouldAddMod) xaAdapter.addModification(command);
@@ -253,36 +248,4 @@
    protected VisitableCommand getCommandToReplay(VisitableCommand modification) {
       return modification;
    }
-
-   private Object markTxForRollbackAndRethrow(InvocationContext ctx, Throwable te) throws Throwable {
-      if (ctx.isOriginLocal() && ctx.isInTxScope()) {
-         Transaction transaction = tm.getTransaction();
-         if (transaction != null && isValidRunningTx(transaction)) {
-            transaction.setRollbackOnly();
-         }
-      }
-      throw te;
-   }
-
-   private Object invokeNextAndRollbackTxOnFailure(InvocationContext ctx, WriteCommand command) throws Throwable {
-      Object rv;
-      try {
-         rv = invokeNextInterceptor(ctx, command);
-      } catch (Throwable te) {
-         markTxForRollbackAndRethrow(ctx, te);
-         throw new IllegalStateException("This should not be reached");
-      }
-      return rv;
-   }
-
-   public boolean isValidRunningTx(Transaction tx) throws Exception {
-      int status;
-      try {
-         status = tx.getStatus();
-      }
-      catch (SystemException e) {
-         throw new CacheException("Unexpected!", e);
-      }
-      return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING;
-   }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/TimeoutException.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/TimeoutException.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/TimeoutException.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -29,11 +29,7 @@
  * from all members.
  *
  * @author <a href="mailto:bela at jboss.org">Bela Ban</a>.
- *         <p/>
- *         <p/>
- *         <p><b>Revisions:</b>
- *         <p/>
- *         <p>Dec 28 2002 Bela Ban: first implementation
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class TimeoutException extends CacheException {

Modified: branches/4.2.x/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -31,6 +31,7 @@
    public void store(InternalCacheEntry ed) {
       if (ed != null) {
          if (trace) log.trace("Store {0} in dummy map store@{1}", ed, Integer.toHexString(System.identityHashCode(store)));
+         config.failIfNeeded(ed.getKey());
          store.put(ed.getKey(), ed);
       }
    }
@@ -158,6 +159,7 @@
       boolean debug;
       String store = "__DEFAULT_STORE__";
       boolean cleanBetweenRestarts = true;
+      private Object failKey;
 
       public Cfg() {
          setCacheLoaderClassName(DummyInMemoryCacheStore.class.getName());
@@ -197,5 +199,13 @@
       public boolean isCleanBetweenRestarts() {
          return cleanBetweenRestarts;
       }
+
+      public void setFailKey(Object failKey) {
+         this.failKey = failKey;
+      }
+
+      public void failIfNeeded(Object key) {
+         if(failKey != null && failKey.equals(key)) throw new RuntimeException("Induced failure on key:" + key);
+      }
    }
 }

Modified: branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -25,46 +25,63 @@
 
    private volatile CountDownLatch replicationLatch;
 
+   private boolean fail;
+
    public ControlledRpcManager(RpcManager realOne) {
       this.realOne = realOne;
    }
 
    private RpcManager realOne;
 
+   public boolean isFail() {
+      return fail;
+   }
+
+   public void setFail(boolean fail) {
+      this.fail = fail;
+   }
+
    public void setReplicationLatch(CountDownLatch replicationLatch) {
       this.replicationLatch = replicationLatch;
    }
 
    public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) {
+      failIfNeeded();
       waitFirst(rpcCommand);
       return realOne.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
    }
 
    public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) {
+      failIfNeeded();
       waitFirst(rpcCommand);
       return realOne.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue);
    }
 
    public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
+      failIfNeeded();
       waitFirst(rpcCommand);
       return realOne.invokeRemotely(recipients, rpcCommand, mode, timeout);
    }
 
    public void retrieveState(String cacheName, long timeout) throws StateTransferException {
+      failIfNeeded();
       realOne.retrieveState(cacheName, timeout);
    }
 
    public void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws ReplicationException {
+      failIfNeeded();
       waitFirst(rpc);
       realOne.broadcastRpcCommand(rpc, sync);
    }
 
    public void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+      failIfNeeded();
       waitFirst(rpc);
       realOne.broadcastRpcCommand(rpc, sync, usePriorityQueue);
    }
 
    private void waitFirst(ReplicableCommand rpcCommand) {
+      failIfNeeded();
       if (!(rpcCommand instanceof ClusteredGetCommand) && !(rpcCommand instanceof LockControlCommand)) {
          System.out.println(Thread.currentThread().getName() + " -- replication trigger called!");
          try {
@@ -77,30 +94,37 @@
    }
 
    public void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future) {
+      failIfNeeded();
       realOne.broadcastRpcCommandInFuture(rpc, future);
    }
 
    public void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future) {
+      failIfNeeded();
       realOne.broadcastRpcCommandInFuture(rpc, usePriorityQueue, future);
    }
 
    public void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException {
+      failIfNeeded();
       realOne.invokeRemotely(recipients, rpc, sync);
    }
 
    public void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+      failIfNeeded();
       realOne.invokeRemotely(recipients, rpc, sync, usePriorityQueue);
    }
 
    public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future) {
+      failIfNeeded();
       realOne.invokeRemotelyInFuture(recipients, rpc, future);
    }
 
    public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future) {
+      failIfNeeded();
       realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future);
    }
 
    public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future, long timeout) {
+      failIfNeeded();
       realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout);
    }
 
@@ -115,4 +139,8 @@
    public Address getAddress() {
       return realOne.getAddress();
    }
+
+   public void failIfNeeded() {
+      if (fail) throw new IllegalStateException("Induced failure!");
+   }
 }

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/CustomInterceptorException.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/CustomInterceptorException.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/CustomInterceptorException.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,45 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.CustomInterceptorConfigTest;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "CustomInterceptorException")
+public class CustomInterceptorException extends SingleCacheManagerTest {
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      EmbeddedCacheManager eCm =
+            TestCacheManagerFactory.createCacheManager(getDefaultClusteredConfig(Configuration.CacheMode.LOCAL), true);
+      eCm.getCache().getAdvancedCache().addInterceptor(new CustomInterceptorConfigTest.DummyInterceptor() {
+         @Override
+         public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+            throw new IllegalStateException("Induce failure!");
+         }
+      }, 1);
+      return eCm;
+   }
+
+   public void testFailure() throws Exception {
+      TransactionManager transactionManager = cache.getAdvancedCache().getTransactionManager();
+      transactionManager.begin();
+      try {
+         cache.put("k", "v");
+         assert false;
+      } catch (Exception e) {
+         assert transactionManager.getTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK;
+      }
+   }
+}

Copied: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ExplicitLockingAndTimeoutTest.java (from rev 2374, branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java)
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ExplicitLockingAndTimeoutTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ExplicitLockingAndTimeoutTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,139 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.control.LockControlCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "tx.timeout.EagerLockingAndTimeoutTest")
+public class ExplicitLockingAndTimeoutTest extends MultipleCacheManagersTest {
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+   }
+
+   public void testExplicitLockingRemoteTimeout() throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      tm.begin();
+      cache(0).getAdvancedCache().lock("k2");
+      assert lm0.isLocked("k2");
+      assert lm1.isLocked("k2");
+
+      try {
+         cache(0).getAdvancedCache().lock("k1");
+         assert false;
+      } catch (TimeoutException e) {
+         //expected
+      }
+
+      assert txStatus.teReceived;
+      assert txStatus.isTxInTableAfterTeOnEagerLocking;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(lm1.isLocked("k2"), false, "Even though rollback was not received yet lock on k2, which was acquired, is no longer held");
+      assert tm.getStatus() == Status.STATUS_MARKED_ROLLBACK;
+
+      assertEquals(1, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(1, txTable1.getRemoteTxCount());
+
+      tm.rollback();
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(0, txTable1.getRemoteTxCount());
+
+
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals("v1", cache(0).get("k1"));
+      assertEquals("v1", cache(1).get("k1"));
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(0, lm0.getNumberOfLocksHeld());
+      assertEquals(0, lm1.getNumberOfLocksHeld());
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private boolean isTxInTableAfterTeOnEagerLocking;
+
+      private int numLocksAfterTeOnEagerLocking;
+
+      @Override
+      public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, command);
+         } catch (TimeoutException te) {
+            teReceived = true;
+            isTxInTableAfterTeOnEagerLocking = txTable1.containRemoteTx(ctx.getGlobalTransaction());
+            numLocksAfterTeOnEagerLocking = lm1.getNumberOfLocksHeld();
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.isTxInTableAfterTeOnEagerLocking = false;
+         this.numLocksAfterTeOnEagerLocking = -1;
+      }
+   }
+}
\ No newline at end of file

Added: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ReplicationExceptionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ReplicationExceptionTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/ReplicationExceptionTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,43 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.tx.dld.BaseDldTest;
+import org.infinispan.tx.dld.ControlledRpcManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.RollbackException;
+import javax.transaction.TransactionManager;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(testName = "tx.exception.ReplicationExceptionTest")
+public class ReplicationExceptionTest extends MultipleCacheManagersTest {
+   private ControlledRpcManager crm0;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+      registerCacheManager(TestCacheManagerFactory.createCacheManager(config, true));
+      registerCacheManager(TestCacheManagerFactory.createCacheManager(config, true));
+      TestingUtil.blockUntilViewsReceived(10000, cache(0), cache(1));
+      crm0 = BaseDldTest.replaceRpcManager(cache(0));
+   }
+
+
+   public void testReplicationFailure() throws Exception {
+      TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+      tm.begin();
+      cache(0).put("k0","v");
+      try {
+         tm.commit();
+         assert false;
+      } catch (RollbackException e) {
+         //expected
+      }
+   }
+}

Copied: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndInvalidationTimeoutTest.java (from rev 2374, branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java)
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndInvalidationTimeoutTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndInvalidationTimeoutTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,174 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test (testName = "tx.timeout.TxAndInvalidationTimeoutTest", groups = "functional")
+public class TxAndInvalidationTimeoutTest extends MultipleCacheManagersTest {
+
+   private static Log log = LogFactory.getLog(TxAndInvalidationTimeoutTest.class);
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.INVALIDATION_SYNC, true);
+   }
+
+   public void testPutTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).remove("k1");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache(0).putAll(toAdd);
+         }
+      });
+   }
+
+
+   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      tm.begin();
+      cache(0).put("k2", "v2");
+      assert lm0.isLocked("k2");
+      assert !lm1.isLocked("k2");
+
+      operation.execute();
+
+      assertEquals(1, txTable1.getLocalTxCount());
+      assertEquals(1, txTable0.getLocalTxCount());
+
+
+      log.trace("Before commit");
+
+      try {
+         tm.commit();
+         assert false;
+      } catch (RollbackException re) {
+         //expected
+      }
+      assert txStatus.teReceived;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(txStatus.numLocksAfterTeOnInvalidate, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
+
+      assertEquals(0, txTable0.getLocalTxCount());
+      assertEquals(1, txTable1.getLocalTxCount());
+
+      log.trace("Right before second commit");
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals(null, cache(0).get("k1"));
+      assertEquals("v1", cache(1).get("k1"));
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, txTable1.getLocalTxCount());
+      assertEquals(0, lm0.getNumberOfLocksHeld());
+      assertEquals(0, lm1.getNumberOfLocksHeld());
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private int numLocksAfterTeOnInvalidate;
+
+      @Override
+      public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, invalidateCommand);
+         } catch (TimeoutException te) {
+            numLocksAfterTeOnInvalidate = lm1.getNumberOfLocksHeld();
+            teReceived = true;
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.numLocksAfterTeOnInvalidate = -1;
+      }
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}

Copied: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndRemoteTimeoutExceptionTest.java (from rev 2374, branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java)
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndRemoteTimeoutExceptionTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndRemoteTimeoutExceptionTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,200 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-629.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "tx.TestTxAndRemoteTimeoutException")
+public class TxAndRemoteTimeoutExceptionTest extends MultipleCacheManagersTest {
+
+   private static Log log = LogFactory.getLog(TxAndRemoteTimeoutExceptionTest.class);
+
+   private LockManager lm1;
+   private LockManager lm0;
+   private TransactionTable txTable0;
+   private TransactionTable txTable1;
+   private TransactionManager tm;
+   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration defaultConfig = getDefaultConfig();
+      defaultConfig.setLockAcquisitionTimeout(500);
+      defaultConfig.setUseLockStriping(false);
+      addClusterEnabledCacheManager(defaultConfig);
+      addClusterEnabledCacheManager(defaultConfig);
+      lm0 = TestingUtil.extractLockManager(cache(0));
+      lm1 = TestingUtil.extractLockManager(cache(1));
+      txTable0 = TestingUtil.getTransactionTable(cache(0));
+      txTable1 = TestingUtil.getTransactionTable(cache(1));
+      tm = cache(0).getAdvancedCache().getTransactionManager();
+      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
+      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
+   }
+
+   protected Configuration getDefaultConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+   }
+
+   public void testClearTimeoutsInTx() throws Exception {
+      cache(0).put("k1", "value");
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).clear();
+         }
+      });
+   }
+
+   public void testPutTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).remove("k1");
+         }
+      });
+   }
+
+   public void testReplaceTimeoutsInTx() throws Exception {
+      cache(1).put("k1", "value");
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache(0).replace("k1", "newValue");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      runAssertion(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache(0).putAll(toAdd);
+         }
+      });
+   }
+
+
+   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
+      // a suspended tx keeps k1 locked on cache(1); a second tx then runs the operation from cache(0) and must fail to commit
+      txStatus.reset();
+      tm.begin();
+      cache(1).put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm1.isLocked("k1");
+      assertEquals(txTable1.getLocalTxCount(), 1);
+
+      tm.begin();
+      cache(0).put("k2", "v2");
+      assert lm0.isLocked("k2");
+      assert !lm1.isLocked("k2");
+      operation.execute();
+      assertEquals(txTable1.getLocalTxCount(), 1);
+      assertEquals(txTable0.getLocalTxCount(), 1);
+
+      // the commit is expected to be rolled back; txStatus records what cache(1) observed when prepare timed out
+      try {
+         tm.commit();
+         assert false;
+      } catch (RollbackException re) {
+         //expected
+      }
+      assert txStatus.teReceived;
+      assert txStatus.isTxInTableAfterTeOnPrepare;
+      //expect 1 as k1 is locked by the other tx
+      assertEquals(txStatus.numLocksAfterTeOnPrepare, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
+      assertEquals(txTable0.getLocalTxCount(), 0);
+      assertEquals(txTable1.getLocalTxCount(), 1);
+
+      // the suspended tx must still commit, making k1 readable on both caches
+      log.trace("Right before second commit");
+      tm.resume(k1LockOwner);
+      tm.commit();
+      assertEquals(cache(0).get("k1"), "v1");
+      assertEquals(cache(1).get("k1"), "v1");
+      // no local transaction may remain on either node
+      assertEquals(txTable0.getLocalTxCount(), 0);
+      assertEquals(txTable1.getLocalTxCount(), 0);
+      assertEquals(lm0.getNumberOfLocksHeld(), 0);
+      assertEquals(lm1.getNumberOfLocksHeld(), 0);
+   }
+
+   private class TxStatusInterceptor extends CommandInterceptor {
+
+      private boolean teReceived;
+
+      private boolean isTxInTableAfterTeOnPrepare;
+
+      private int numLocksAfterTeOnPrepare;
+
+      @Override
+      public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
+         try {
+            return invokeNextInterceptor(ctx, command);
+         } catch (TimeoutException te) {
+            numLocksAfterTeOnPrepare = lm1.getNumberOfLocksHeld();
+            isTxInTableAfterTeOnPrepare = txTable1.containRemoteTx(ctx.getGlobalTransaction());
+            teReceived = true;
+            throw te;
+         }
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         return super.handleDefault(ctx, command);
+      }
+
+      public void reset() {
+         this.teReceived = false;
+         this.isTxInTableAfterTeOnPrepare = false;
+         this.numLocksAfterTeOnPrepare = -1;
+      }
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}

Copied: branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndTimeoutExceptionTest.java (from rev 2374, branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java)
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndTimeoutExceptionTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/exception/TxAndTimeoutExceptionTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -0,0 +1,143 @@
+package org.infinispan.tx.exception;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.transaction.xa.TransactionTable;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.Test;
+
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-629.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(testName = "tx.TxAndTimeoutExceptionTest", groups = "functional")
+public class TxAndTimeoutExceptionTest extends SingleCacheManagerTest {
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration config = getDefaultStandaloneConfig(true);
+      config.setUseLockStriping(false);
+      config.setLockAcquisitionTimeout(1000);
+      EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(config, true);
+      cache = cm.getCache();
+      return cm;
+   }
+
+
+   public void testPutTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.put("k1", "v2222");
+         }
+      });
+   }
+
+   public void testRemoveTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.remove("k1");
+         }
+      });
+   }
+
+   public void testClearTimeoutsInTx() throws Exception {
+      cache.put("k1", "value");
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.clear();
+         }
+      });
+   }
+
+   public void testReplaceTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            cache.replace("k1", "newValue");
+         }
+      });
+   }
+
+   public void testPutAllTimeoutsInTx() throws Exception {
+      assertExpectedBehavior(new CacheOperation() {
+         @Override
+         public void execute() {
+            Map toAdd = new HashMap();
+            toAdd.put("k1", "v22222");
+            cache.putAll(toAdd);
+         }
+      });
+   }
+
+   private void assertExpectedBehavior(CacheOperation op) throws Exception {
+      // runs op in a tx while another, suspended tx holds the k1 lock, and checks the tx state after the resulting timeout
+      LockManager lm = TestingUtil.extractLockManager(cache);
+      TransactionTable txTable = TestingUtil.getTransactionTable(cache);
+      TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
+      tm.begin();
+      cache.put("k1", "v1");
+      Transaction k1LockOwner = tm.suspend();
+      assert lm.isLocked("k1");
+      assertEquals(txTable.getLocalTxCount(), 1);
+
+      tm.begin();
+      cache.put("k2", "v2");
+      assert lm.isLocked("k2");
+      assertEquals(txTable.getLocalTxCount(), 2);
+      try {
+         op.execute();
+         assert false : "Timeout exception expected";
+      } catch (TimeoutException e) {
+         //expected
+      }
+
+      //make sure that locks acquired by that tx were released even before the transaction is rolled back, the tx object
+      //was marked for rollback
+      Transaction transaction = tm.getTransaction();
+      assert transaction != null;
+      assert transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK;
+      assert !lm.isLocked("k2");
+      assert lm.isLocked("k1");
+      //further writes in the tx marked for rollback are expected to be rejected
+      try {
+         cache.put("k3", "v3");
+         assert false;
+      } catch (IllegalStateException e) {
+         //expected
+      }
+      assertEquals(txTable.getLocalTxCount(), 2);
+
+      //now the TM is expected to rollback the tx
+      tm.rollback();
+      assertEquals(txTable.getLocalTxCount(), 1);
+
+      tm.resume(k1LockOwner);
+      tm.commit();
+      //now test that the other tx works as expected
+      assertEquals(txTable.getLocalTxCount(), 0);
+      assertEquals(cache.get("k1"), "v1");
+      assert !lm.isLocked("k1");
+   }
+
+   public interface CacheOperation {
+
+      public abstract void execute();
+   }
+}

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/ExplicitLockingAndTimeoutTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -1,139 +0,0 @@
-package org.infinispan.tx.timeout;
-
-import org.infinispan.commands.VisitableCommand;
-import org.infinispan.commands.control.LockControlCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.transaction.xa.TransactionTable;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.testng.annotations.Test;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test(groups = "functional", testName = "tx.timeout.EagerLockingAndTimeoutTest")
-public class ExplicitLockingAndTimeoutTest extends MultipleCacheManagersTest {
-
-   private LockManager lm1;
-   private LockManager lm0;
-   private TransactionTable txTable0;
-   private TransactionTable txTable1;
-   private TransactionManager tm;
-   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      Configuration defaultConfig = getDefaultConfig();
-      defaultConfig.setLockAcquisitionTimeout(500);
-      defaultConfig.setUseLockStriping(false);
-      addClusterEnabledCacheManager(defaultConfig);
-      addClusterEnabledCacheManager(defaultConfig);
-      lm0 = TestingUtil.extractLockManager(cache(0));
-      lm1 = TestingUtil.extractLockManager(cache(1));
-      txTable0 = TestingUtil.getTransactionTable(cache(0));
-      txTable1 = TestingUtil.getTransactionTable(cache(1));
-      tm = cache(0).getAdvancedCache().getTransactionManager();
-      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
-      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
-   }
-
-   protected Configuration getDefaultConfig() {
-      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
-   }
-
-   public void testExplicitLockingRemoteTimeout() throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
-      txStatus.reset();
-      tm.begin();
-      cache(1).put("k1", "v1");
-      Transaction k1LockOwner = tm.suspend();
-      assert lm1.isLocked("k1");
-
-      assertEquals(1, txTable1.getLocalTxCount());
-      tm.begin();
-      cache(0).getAdvancedCache().lock("k2");
-      assert lm0.isLocked("k2");
-      assert lm1.isLocked("k2");
-
-      try {
-         cache(0).getAdvancedCache().lock("k1");
-         assert false;
-      } catch (TimeoutException e) {
-         //expected
-      }
-
-      assert txStatus.teReceived;
-      assert txStatus.isTxInTableAfterTeOnEagerLocking;
-      //expect 1 as k1 is locked by the other tx
-      assertEquals(lm1.isLocked("k2"), false, "Even though rollback was not received yet lock on k2, which was acquired, is no longer held");
-      assert tm.getStatus() == Status.STATUS_MARKED_ROLLBACK;
-
-      assertEquals(1, txTable0.getLocalTxCount());
-      assertEquals(1, txTable1.getLocalTxCount());
-      assertEquals(1, txTable1.getRemoteTxCount());
-
-      tm.rollback();
-      assertEquals(0, txTable0.getLocalTxCount());
-      assertEquals(1, txTable1.getLocalTxCount());
-      assertEquals(0, txTable1.getRemoteTxCount());
-
-
-      tm.resume(k1LockOwner);
-      tm.commit();
-      assertEquals("v1", cache(0).get("k1"));
-      assertEquals("v1", cache(1).get("k1"));
-      assertEquals(0, txTable1.getLocalTxCount());
-      assertEquals(0, txTable0.getLocalTxCount());
-      assertEquals(0, lm0.getNumberOfLocksHeld());
-      assertEquals(0, lm1.getNumberOfLocksHeld());
-   }
-
-   private class TxStatusInterceptor extends CommandInterceptor {
-
-      private boolean teReceived;
-
-      private boolean isTxInTableAfterTeOnEagerLocking;
-
-      private int numLocksAfterTeOnEagerLocking;
-
-      @Override
-      public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
-         try {
-            return invokeNextInterceptor(ctx, command);
-         } catch (TimeoutException te) {
-            teReceived = true;
-            isTxInTableAfterTeOnEagerLocking = txTable1.containRemoteTx(ctx.getGlobalTransaction());
-            numLocksAfterTeOnEagerLocking = lm1.getNumberOfLocksHeld();
-            throw te;
-         }
-      }
-
-      @Override
-      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
-         return super.handleDefault(ctx, command);
-      }
-
-      public void reset() {
-         this.teReceived = false;
-         this.isTxInTableAfterTeOnEagerLocking = false;
-         this.numLocksAfterTeOnEagerLocking = -1;
-      }
-   }
-}
\ No newline at end of file

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndInvalidationTimeoutTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -1,174 +0,0 @@
-package org.infinispan.tx.timeout;
-
-import org.infinispan.commands.VisitableCommand;
-import org.infinispan.commands.write.InvalidateCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.transaction.xa.TransactionTable;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-import org.testng.annotations.Test;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test (testName = "tx.timeout.TxAndInvalidationTimeoutTest", groups = "functional")
-public class TxAndInvalidationTimeoutTest extends MultipleCacheManagersTest {
-
-   private static Log log = LogFactory.getLog(TxAndInvalidationTimeoutTest.class);
-
-   private LockManager lm1;
-   private LockManager lm0;
-   private TransactionTable txTable0;
-   private TransactionTable txTable1;
-   private TransactionManager tm;
-   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      Configuration defaultConfig = getDefaultConfig();
-      defaultConfig.setLockAcquisitionTimeout(500);
-      defaultConfig.setUseLockStriping(false);
-      addClusterEnabledCacheManager(defaultConfig);
-      addClusterEnabledCacheManager(defaultConfig);
-      lm0 = TestingUtil.extractLockManager(cache(0));
-      lm1 = TestingUtil.extractLockManager(cache(1));
-      txTable0 = TestingUtil.getTransactionTable(cache(0));
-      txTable1 = TestingUtil.getTransactionTable(cache(1));
-      tm = cache(0).getAdvancedCache().getTransactionManager();
-      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
-      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
-   }
-
-   protected Configuration getDefaultConfig() {
-      return getDefaultClusteredConfig(Configuration.CacheMode.INVALIDATION_SYNC, true);
-   }
-
-   public void testPutTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).put("k1", "v2222");
-         }
-      });
-   }
-
-   public void testRemoveTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).remove("k1");
-         }
-      });
-   }
-
-   public void testPutAllTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            Map toAdd = new HashMap();
-            toAdd.put("k1", "v22222");
-            cache(0).putAll(toAdd);
-         }
-      });
-   }
-
-
-   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
-      txStatus.reset();
-      tm.begin();
-      cache(1).put("k1", "v1");
-      Transaction k1LockOwner = tm.suspend();
-      assert lm1.isLocked("k1");
-
-      assertEquals(1, txTable1.getLocalTxCount());
-      tm.begin();
-      cache(0).put("k2", "v2");
-      assert lm0.isLocked("k2");
-      assert !lm1.isLocked("k2");
-
-      operation.execute();
-
-      assertEquals(1, txTable1.getLocalTxCount());
-      assertEquals(1, txTable0.getLocalTxCount());
-
-
-      log.trace("Before commit");
-
-      try {
-         tm.commit();
-         assert false;
-      } catch (RollbackException re) {
-         //expected
-      }
-      assert txStatus.teReceived;
-      //expect 1 as k1 is locked by the other tx
-      assertEquals(txStatus.numLocksAfterTeOnInvalidate, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
-
-      assertEquals(0, txTable0.getLocalTxCount());
-      assertEquals(1, txTable1.getLocalTxCount());
-
-      log.trace("Right before second commit");
-      tm.resume(k1LockOwner);
-      tm.commit();
-      assertEquals(null, cache(0).get("k1"));
-      assertEquals("v1", cache(1).get("k1"));
-      assertEquals(0, txTable1.getLocalTxCount());
-      assertEquals(0, txTable1.getLocalTxCount());
-      assertEquals(0, lm0.getNumberOfLocksHeld());
-      assertEquals(0, lm1.getNumberOfLocksHeld());
-   }
-
-   private class TxStatusInterceptor extends CommandInterceptor {
-
-      private boolean teReceived;
-
-      private int numLocksAfterTeOnInvalidate;
-
-      @Override
-      public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable {
-         try {
-            return invokeNextInterceptor(ctx, invalidateCommand);
-         } catch (TimeoutException te) {
-            numLocksAfterTeOnInvalidate = lm1.getNumberOfLocksHeld();
-            teReceived = true;
-            throw te;
-         }
-      }
-
-      @Override
-      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
-         return super.handleDefault(ctx, command);
-      }
-
-      public void reset() {
-         this.teReceived = false;
-         this.numLocksAfterTeOnInvalidate = -1;
-      }
-   }
-
-   public interface CacheOperation {
-
-      public abstract void execute();
-   }
-}

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndRemoteTimeoutExceptionTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -1,202 +0,0 @@
-package org.infinispan.tx.timeout;
-
-import org.infinispan.commands.VisitableCommand;
-import org.infinispan.commands.tx.PrepareCommand;
-import org.infinispan.commands.write.InvalidateCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.transaction.xa.TransactionTable;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-import org.testng.annotations.Test;
-
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tester for https://jira.jboss.org/browse/ISPN-629.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test(groups = "functional", testName = "tx.TestTxAndRemoteTimeoutException")
-public class TxAndRemoteTimeoutExceptionTest extends MultipleCacheManagersTest {
-
-   private static Log log = LogFactory.getLog(TxAndRemoteTimeoutExceptionTest.class);
-
-   private LockManager lm1;
-   private LockManager lm0;
-   private TransactionTable txTable0;
-   private TransactionTable txTable1;
-   private TransactionManager tm;
-   private TxStatusInterceptor txStatus = new TxStatusInterceptor();
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      Configuration defaultConfig = getDefaultConfig();
-      defaultConfig.setLockAcquisitionTimeout(500);
-      defaultConfig.setUseLockStriping(false);
-      addClusterEnabledCacheManager(defaultConfig);
-      addClusterEnabledCacheManager(defaultConfig);
-      lm0 = TestingUtil.extractLockManager(cache(0));
-      lm1 = TestingUtil.extractLockManager(cache(1));
-      txTable0 = TestingUtil.getTransactionTable(cache(0));
-      txTable1 = TestingUtil.getTransactionTable(cache(1));
-      tm = cache(0).getAdvancedCache().getTransactionManager();
-      cache(1).getAdvancedCache().addInterceptor(txStatus, 0);
-      TestingUtil.blockUntilViewReceived(cache(0), 2, 10000);
-   }
-
-   protected Configuration getDefaultConfig() {
-      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
-   }
-
-   public void testClearTimeoutsInTx() throws Exception {
-      cache(0).put("k1", "value");
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).clear();
-         }
-      });
-   }
-
-   public void testPutTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).put("k1", "v2222");
-         }
-      });
-   }
-
-   public void testRemoveTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).remove("k1");
-         }
-      });
-   }
-
-   public void testReplaceTimeoutsInTx() throws Exception {
-      cache(1).put("k1", "value");
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache(0).replace("k1", "newValue");
-         }
-      });
-   }
-
-   public void testPutAllTimeoutsInTx() throws Exception {
-      runAssertion(new CacheOperation() {
-         @Override
-         public void execute() {
-            Map toAdd = new HashMap();
-            toAdd.put("k1", "v22222");
-            cache(0).putAll(toAdd);
-         }
-      });
-   }
-
-
-   private void runAssertion(CacheOperation operation) throws NotSupportedException, SystemException, HeuristicMixedException, HeuristicRollbackException, InvalidTransactionException, RollbackException {
-      txStatus.reset();
-      tm.begin();
-      cache(1).put("k1", "v1");
-      Transaction k1LockOwner = tm.suspend();
-      assert lm1.isLocked("k1");
-
-      assertEquals(1, txTable1.getLocalTxCount());
-      tm.begin();
-      cache(0).put("k2", "v2");
-      assert lm0.isLocked("k2");
-      assert !lm1.isLocked("k2");
-
-      operation.execute();
-
-      assertEquals(1, txTable1.getLocalTxCount());
-      assertEquals(1, txTable0.getLocalTxCount());
-
-
-      try {
-         tm.commit();
-         assert false;
-      } catch (RollbackException re) {
-         //expected
-      }
-      assert txStatus.teReceived;
-      assert txStatus.isTxInTableAfterTeOnPrepare;
-      //expect 1 as k1 is locked by the other tx
-      assertEquals(txStatus.numLocksAfterTeOnPrepare, 1, "This would make sure that locks are being released quickly, not waiting for a remote rollback to happen");
-
-      assertEquals(0, txTable0.getLocalTxCount());
-      assertEquals(1, txTable1.getLocalTxCount());
-
-      log.trace("Right before second commit");
-      tm.resume(k1LockOwner);
-      tm.commit();
-      assertEquals("v1", cache(0).get("k1"));
-      assertEquals("v1", cache(1).get("k1"));
-      assertEquals(0, txTable1.getLocalTxCount());
-      assertEquals(0, txTable1.getLocalTxCount());
-      assertEquals(0, lm0.getNumberOfLocksHeld());
-      assertEquals(0, lm1.getNumberOfLocksHeld());
-   }
-
-   private class TxStatusInterceptor extends CommandInterceptor {
-
-      private boolean teReceived;
-
-      private boolean isTxInTableAfterTeOnPrepare;
-
-      private int numLocksAfterTeOnPrepare;
-
-      @Override
-      public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
-         try {
-            return invokeNextInterceptor(ctx, command);
-         } catch (TimeoutException te) {
-            numLocksAfterTeOnPrepare = lm1.getNumberOfLocksHeld();
-            isTxInTableAfterTeOnPrepare = txTable1.containRemoteTx(ctx.getGlobalTransaction());
-            teReceived = true;
-            throw te;
-         }
-      }
-
-      @Override
-      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
-         return super.handleDefault(ctx, command);
-      }
-
-      public void reset() {
-         this.teReceived = false;
-         this.isTxInTableAfterTeOnPrepare = false;
-         this.numLocksAfterTeOnPrepare = -1;
-      }
-   }
-
-   public interface CacheOperation {
-
-      public abstract void execute();
-   }
-}

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java	2010-09-15 14:45:45 UTC (rev 2382)
+++ branches/4.2.x/core/src/test/java/org/infinispan/tx/timeout/TxAndTimeoutExceptionTest.java	2010-09-15 15:59:38 UTC (rev 2383)
@@ -1,143 +0,0 @@
-package org.infinispan.tx.timeout;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.manager.EmbeddedCacheManager;
-import org.infinispan.test.SingleCacheManagerTest;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.transaction.xa.TransactionTable;
-import org.infinispan.util.concurrent.TimeoutException;
-import org.infinispan.util.concurrent.locks.LockManager;
-import org.testng.annotations.Test;
-
-import javax.transaction.Status;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * Tester for https://jira.jboss.org/browse/ISPN-629.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test(testName = "tx.TxAndTimeoutExceptionTest", groups = "functional")
-public class TxAndTimeoutExceptionTest extends SingleCacheManagerTest {
-
-   @Override
-   protected EmbeddedCacheManager createCacheManager() throws Exception {
-      Configuration config = getDefaultStandaloneConfig(true);
-      config.setUseLockStriping(false);
-      config.setLockAcquisitionTimeout(1000);
-      EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(config, true);
-      cache = cm.getCache();
-      return cm;
-   }
-
-
-   public void testPutTimeoutsInTx() throws Exception {
-      assertExpectedBehavior(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache.put("k1", "v2222");
-         }
-      });
-   }
-
-   public void testRemoveTimeoutsInTx() throws Exception {
-      assertExpectedBehavior(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache.remove("k1");
-         }
-      });
-   }
-
-   public void testClearTimeoutsInTx() throws Exception {
-      cache.put("k1", "value");
-      assertExpectedBehavior(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache.clear();
-         }
-      });
-   }
-
-   public void testReplaceTimeoutsInTx() throws Exception {
-      assertExpectedBehavior(new CacheOperation() {
-         @Override
-         public void execute() {
-            cache.replace("k1", "newValue");
-         }
-      });
-   }
-
-   public void testPutAllTimeoutsInTx() throws Exception {
-      assertExpectedBehavior(new CacheOperation() {
-         @Override
-         public void execute() {
-            Map toAdd = new HashMap();
-            toAdd.put("k1", "v22222");
-            cache.putAll(toAdd);
-         }
-      });
-   }
-
-   private void assertExpectedBehavior(CacheOperation op) throws Exception {
-      LockManager lm = TestingUtil.extractLockManager(cache);
-      TransactionTable txTable = TestingUtil.getTransactionTable(cache);
-      TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
-      tm.begin();
-      cache.put("k1", "v1");
-      Transaction k1LockOwner = tm.suspend();
-      assert lm.isLocked("k1");
-
-      assertEquals(1, txTable.getLocalTxCount());
-      tm.begin();
-      cache.put("k2", "v2");
-      assert lm.isLocked("k2");
-      assertEquals(2, txTable.getLocalTxCount());
-      try {
-         op.execute();
-         assert false : "Timeout exception expected";
-      } catch (TimeoutException e) {
-         //expected
-      }
-
-      //make sure that locks acquired by that tx were released even before the transaction is rolled back, the tx object
-      //was marked for rollback 
-      Transaction transaction = tm.getTransaction();
-      assert transaction != null;
-      assert transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK;
-      assert !lm.isLocked("k2");
-      assert lm.isLocked("k1");
-      try {
-         cache.put("k3", "v3");
-         assert false;
-      } catch (IllegalStateException e) {
-         //expected
-      }
-      assertEquals(txTable.getLocalTxCount(), 2);
-
-      //now the TM is expected to rollback the tx
-      tm.rollback();
-      assertEquals(txTable.getLocalTxCount(), 1);
-
-      tm.resume(k1LockOwner);
-      tm.commit();
-
-      //now test that the other tx works as expected
-      assertEquals(0, txTable.getLocalTxCount());
-      assertEquals(cache.get("k1"), "v1");
-      assert !lm.isLocked("k1");
-      assertEquals(txTable.getLocalTxCount(), 0);
-   }
-
-   public interface CacheOperation {
-
-      public abstract void execute();
-   }
-}



More information about the infinispan-commits mailing list