[infinispan-commits] Infinispan SVN: r2330 - in branches/4.2.x/core/src: main/java/org/infinispan/config and 8 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Sep 6 18:32:55 EDT 2010


Author: mircea.markus
Date: 2010-09-06 18:32:54 -0400 (Mon, 06 Sep 2010)
New Revision: 2330

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
   branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
   branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
   branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
   branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
   branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
Log:
[ISPN-615] - Eager locking and key affinity optimisation

Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -117,8 +117,7 @@
 
    @Override
    public String toString() {
-      return getClass().getSimpleName() + "{" +
-            "gtx=" + globalTx +
+      return ", gtx=" + globalTx +
             ", cacheName='" + cacheName + '\'' +
             '}';
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -432,6 +432,10 @@
       this.transaction.setUseEagerLocking(useEagerLocking);
    }
 
+   public void setEagerLockSingleNode(boolean eagerLockSingleNode) {
+      this.transaction.setEagerLockSingleNode(eagerLockSingleNode);
+   }
+
    public void setUseReplQueue(boolean useReplQueue) {
       this.clustering.async.setUseReplQueue(useReplQueue);
    }
@@ -606,6 +610,10 @@
       return transaction.useEagerLocking;
    }
 
+   public boolean isEagerLockSingleNode() {
+      return transaction.eagerLockSingleNode;
+   }
+
    public long getStateRetrievalTimeout() {
       return clustering.stateRetrieval.timeout;
    }
@@ -837,6 +845,17 @@
        * */
       @Dynamic
       protected Boolean useEagerLocking = false;
+
+      /**
+       * @configRef desc="Only has effect for DIST mode and when useEagerLocking is set to true. When this is enabled, then only one node
+       * is locked in the cluster, disregarding numOwners config. On the opposite, if this is false, then on all cache.lock() calls
+       * numOwners RPCs are being performed. The node that gets locked is the main data owner, i.e. the node where data
+       * would reside if numOwners==1. If the node where the lock resides crashes, then the transaction is marked for rollback -
+       * data is in a consistent state, no fault tolerance."
+       */
+      @Dynamic
+      protected Boolean eagerLockSingleNode = false;
+
       
       public TransactionType(String transactionManagerLookupClass) {
          this.transactionManagerLookupClass = transactionManagerLookupClass;
@@ -872,8 +891,15 @@
       public void setUseEagerLocking(Boolean useEagerLocking) {
          testImmutability("useEagerLocking");
          this.useEagerLocking = useEagerLocking;
+
       }
 
+      @XmlAttribute
+      public void setEagerLockSingleNode(Boolean eagerLockSingleNode) {
+         testImmutability("useEagerLocking");
+         this.eagerLockSingleNode = eagerLockSingleNode;
+      }
+
       @Override
       public boolean equals(Object o) {
          if (this == o) return true;

Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -2,11 +2,13 @@
 
 import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.remoting.transport.Address;
 import org.infinispan.transaction.xa.GlobalTransaction;
 import org.infinispan.transaction.xa.TransactionXaAdapter;
 import org.infinispan.util.BidirectionalMap;
 
 import javax.transaction.Transaction;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -78,4 +80,8 @@
    public boolean hasLockedKey(Object key) {
       return xaAdapter != null && super.hasLockedKey(key);
    }
+
+   public void remoteLocksAcquired(Collection<Address> nodes) {
+      xaAdapter.locksAcquired(nodes);
+   }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -18,6 +18,7 @@
  * A component that manages the distribution of elements across a cache cluster
  *
  * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 @Scope(Scopes.NAMED_CACHE)
@@ -46,6 +47,11 @@
    Map<Object, List<Address>> locateAll(Collection<Object> keys);
 
    /**
+    * Same as {@link #locateAll(java.util.Collection)}, but the list of addresses only contains numOwners owners.
+    */
+   Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners);
+
+   /**
     * Transforms a cache entry so it is marked for L1 rather than the primary cache data structure.  This should be done
     * if it is deemed that the entry is targeted for L1 storage rather than storage in the primary data container.
     *

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -259,13 +259,17 @@
    }
 
    public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
+      return locateAll(keys, replCount);
+   }
+
+   public Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners) {
       if (consistentHash == null) {
          Map<Object, List<Address>> m = new HashMap<Object, List<Address>>(keys.size());
          List<Address> selfList = Collections.singletonList(self);
          for (Object k : keys) m.put(k, selfList);
          return m;
       }
-      return consistentHash.locateAll(keys, replCount);
+      return consistentHash.locateAll(keys, numOwners);
    }
 
    public void transformForL1(CacheEntry entry) {

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -19,6 +19,7 @@
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
@@ -30,6 +31,7 @@
 import org.infinispan.util.concurrent.NotifyingFutureImpl;
 import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -41,6 +43,7 @@
  * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
  *
  * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class DistributionInterceptor extends BaseRpcInterceptor {
@@ -189,8 +192,28 @@
 
    @Override
    public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
-      if (ctx.isOriginLocal())
-         rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
+      if (ctx.isOriginLocal()) {
+         if (configuration.isEagerLockSingleNode()) {
+            //only main data owner is locked, see: https://jira.jboss.org/browse/ISPN-615
+            Map<Object, List<Address>> toMulticast = dm.locateAll(command.getKeys(), 1);
+
+            //now compile address reunion
+            List<Address> where;
+            if (toMulticast.size() == 1) {//avoid building an extra array, as most often this will be a single key
+               where = toMulticast.values().iterator().next();
+               rpcManager.invokeRemotely(where, command, true, true);
+            } else {
+               where = new ArrayList<Address>();
+               for (List<Address> values:  toMulticast.values()) {
+                  where.addAll(values);
+               }
+               rpcManager.invokeRemotely(where, command, true, true);
+            }
+            ((LocalTxInvocationContext)ctx).remoteLocksAcquired(where);
+         } else {
+            rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
+         }
+      }
       return invokeNextInterceptor(ctx, command);
    }
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -58,6 +58,7 @@
  * Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
  *
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @author Mircea.Markus at jboss.com
  * @see <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC designs</a>
  * @since 4.0
  */
@@ -156,9 +157,7 @@
          if (goRemoteFirst) {
             Object result = invokeNextInterceptor(ctx, c);
             try {
-               for (Object key : c.getKeys()) {
-                  entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
-               }
+               lockKeysForRemoteTx(ctx, c);
             } catch (Throwable e) {
                //if anything happen during locking then unlock remote
                c.setUnlock(true);
@@ -167,9 +166,7 @@
             }
             return result;
          } else {
-            for (Object key : c.getKeys()) {
-               entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
-            }
+            lockKeysForRemoteTx(ctx, c);
             if (shouldInvokeOnCluster || c.isExplicit())
                return invokeNextInterceptor(ctx, c);
             else
@@ -186,6 +183,12 @@
       }
    }
 
+   private void lockKeysForRemoteTx(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
+      for (Object key : c.getKeys()) {
+         entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+      }
+   }
+
    // write commands
 
    @Override

Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -89,35 +89,47 @@
          final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
          if (!leavers.isEmpty()) {
             if (trace) log.trace("Saw {0} leavers - kicking off a lock breaking task", leavers.size());
-            try {
-               lockBreakingService.submit(new Runnable() {
-                  public void run() {
-                     Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
-                     for (GlobalTransaction gt : remoteTransactions.keySet()) {
-                        if (leavers.contains(gt.getAddress())) toKill.add(gt);
-                     }
+            cleanTxForWhichTheOwnerLeft(leavers);
+            if (configuration.isUseEagerLocking() && configuration.isEagerLockSingleNode() && configuration.getCacheMode().isDistributed()) {
+               for (TransactionXaAdapter xaAdapter : localTransactions.values()) {
+                  if (xaAdapter.hasRemoteLocksAcquired(leavers)) {
+                     xaAdapter.markForRollback();
+                  }
+               }
+            }
+         }
+      }
 
-                     if (trace)
-                        log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+      private void cleanTxForWhichTheOwnerLeft(final List<Address> leavers) {
+         try {
+            lockBreakingService.submit(new Runnable() {
+               public void run() {
+                  Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
+                  for (GlobalTransaction gt : remoteTransactions.keySet()) {
+                     if (leavers.contains(gt.getAddress())) toKill.add(gt);
+                  }
 
-                     for (GlobalTransaction gtx : toKill) {
-                        if (trace) log.trace("Killing {0}", gtx);
-                        RollbackCommand rc = new RollbackCommand(gtx);
-                        rc.init(invoker, icc, TransactionTable.this);
-                        try {
-                           rc.perform(null);
-                        } catch (Throwable e) {
-                           log.warn("Unable to roll back gtx " + gtx, e);
-                        } finally {
-                           removeRemoteTransaction(gtx);
-                        }
+                  if (trace)
+                     log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+
+                  for (GlobalTransaction gtx : toKill) {
+                     if (trace) log.trace("Killing {0}", gtx);
+                     RollbackCommand rc = new RollbackCommand(gtx);
+                     rc.init(invoker, icc, TransactionTable.this);
+                     try {
+                        rc.perform(null);
+                     } catch (Throwable e) {
+                        log.warn("Unable to roll back gtx " + gtx, e);
+                     } finally {
+                        removeRemoteTransaction(gtx);
                      }
                   }
-               });
-            } catch (RejectedExecutionException ree) {
-               log.debug("Unable to submit task to executor", ree);
-            }
+               }
+            });
+         } catch (RejectedExecutionException ree) {
+            log.debug("Unable to submit task to executor", ree);
          }
+
       }
    }
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -10,6 +10,7 @@
 import org.infinispan.context.InvocationContextContainer;
 import org.infinispan.context.impl.LocalTxInvocationContext;
 import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.BidirectionalLinkedHashMap;
 import org.infinispan.util.BidirectionalMap;
 import org.infinispan.util.InfinispanCollections;
@@ -21,7 +22,11 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * This acts both as an local {@link org.infinispan.transaction.xa.CacheTransaction} and implementor of an {@link
@@ -50,6 +55,10 @@
    private TransactionTable txTable;
    private Transaction transaction;
 
+   private Set<Address> remoteLockedNodes;
+   private boolean isMarkedForRollback;
+
+
    public TransactionXaAdapter(GlobalTransaction globalTx, InvocationContextContainer icc, InterceptorChain invoker,
                                CommandsFactory commandsFactory, Configuration configuration, TransactionTable txTable,
                                Transaction transaction) {
@@ -71,6 +80,7 @@
    }
 
    public int prepare(Xid xid) throws XAException {
+      checkMarkedForRollback();
       if (configuration.isOnePhaseCommit()) {
          if (trace)
             log.trace("Received prepare for tx: " + xid + " . Skipping call as 1PC will be used.");
@@ -99,6 +109,7 @@
          LocalTxInvocationContext ctx = icc.createTxInvocationContext();
          ctx.setXaCache(this);
          if (configuration.isOnePhaseCommit()) {
+            checkMarkedForRollback();
             if (trace) log.trace("Doing an 1PC prepare call on the interceptor chain");
             PrepareCommand command = commandsFactory.buildPrepareCommand(globalTx, modifications, true);
             try {
@@ -243,4 +254,24 @@
             ", txTimeout=" + txTimeout +
             '}';
    }
+
+   public boolean hasRemoteLocksAcquired(List<Address> leavers) {
+      if (log.isTraceEnabled()) {
+         log.trace("My remote locks: " + remoteLockedNodes + ", leavers are:" + leavers);
+      }
+      return (remoteLockedNodes != null) && !Collections.disjoint(remoteLockedNodes, leavers);
+   }
+
+   public void locksAcquired(Collection<Address> nodes) {
+      if (remoteLockedNodes == null) remoteLockedNodes = new HashSet<Address>();
+      remoteLockedNodes.addAll(nodes);
+   }
+
+   public void markForRollback() {
+      isMarkedForRollback = true;
+   }
+
+   private void checkMarkedForRollback() throws XAException {
+      if (isMarkedForRollback) throw new XAException(XAException.XA_RBOTHER);
+   }   
 }

Modified: branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -162,6 +162,7 @@
 
       assert c.getTransactionManagerLookupClass().equals("org.infinispan.transaction.lookup.GenericTransactionManagerLookup");
       assert c.isUseEagerLocking();
+      assert c.isEagerLockSingleNode();
       assert !c.isSyncRollbackPhase();
 
       c = namedCaches.get("syncRepl");

Added: branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -0,0 +1,164 @@
+package org.infinispan.lock;
+
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.RndKeyGenerator;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-615.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test(groups = "functional", testName = "lock.EagerLockingSingleLockTest")
+public class EagerLockingSingleLockTest extends MultipleCacheManagersTest {
+   private KeyAffinityService kaf;
+   private ThreadPoolExecutor poolExecutor;
+
+   private static Log log = LogFactory.getLog(EagerLockingSingleLockTest.class);
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC, true);
+      config.setEagerLockSingleNode(true);
+      config.setNumOwners(2);
+      config.setLockAcquisitionTimeout(2000);
+      config.setUseEagerLocking(true);
+      config.setL1CacheEnabled(false);
+      createClusteredCaches(4, config);
+      TestingUtil.blockUntilViewReceived(cache(0), 4, 10000);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2), cache(3));
+      poolExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
+      kaf = KeyAffinityServiceFactory.newKeyAffinityService(cache(0), poolExecutor, new RndKeyGenerator(), 10, true);
+   }
+
+   @AfterClass
+   public void cleanUp() {
+      kaf.stop();
+      poolExecutor.shutdownNow();
+   }
+
+   public void testSingleLockAcquiredRemotely() throws Exception {
+      log.trace("0 -> " + address(0));
+      log.trace("1 -> " + address(1));
+      log.trace("2 -> " + address(2));
+      log.trace("3 -> " + address(3));
+
+      Object k = kaf.getKeyForAddress(address(3));
+      cache(1).put(k, "1stValue");
+      TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+      tm.begin();
+      cache(1).put(k, "2ndValue");
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert TestingUtil.extractLockManager(cache(1)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      Transaction lockOwnerTx = tm.suspend();
+
+      //now make sure other transactions won't be able to acquire locks
+      tm.begin();
+      try {
+         cache(2).put(k, "3rdValue");
+         assert false;
+      } catch (TimeoutException e) {
+         assertEquals(tm.getStatus(), Status.STATUS_MARKED_ROLLBACK);
+      }
+      tm.rollback();
+
+      //now check that the only node that has a remote lock is cache(3)
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      tm.resume(lockOwnerTx);
+      tm.commit();
+
+      //no locks are being held now
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      assertEquals(cache(0).get(k), "2ndValue");
+   }
+
+   @Test(dependsOnMethods = "testSingleLockAcquiredRemotely")
+   public void testSingleLockAcquiredLocally() throws Exception {
+      Object k = kaf.getKeyForAddress(address(1));
+      cache(1).put(k, "1stValue");
+      TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+
+      tm.begin();
+      cache(1).put(k, "2ndValue");//this acquires a local cache only
+
+      //now check that the only node that has a remote lock is cache(3)
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert TestingUtil.extractLockManager(cache(1)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      tm.commit();
+
+      //no locks are being held now
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      assertEquals(cache(0).get(k), "2ndValue");
+   }
+
+   @Test(dependsOnMethods = "testSingleLockAcquiredLocally")
+   public void testLockOwnerFailure() throws Exception {
+      Object k = kaf.getKeyForAddress(address(3));
+      cache(1).put(k, "1stValue");
+      final TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+
+      tm.begin();
+      cache(1).put(k, "2ndValue");
+
+      //now check that the only node that has a remote lock is cache(3)
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+      assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+      manager(3).stop();
+      TestingUtil.blockUntilViewReceived(cache(0), 3, 10000);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+
+      try {
+         log.trace("here it begins");
+         tm.commit();
+         assert false;
+      } catch (RollbackException re) {
+         //expected
+      }
+
+      //no locks are being held now
+      assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+      assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+
+      assertEquals(cache(0).get(k), "1stValue");
+   }
+}

Modified: branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -26,13 +26,14 @@
    private TransactionManager tm;
    private CacheNotifier mockNotifier;
    private CacheNotifier origNotifier;
+   private CacheContainer cm;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
       Configuration c = new Configuration();
       c.setCacheMode(Configuration.CacheMode.LOCAL);
       c.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
-      CacheContainer cm = TestCacheManagerFactory.createCacheManager(c, true);
+      cm = TestCacheManagerFactory.createCacheManager(c, true);
 
       cache = cm.getCache();
       tm = TestingUtil.getTransactionManager(cache);
@@ -44,6 +45,7 @@
    public void tearDown() throws Exception {
       TestingUtil.replaceComponent(cache, CacheNotifier.class, origNotifier, true);
       TestingUtil.killCaches(cache);
+      cm.stop();
    }
 
    @AfterClass

Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-09-06 22:32:54 UTC (rev 2330)
@@ -4,6 +4,7 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -211,4 +212,8 @@
     * {@link #addClusterEnabledCacheManager()}
     */
    protected abstract void createCacheManagers() throws Throwable;
+
+   public Address address(int cacheIndex) {
+      return cache(cacheIndex).getAdvancedCache().getRpcManager().getAddress();
+   }
 }



More information about the infinispan-commits mailing list