[infinispan-commits] Infinispan SVN: r2330 - in branches/4.2.x/core/src: main/java/org/infinispan/config and 8 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Sep 6 18:32:55 EDT 2010
Author: mircea.markus
Date: 2010-09-06 18:32:54 -0400 (Mon, 06 Sep 2010)
New Revision: 2330
Added:
branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java
branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
Log:
[ISPN-615] - Eager locking and key affinity optimisation
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -117,8 +117,7 @@
@Override
public String toString() {
- return getClass().getSimpleName() + "{" +
- "gtx=" + globalTx +
+ return ", gtx=" + globalTx +
", cacheName='" + cacheName + '\'' +
'}';
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -432,6 +432,10 @@
this.transaction.setUseEagerLocking(useEagerLocking);
}
+ public void setEagerLockSingleNode(boolean eagerLockSingleNode) {
+ this.transaction.setEagerLockSingleNode(eagerLockSingleNode);
+ }
+
public void setUseReplQueue(boolean useReplQueue) {
this.clustering.async.setUseReplQueue(useReplQueue);
}
@@ -606,6 +610,10 @@
return transaction.useEagerLocking;
}
+ public boolean isEagerLockSingleNode() {
+ return transaction.eagerLockSingleNode;
+ }
+
public long getStateRetrievalTimeout() {
return clustering.stateRetrieval.timeout;
}
@@ -837,6 +845,17 @@
* */
@Dynamic
protected Boolean useEagerLocking = false;
+
+ /**
+ * @configRef desc="Only has effect for DIST mode and when useEagerLocking is set to true. When this is enabled, then only one node
+ * is locked in the cluster, disregarding numOwners config. On the opposite, if this is false, then on all cache.lock() calls
+ * numOwners RPCs are being performed. The node that gets locked is the main data owner, i.e. the node where data
+ * would reside if numOwners==1. If the node where the lock resides crashes, then the transaction is marked for rollback -
+ * data is in a consistent state, no fault tolerance."
+ */
+ @Dynamic
+ protected Boolean eagerLockSingleNode = false;
+
public TransactionType(String transactionManagerLookupClass) {
this.transactionManagerLookupClass = transactionManagerLookupClass;
@@ -872,8 +891,15 @@
public void setUseEagerLocking(Boolean useEagerLocking) {
testImmutability("useEagerLocking");
this.useEagerLocking = useEagerLocking;
+
}
+ @XmlAttribute
+ public void setEagerLockSingleNode(Boolean eagerLockSingleNode) {
+ testImmutability("useEagerLocking");
+ this.eagerLockSingleNode = eagerLockSingleNode;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/LocalTxInvocationContext.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -2,11 +2,13 @@
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.TransactionXaAdapter;
import org.infinispan.util.BidirectionalMap;
import javax.transaction.Transaction;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -78,4 +80,8 @@
public boolean hasLockedKey(Object key) {
return xaAdapter != null && super.hasLockedKey(key);
}
+
+ public void remoteLocksAcquired(Collection<Address> nodes) {
+ xaAdapter.locksAcquired(nodes);
+ }
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -18,6 +18,7 @@
* A component that manages the distribution of elements across a cache cluster
*
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
@Scope(Scopes.NAMED_CACHE)
@@ -46,6 +47,11 @@
Map<Object, List<Address>> locateAll(Collection<Object> keys);
/**
+ * Same as {@link #locateAll(java.util.Collection)}, but the list of addresses only contains numOwners owners.
+ */
+ Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners);
+
+ /**
* Transforms a cache entry so it is marked for L1 rather than the primary cache data structure. This should be done
* if it is deemed that the entry is targeted for L1 storage rather than storage in the primary data container.
*
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -259,13 +259,17 @@
}
public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
+ return locateAll(keys, replCount);
+ }
+
+ public Map<Object, List<Address>> locateAll(Collection<Object> keys, int numOwners) {
if (consistentHash == null) {
Map<Object, List<Address>> m = new HashMap<Object, List<Address>>(keys.size());
List<Address> selfList = Collections.singletonList(self);
for (Object k : keys) m.put(k, selfList);
return m;
}
- return consistentHash.locateAll(keys, replCount);
+ return consistentHash.locateAll(keys, numOwners);
}
public void transformForL1(CacheEntry entry) {
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -19,6 +19,7 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
@@ -30,6 +31,7 @@
import org.infinispan.util.concurrent.NotifyingFutureImpl;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@@ -41,6 +43,7 @@
* The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
*
* @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
* @since 4.0
*/
public class DistributionInterceptor extends BaseRpcInterceptor {
@@ -189,8 +192,28 @@
@Override
public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
- if (ctx.isOriginLocal())
- rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
+ if (ctx.isOriginLocal()) {
+ if (configuration.isEagerLockSingleNode()) {
+ //only main data owner is locked, see: https://jira.jboss.org/browse/ISPN-615
+ Map<Object, List<Address>> toMulticast = dm.locateAll(command.getKeys(), 1);
+
+ //now compile address reunion
+ List<Address> where;
+ if (toMulticast.size() == 1) {//avoid building an extra array, as most often this will be a single key
+ where = toMulticast.values().iterator().next();
+ rpcManager.invokeRemotely(where, command, true, true);
+ } else {
+ where = new ArrayList<Address>();
+ for (List<Address> values: toMulticast.values()) {
+ where.addAll(values);
+ }
+ rpcManager.invokeRemotely(where, command, true, true);
+ }
+ ((LocalTxInvocationContext)ctx).remoteLocksAcquired(where);
+ } else {
+ rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
+ }
+ }
return invokeNextInterceptor(ctx, command);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -58,6 +58,7 @@
* Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @author Mircea.Markus at jboss.com
* @see <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC designs</a>
* @since 4.0
*/
@@ -156,9 +157,7 @@
if (goRemoteFirst) {
Object result = invokeNextInterceptor(ctx, c);
try {
- for (Object key : c.getKeys()) {
- entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
- }
+ lockKeysForRemoteTx(ctx, c);
} catch (Throwable e) {
//if anything happen during locking then unlock remote
c.setUnlock(true);
@@ -167,9 +166,7 @@
}
return result;
} else {
- for (Object key : c.getKeys()) {
- entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
- }
+ lockKeysForRemoteTx(ctx, c);
if (shouldInvokeOnCluster || c.isExplicit())
return invokeNextInterceptor(ctx, c);
else
@@ -186,6 +183,12 @@
}
}
+ private void lockKeysForRemoteTx(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
+ for (Object key : c.getKeys()) {
+ entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+ }
+ }
+
// write commands
@Override
Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -89,35 +89,47 @@
final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
if (!leavers.isEmpty()) {
if (trace) log.trace("Saw {0} leavers - kicking off a lock breaking task", leavers.size());
- try {
- lockBreakingService.submit(new Runnable() {
- public void run() {
- Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
- for (GlobalTransaction gt : remoteTransactions.keySet()) {
- if (leavers.contains(gt.getAddress())) toKill.add(gt);
- }
+ cleanTxForWhichTheOwnerLeft(leavers);
+ if (configuration.isUseEagerLocking() && configuration.isEagerLockSingleNode() && configuration.getCacheMode().isDistributed()) {
+ for (TransactionXaAdapter xaAdapter : localTransactions.values()) {
+ if (xaAdapter.hasRemoteLocksAcquired(leavers)) {
+ xaAdapter.markForRollback();
+ }
+ }
+ }
+ }
+ }
- if (trace)
- log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+ private void cleanTxForWhichTheOwnerLeft(final List<Address> leavers) {
+ try {
+ lockBreakingService.submit(new Runnable() {
+ public void run() {
+ Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
+ for (GlobalTransaction gt : remoteTransactions.keySet()) {
+ if (leavers.contains(gt.getAddress())) toKill.add(gt);
+ }
- for (GlobalTransaction gtx : toKill) {
- if (trace) log.trace("Killing {0}", gtx);
- RollbackCommand rc = new RollbackCommand(gtx);
- rc.init(invoker, icc, TransactionTable.this);
- try {
- rc.perform(null);
- } catch (Throwable e) {
- log.warn("Unable to roll back gtx " + gtx, e);
- } finally {
- removeRemoteTransaction(gtx);
- }
+ if (trace)
+ log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+
+ for (GlobalTransaction gtx : toKill) {
+ if (trace) log.trace("Killing {0}", gtx);
+ RollbackCommand rc = new RollbackCommand(gtx);
+ rc.init(invoker, icc, TransactionTable.this);
+ try {
+ rc.perform(null);
+ } catch (Throwable e) {
+ log.warn("Unable to roll back gtx " + gtx, e);
+ } finally {
+ removeRemoteTransaction(gtx);
}
}
- });
- } catch (RejectedExecutionException ree) {
- log.debug("Unable to submit task to executor", ree);
- }
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ log.debug("Unable to submit task to executor", ree);
}
+
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -10,6 +10,7 @@
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.remoting.transport.Address;
import org.infinispan.util.BidirectionalLinkedHashMap;
import org.infinispan.util.BidirectionalMap;
import org.infinispan.util.InfinispanCollections;
@@ -21,7 +22,11 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* This acts both as an local {@link org.infinispan.transaction.xa.CacheTransaction} and implementor of an {@link
@@ -50,6 +55,10 @@
private TransactionTable txTable;
private Transaction transaction;
+ private Set<Address> remoteLockedNodes;
+ private boolean isMarkedForRollback;
+
+
public TransactionXaAdapter(GlobalTransaction globalTx, InvocationContextContainer icc, InterceptorChain invoker,
CommandsFactory commandsFactory, Configuration configuration, TransactionTable txTable,
Transaction transaction) {
@@ -71,6 +80,7 @@
}
public int prepare(Xid xid) throws XAException {
+ checkMarkedForRollback();
if (configuration.isOnePhaseCommit()) {
if (trace)
log.trace("Received prepare for tx: " + xid + " . Skipping call as 1PC will be used.");
@@ -99,6 +109,7 @@
LocalTxInvocationContext ctx = icc.createTxInvocationContext();
ctx.setXaCache(this);
if (configuration.isOnePhaseCommit()) {
+ checkMarkedForRollback();
if (trace) log.trace("Doing an 1PC prepare call on the interceptor chain");
PrepareCommand command = commandsFactory.buildPrepareCommand(globalTx, modifications, true);
try {
@@ -243,4 +254,24 @@
", txTimeout=" + txTimeout +
'}';
}
+
+ public boolean hasRemoteLocksAcquired(List<Address> leavers) {
+ if (log.isTraceEnabled()) {
+ log.trace("My remote locks: " + remoteLockedNodes + ", leavers are:" + leavers);
+ }
+ return (remoteLockedNodes != null) && !Collections.disjoint(remoteLockedNodes, leavers);
+ }
+
+ public void locksAcquired(Collection<Address> nodes) {
+ if (remoteLockedNodes == null) remoteLockedNodes = new HashSet<Address>();
+ remoteLockedNodes.addAll(nodes);
+ }
+
+ public void markForRollback() {
+ isMarkedForRollback = true;
+ }
+
+ private void checkMarkedForRollback() throws XAException {
+ if (isMarkedForRollback) throw new XAException(XAException.XA_RBOTHER);
+ }
}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -162,6 +162,7 @@
assert c.getTransactionManagerLookupClass().equals("org.infinispan.transaction.lookup.GenericTransactionManagerLookup");
assert c.isUseEagerLocking();
+ assert c.isEagerLockSingleNode();
assert !c.isSyncRollbackPhase();
c = namedCaches.get("syncRepl");
Added: branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/lock/EagerLockingSingleLockTest.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -0,0 +1,164 @@
+package org.infinispan.lock;
+
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.RndKeyGenerator;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tester for https://jira.jboss.org/browse/ISPN-615.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+ at Test(groups = "functional", testName = "lock.EagerLockingSingleLockTest")
+public class EagerLockingSingleLockTest extends MultipleCacheManagersTest {
+ private KeyAffinityService kaf;
+ private ThreadPoolExecutor poolExecutor;
+
+ private static Log log = LogFactory.getLog(EagerLockingSingleLockTest.class);
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC, true);
+ config.setEagerLockSingleNode(true);
+ config.setNumOwners(2);
+ config.setLockAcquisitionTimeout(2000);
+ config.setUseEagerLocking(true);
+ config.setL1CacheEnabled(false);
+ createClusteredCaches(4, config);
+ TestingUtil.blockUntilViewReceived(cache(0), 4, 10000);
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2), cache(3));
+ poolExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
+ kaf = KeyAffinityServiceFactory.newKeyAffinityService(cache(0), poolExecutor, new RndKeyGenerator(), 10, true);
+ }
+
+ @AfterClass
+ public void cleanUp() {
+ kaf.stop();
+ poolExecutor.shutdownNow();
+ }
+
+ public void testSingleLockAcquiredRemotely() throws Exception {
+ log.trace("0 -> " + address(0));
+ log.trace("1 -> " + address(1));
+ log.trace("2 -> " + address(2));
+ log.trace("3 -> " + address(3));
+
+ Object k = kaf.getKeyForAddress(address(3));
+ cache(1).put(k, "1stValue");
+ TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+ tm.begin();
+ cache(1).put(k, "2ndValue");
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert TestingUtil.extractLockManager(cache(1)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ Transaction lockOwnerTx = tm.suspend();
+
+ //now make sure other transactions won't be able to acquire locks
+ tm.begin();
+ try {
+ cache(2).put(k, "3rdValue");
+ assert false;
+ } catch (TimeoutException e) {
+ assertEquals(tm.getStatus(), Status.STATUS_MARKED_ROLLBACK);
+ }
+ tm.rollback();
+
+ //now check that the only node that has a remote lock is cache(3)
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ tm.resume(lockOwnerTx);
+ tm.commit();
+
+ //no locks are being held now
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ assertEquals(cache(0).get(k), "2ndValue");
+ }
+
+ @Test(dependsOnMethods = "testSingleLockAcquiredRemotely")
+ public void testSingleLockAcquiredLocally() throws Exception {
+ Object k = kaf.getKeyForAddress(address(1));
+ cache(1).put(k, "1stValue");
+ TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+
+ tm.begin();
+ cache(1).put(k, "2ndValue");//this acquires a local cache only
+
+ //now check that the only node that has a remote lock is cache(3)
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert TestingUtil.extractLockManager(cache(1)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ tm.commit();
+
+ //no locks are being held now
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ assertEquals(cache(0).get(k), "2ndValue");
+ }
+
+ @Test(dependsOnMethods = "testSingleLockAcquiredLocally")
+ public void testLockOwnerFailure() throws Exception {
+ Object k = kaf.getKeyForAddress(address(3));
+ cache(1).put(k, "1stValue");
+ final TransactionManager tm = cache(0).getAdvancedCache().getTransactionManager();
+
+ tm.begin();
+ cache(1).put(k, "2ndValue");
+
+ //now check that the only node that has a remote lock is cache(3)
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+ assert TestingUtil.extractLockManager(cache(3)).isLocked(k);
+
+ manager(3).stop();
+ TestingUtil.blockUntilViewReceived(cache(0), 3, 10000);
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+
+ try {
+ log.trace("here it begins");
+ tm.commit();
+ assert false;
+ } catch (RollbackException re) {
+ //expected
+ }
+
+ //no locks are being held now
+ assert !TestingUtil.extractLockManager(cache(0)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(1)).isLocked(k);
+ assert !TestingUtil.extractLockManager(cache(2)).isLocked(k);
+
+ assertEquals(cache(0).get(k), "1stValue");
+ }
+}
Modified: branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/notifications/cachelistener/CacheNotifierTest.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -26,13 +26,14 @@
private TransactionManager tm;
private CacheNotifier mockNotifier;
private CacheNotifier origNotifier;
+ private CacheContainer cm;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.LOCAL);
c.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
- CacheContainer cm = TestCacheManagerFactory.createCacheManager(c, true);
+ cm = TestCacheManagerFactory.createCacheManager(c, true);
cache = cm.getCache();
tm = TestingUtil.getTransactionManager(cache);
@@ -44,6 +45,7 @@
public void tearDown() throws Exception {
TestingUtil.replaceComponent(cache, CacheNotifier.class, origNotifier, true);
TestingUtil.killCaches(cache);
+ cm.stop();
}
@AfterClass
Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2010-09-06 17:47:30 UTC (rev 2329)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2010-09-06 22:32:54 UTC (rev 2330)
@@ -4,6 +4,7 @@
import org.infinispan.config.Configuration;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -211,4 +212,8 @@
* {@link #addClusterEnabledCacheManager()}
*/
protected abstract void createCacheManagers() throws Throwable;
+
+ public Address address(int cacheIndex) {
+ return cache(cacheIndex).getAdvancedCache().getRpcManager().getAddress();
+ }
}
More information about the infinispan-commits
mailing list