[infinispan-commits] Infinispan SVN: r2544 - in branches/4.2.x/core/src: main/java/org/infinispan/interceptors and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Oct 21 08:53:03 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-10-21 08:53:03 -0400 (Thu, 21 Oct 2010)
New Revision: 2544
Added:
branches/4.2.x/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
Log:
ISPN-711 - Locks not released when committing a transaction which contains explicit locks but no modifications
Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java 2010-10-21 12:42:20 UTC (rev 2543)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -97,7 +97,7 @@
* @see LockControlCommand.java
* https://jira.jboss.org/jira/browse/ISPN-48
*/
- remoteTransaction.setModifications(modifications);
+ remoteTransaction.setModifications(getModifications());
}
// 2. then set it on the invocation context
@@ -121,7 +121,7 @@
}
public WriteCommand[] getModifications() {
- return modifications;
+ return modifications == null ? new WriteCommand[]{} : modifications;
}
public boolean isOnePhaseCommit() {
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-10-21 12:42:20 UTC (rev 2543)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -55,14 +56,12 @@
static final RecipientGenerator CLEAR_COMMAND_GENERATOR = new RecipientGenerator() {
- private final Object[] EMPTY_ARRAY = {};
-
public List<Address> generateRecipients() {
return null;
}
- public Object[] getKeys() {
- return EMPTY_ARRAY;
+ public Collection<Object> getKeys() {
+ return Collections.emptySet();
}
};
@@ -106,7 +105,9 @@
*
* @param ctx invocation context
* @param key key to retrieve
+ *
* @return value of a remote get, or null
+ *
* @throws Throwable if there are problems
*/
private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean dmWasRehashingDuringLocalLookup) throws Throwable {
@@ -152,6 +153,7 @@
* Tests whether a key is in the L1 cache if L1 is enabled.
*
* @param key key to check for
+ *
* @return true if the key is not in L1, or L1 caching is not enabled. false the key is in L1.
*/
private boolean isNotInL1(Object key) {
@@ -169,14 +171,14 @@
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
// don't bother with a remote get for the PutMapCommand!
return handleWriteCommand(ctx, command,
- new MultipleKeysRecipientGenerator(command.getMap().keySet()), true);
+ new MultipleKeysRecipientGenerator(command.getMap().keySet()), true);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleWriteCommand(ctx, command,
- new SingleKeyRecipientGenerator(command.getKey()), false);
+ new SingleKeyRecipientGenerator(command.getKey()), false);
}
@Override
@@ -187,7 +189,7 @@
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleWriteCommand(ctx, command,
- new SingleKeyRecipientGenerator(command.getKey()), false);
+ new SingleKeyRecipientGenerator(command.getKey()), false);
}
@Override
@@ -201,18 +203,16 @@
List<Address> where;
if (toMulticast.size() == 1) {//avoid building an extra array, as most often this will be a single key
where = toMulticast.values().iterator().next();
- rpcManager.invokeRemotely(where, command, true, true);
} else {
- where = new ArrayList<Address>();
- for (List<Address> values: toMulticast.values()) {
- where.addAll(values);
- }
- rpcManager.invokeRemotely(where, command, true, true);
+ where = new LinkedList<Address>();
+ for (List<Address> values : toMulticast.values()) where.addAll(values);
}
- ((LocalTxInvocationContext)ctx).remoteLocksAcquired(where);
+ rpcManager.invokeRemotely(where, command, true, true);
+ ((LocalTxInvocationContext) ctx).remoteLocksAcquired(where);
} else {
rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
}
+ ctx.addAffectedKeys(command.getKeys());
}
return invokeNextInterceptor(ctx, command);
}
@@ -221,9 +221,9 @@
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
- if (ctx.isOriginLocal() && ctx.hasModifications()) {
+ if (shouldInvokeRemoteTxCommand(ctx)) {
List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
- NotifyingNotifiableFuture<Object> f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null);
+ NotifyingNotifiableFuture<Object> f = flushL1Cache(recipients.size(), ctx.getLockedKeys(), null);
rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
if (f != null) f.get();
}
@@ -236,11 +236,11 @@
boolean sync = isSynchronous(ctx);
- if (ctx.isOriginLocal() && ctx.hasModifications()) {
+ if (shouldInvokeRemoteTxCommand(ctx)) {
List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
NotifyingNotifiableFuture<Object> f = null;
if (command.isOnePhaseCommit())
- f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null);
+ f = flushL1Cache(recipients.size(), ctx.getLockedKeys(), null);
// this method will return immediately if we're the only member (because exclude_self=true)
rpcManager.invokeRemotely(recipients, command, sync);
if (f != null) f.get();
@@ -250,7 +250,7 @@
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
- if (ctx.isOriginLocal())
+ if (shouldInvokeRemoteTxCommand(ctx))
rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, configuration.isSyncRollbackPhase(), true);
return invokeNextInterceptor(ctx, command);
}
@@ -269,19 +269,7 @@
return !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && needReliableReturnValues;
}
- private Object[] getKeys(List<WriteCommand> mods) {
- List<Object> l = new LinkedList<Object>();
- for (WriteCommand m : mods) {
- if (m instanceof DataCommand) {
- l.add(((DataCommand) m).getKey());
- } else if (m instanceof PutMapCommand) {
- l.addAll(((PutMapCommand) m).getMap().keySet());
- }
- }
- return l.toArray(new Object[l.size()]);
- }
-
- private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, Object retval) {
+ private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Collection<Object> keys, Object retval) {
if (isL1CacheEnabled && numCallRecipients > 0 && rpcManager.getTransport().getMembers().size() > numCallRecipients) {
if (trace) log.trace("Invalidating L1 caches");
InvalidateCommand ic = cf.buildInvalidateFromL1Command(false, keys);
@@ -344,7 +332,7 @@
}
interface KeyGenerator {
- Object[] getKeys();
+ Collection<Object> getKeys();
}
interface RecipientGenerator extends KeyGenerator {
@@ -353,12 +341,12 @@
class SingleKeyRecipientGenerator implements RecipientGenerator {
final Object key;
- final Object[] keysArray;
+ final Set<Object> keys;
List<Address> recipients = null;
SingleKeyRecipientGenerator(Object key) {
this.key = key;
- keysArray = new Object[]{key};
+ keys = Collections.singleton(key);
}
public List<Address> generateRecipients() {
@@ -366,20 +354,18 @@
return recipients;
}
- public Object[] getKeys() {
- return keysArray;
+ public Collection<Object> getKeys() {
+ return keys;
}
}
class MultipleKeysRecipientGenerator implements RecipientGenerator {
final Collection<Object> keys;
- final Object[] keysArray;
List<Address> recipients = null;
MultipleKeysRecipientGenerator(Collection<Object> keys) {
this.keys = keys;
- keysArray = keys.toArray();
}
public List<Address> generateRecipients() {
@@ -392,8 +378,8 @@
return recipients;
}
- public Object[] getKeys() {
- return keysArray;
+ public Collection<Object> getKeys() {
+ return keys;
}
}
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2010-10-21 12:42:20 UTC (rev 2543)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -131,7 +131,7 @@
Object retval = invokeNextInterceptor(ctx, command);
if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
- if (ctx.hasModifications() && ctx.isOriginLocal()) {
+ if (shouldInvokeRemoteTxCommand(ctx)) {
List<WriteCommand> mods = Arrays.asList(command.getModifications());
Transaction runningTransaction = ctx.getRunningTransaction();
if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction");
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2010-10-21 12:42:20 UTC (rev 2543)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -49,7 +49,7 @@
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
if (!ctx.isInTxScope()) throw new IllegalStateException("This should not be possible!");
- if (ctx.isOriginLocal()) {
+ if (shouldInvokeRemoteTxCommand(ctx)) {
rpcManager.broadcastRpcCommand(command, configuration.isSyncCommitPhase(), true);
}
return invokeNextInterceptor(ctx, command);
@@ -58,7 +58,7 @@
@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
Object retVal = invokeNextInterceptor(ctx, command);
- if (ctx.isOriginLocal() && command.hasModifications()) {
+ if (shouldInvokeRemoteTxCommand(ctx)) {
boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
rpcManager.broadcastRpcCommand(command, !async, false);
}
@@ -67,7 +67,7 @@
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
- if (ctx.isOriginLocal() && !configuration.isOnePhaseCommit()) {
+ if (shouldInvokeRemoteTxCommand(ctx) && !configuration.isOnePhaseCommit()) {
rpcManager.broadcastRpcCommand(command, configuration.isSyncRollbackPhase(), true);
}
return invokeNextInterceptor(ctx, command);
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2010-10-21 12:42:20 UTC (rev 2543)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -79,4 +79,10 @@
}
return false;
}
+
+ protected final boolean shouldInvokeRemoteTxCommand(TxInvocationContext ctx) {
+ // just testing for empty modifications isn't enough - the Lock API may acquire locks on keys but won't
+ // register a Modification. See ISPN-711.
+ return ctx.isOriginLocal() && (ctx.hasModifications() || !ctx.getLockedKeys().isEmpty());
+ }
}
\ No newline at end of file
Added: branches/4.2.x/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java 2010-10-21 12:53:03 UTC (rev 2544)
@@ -0,0 +1,62 @@
+package org.infinispan.lock;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.CleanupAfterMethod;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+ at Test(testName = "lock.StaleLocksTransactionTest", groups = "functional")
+ at CleanupAfterMethod
+public class StaleLocksTransactionTest extends MultipleCacheManagersTest {
+
+ Cache<String, String> c1, c2;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration cfg = TestCacheManagerFactory.getDefaultConfiguration(true, Configuration.CacheMode.DIST_SYNC);
+ cfg.setLockAcquisitionTimeout(100);
+ EmbeddedCacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+ EmbeddedCacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+ registerCacheManager(cm1, cm2);
+ c1 = cm1.getCache();
+ c2 = cm2.getCache();
+ }
+
+ public void testNoModsCommit() throws Exception {
+ doTest(false, true);
+ }
+
+ public void testModsRollback() throws Exception {
+ doTest(true, false);
+ }
+
+ public void testNoModsRollback() throws Exception {
+ doTest(false, false);
+ }
+
+ public void testModsCommit() throws Exception {
+ doTest(true, true);
+ }
+
+ private void doTest(boolean mods, boolean commit) throws Exception {
+ tm(c1).begin();
+ c1.getAdvancedCache().lock("k");
+ assert c1.get("k") == null;
+ if (mods) c1.put("k", "v");
+ if (commit)
+ tm(c1).commit();
+ else
+ tm(c1).rollback();
+
+ assertNotLocked(c1, "k");
+ assertNotLocked(c2, "k");
+ }
+}
More information about the infinispan-commits
mailing list