[infinispan-commits] Infinispan SVN: r714 - in trunk/core/src: main/java/org/infinispan/factories and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Aug 20 21:52:39 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-08-20 21:52:39 -0400 (Thu, 20 Aug 2009)
New Revision: 714
Modified:
trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java
trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
Log:
More dist tests
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -1,5 +1,8 @@
package org.infinispan.distribution;
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.WriteCommand;
import java.util.Collection;
@@ -47,6 +50,12 @@
*/
boolean logIfNeeded(WriteCommand command);
+ void logIfNeeded(PrepareCommand command);
+
+ void logIfNeeded(CommitCommand command);
+
+ void logIfNeeded(RollbackCommand command);
+
/**
* If logging is enabled, will log the commands and return true. Otherwise, will just return false.
*
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -1,11 +1,17 @@
package org.infinispan.distribution;
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.transaction.xa.GlobalTransaction;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -20,6 +26,7 @@
volatile boolean enabled;
final ReadWriteLock loggingLock = new ReentrantReadWriteLock();
final BlockingQueue<WriteCommand> commandQueue = new LinkedBlockingQueue<WriteCommand>();
+ final Map<GlobalTransaction, PrepareCommand> uncommittedPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
public void enable() {
enabled = true;
@@ -42,31 +49,87 @@
}
public boolean logIfNeeded(WriteCommand command) {
- loggingLock.readLock().lock();
- try {
- if (enabled) {
- commandQueue.add(command);
- return true;
- } else {
- return false;
+ if (enabled) {
+ loggingLock.readLock().lock();
+ try {
+ if (enabled) {
+ try {
+ commandQueue.put(command);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return true;
+ }
+ } finally {
+ loggingLock.readLock().unlock();
}
- } finally {
- loggingLock.readLock().unlock();
}
+ return false;
}
+ /**
+  * Parks a prepare, keyed by its global transaction, so that its modifications can be queued if a
+  * matching commit is seen later. Does nothing when logging is disabled.
+  *
+  * @param command the prepare to remember
+  */
+ public void logIfNeeded(PrepareCommand command) {
+    // Unlocked look at the volatile flag first: skip the lock entirely when logging is off.
+    if (!enabled) return;
+    loggingLock.readLock().lock();
+    try {
+       // Re-read the flag now that the read lock is held.
+       if (enabled) uncommittedPrepares.put(command.getGlobalTransaction(), command);
+    } finally {
+       loggingLock.readLock().unlock();
+    }
+ }
+
+ /**
+  * Queues the modifications of a previously parked prepare once its transaction commits.
+  * Does nothing when logging is disabled, or when no prepare was recorded for the command's
+  * global transaction.
+  *
+  * @param command the commit whose global transaction identifies the parked prepare
+  */
+ public void logIfNeeded(CommitCommand command) {
+    if (enabled) {
+       loggingLock.readLock().lock();
+       try {
+          if (enabled) {
+             PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
+             // remove() yields null when no prepare was parked for this transaction (for example,
+             // the prepare was seen while logging was still disabled); nothing to queue then.
+             if (pc != null) {
+                for (WriteCommand wc : pc.getModifications()) {
+                   try {
+                      commandQueue.put(wc);
+                   } catch (InterruptedException e) {
+                      // Restore the interrupt status instead of swallowing the interruption.
+                      Thread.currentThread().interrupt();
+                   }
+                }
+             }
+          }
+       } finally {
+          loggingLock.readLock().unlock();
+       }
+    }
+ }
+
+ /**
+  * Discards the parked prepare, if any, for a transaction that is rolling back.
+  * Does nothing when logging is disabled.
+  *
+  * @param command the rollback whose global transaction identifies the parked prepare
+  */
+ public void logIfNeeded(RollbackCommand command) {
+    // Unlocked look at the volatile flag first: skip the lock entirely when logging is off.
+    if (!enabled) return;
+    loggingLock.readLock().lock();
+    try {
+       // Re-read the flag now that the read lock is held.
+       if (enabled) uncommittedPrepares.remove(command.getGlobalTransaction());
+    } finally {
+       loggingLock.readLock().unlock();
+    }
+ }
+
public boolean logIfNeeded(Collection<WriteCommand> commands) {
- loggingLock.readLock().lock();
- try {
- if (enabled) {
- for (WriteCommand command : commands) commandQueue.add(command);
- return true;
- } else {
- return false;
+ if (enabled) {
+ loggingLock.readLock().lock();
+ try {
+ if (enabled) {
+ for (WriteCommand command : commands)
+ try {
+ commandQueue.put(command);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return true;
+ }
+ } finally {
+ loggingLock.readLock().unlock();
}
- } finally {
- loggingLock.readLock().unlock();
}
+ return false;
}
public int size() {
Modified: trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -34,6 +34,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.concurrent.locks.LockManager;
@@ -129,7 +130,7 @@
if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
mvccEntry.setRemoved(false);
mvccEntry.setValid(true);
- }
+ }
return mvccEntry;
@@ -178,7 +179,7 @@
* @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
* already held)
* @throws InterruptedException if interrupted
- * @throws org.infinispan.lock.TimeoutException
+ * @throws org.infinispan.util.concurrent.TimeoutException
* if we are unable to acquire the lock after a specified timeout.
*/
public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
@@ -195,7 +196,7 @@
return true;
} else {
Object owner = lockManager.getOwner(key);
- throw new TimeoutException("Unable to acquire lock on key [" + key + "] for requestor [" +
+ throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
}
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -1,14 +1,19 @@
package org.infinispan.interceptors;
import org.infinispan.commands.AbstractVisitor;
+import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
-import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
+import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
@@ -48,6 +53,60 @@
}
}
+ /**
+  * Offers the prepare to the distribution manager's transaction logger, then continues with the
+  * superclass handling and returns its result.
+  */
+ @Override
+ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand cmd) throws Throwable {
+    // The logger is consulted before the superclass processes the command.
+    dm.getTransactionLogger().logIfNeeded(cmd);
+    final Object result = super.visitPrepareCommand(ctx, cmd);
+    return result;
+ }
+
+ /**
+  * Offers the rollback to the distribution manager's transaction logger, then continues with the
+  * superclass handling and returns its result.
+  */
+ @Override
+ public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand cmd) throws Throwable {
+    // The logger is consulted before the superclass processes the command.
+    dm.getTransactionLogger().logIfNeeded(cmd);
+    final Object result = super.visitRollbackCommand(ctx, cmd);
+    return result;
+ }
+
+ /**
+  * Offers the commit to the distribution manager's transaction logger, then continues with the
+  * superclass handling and returns its result.
+  */
+ @Override
+ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand cmd) throws Throwable {
+    // The logger is consulted before the superclass processes the command.
+    dm.getTransactionLogger().logIfNeeded(cmd);
+    final Object result = super.visitCommitCommand(ctx, cmd);
+    return result;
+ }
+
+ /**
+  * Runs the superclass handling for the put and, when the call is outside a transaction scope and
+  * the command reports success, offers the command to the transaction logger.
+  *
+  * @return whatever the superclass handling returned
+  */
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+    final Object result = super.visitPutKeyValueCommand(ctx, command);
+    // Only non-transactional, successful writes are offered to the logger here.
+    final boolean shouldLog = !ctx.isInTxScope() && command.isSuccessful();
+    if (shouldLog) {
+       dm.getTransactionLogger().logIfNeeded(command);
+    }
+    return result;
+ }
+
+ /**
+  * Runs the superclass handling for the remove and, when the call is outside a transaction scope
+  * and the command reports success, offers the command to the transaction logger.
+  *
+  * @return whatever the superclass handling returned
+  */
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+    final Object result = super.visitRemoveCommand(ctx, command);
+    // Only non-transactional, successful writes are offered to the logger here.
+    final boolean shouldLog = !ctx.isInTxScope() && command.isSuccessful();
+    if (shouldLog) {
+       dm.getTransactionLogger().logIfNeeded(command);
+    }
+    return result;
+ }
+
+ /**
+  * Runs the superclass handling for the replace and, when the call is outside a transaction scope
+  * and the command reports success, offers the command to the transaction logger.
+  *
+  * @return whatever the superclass handling returned
+  */
+ @Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
+    final Object result = super.visitReplaceCommand(ctx, command);
+    // Only non-transactional, successful writes are offered to the logger here.
+    final boolean shouldLog = !ctx.isInTxScope() && command.isSuccessful();
+    if (shouldLog) {
+       dm.getTransactionLogger().logIfNeeded(command);
+    }
+    return result;
+ }
+
+ /**
+  * Runs the superclass handling for the clear and, when the call is outside a transaction scope
+  * and the command reports success, offers the command to the transaction logger.
+  *
+  * @return whatever the superclass handling returned
+  */
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
+    final Object result = super.visitClearCommand(ctx, command);
+    // Only non-transactional, successful writes are offered to the logger here.
+    final boolean shouldLog = !ctx.isInTxScope() && command.isSuccessful();
+    if (shouldLog) {
+       dm.getTransactionLogger().logIfNeeded(command);
+    }
+    return result;
+ }
+
+ /**
+  * Runs the superclass handling for the put-map and, when the call is outside a transaction scope
+  * and the command reports success, offers the command to the transaction logger.
+  *
+  * @return whatever the superclass handling returned
+  */
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+    final Object result = super.visitPutMapCommand(ctx, command);
+    // Only non-transactional, successful writes are offered to the logger here.
+    final boolean shouldLog = !ctx.isInTxScope() && command.isSuccessful();
+    if (shouldLog) {
+       dm.getTransactionLogger().logIfNeeded(command);
+    }
+    return result;
+ }
+
+
class ReplayCommandVisitor extends AbstractVisitor {
@Override
public Object visitPutMapCommand(InvocationContext ignored, PutMapCommand command) {
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -1,6 +1,7 @@
package org.infinispan.interceptors;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.DataCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
@@ -31,6 +32,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -181,6 +183,7 @@
if (ctx.isOriginLocal()) {
List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
+ flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), false, null, configuration.isSyncCommitPhase());
}
return invokeNextInterceptor(ctx, command);
}
@@ -196,6 +199,8 @@
if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
// this method will return immediately if we're the only member (because exclude_self=true)
rpcManager.invokeRemotely(recipients, command, sync);
+ if (command.isOnePhaseCommit())
+ flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), false, null, false);
}
return retVal;
}
@@ -223,6 +228,33 @@
return !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && needReliableReturnValues;
}
+ /**
+  * Collects, in iteration order, the keys referenced by the given modifications: the single key
+  * of each {@code DataCommand} and every key of each {@code PutMapCommand}'s map. Modifications
+  * of any other type contribute nothing.
+  *
+  * @param mods the modifications to inspect
+  * @return the collected keys as an array (empty when no keys were found)
+  */
+ private Object[] getKeys(List<WriteCommand> mods) {
+    List<Object> keys = new LinkedList<Object>();
+    for (WriteCommand mod : mods) {
+       if (mod instanceof DataCommand) {
+          keys.add(((DataCommand) mod).getKey());
+          continue;
+       }
+       if (mod instanceof PutMapCommand) {
+          keys.addAll(((PutMapCommand) mod).getMap().keySet());
+       }
+    }
+    return keys.toArray();
+ }
+
+ /**
+  * Broadcasts an L1 invalidation for the given keys, but only when L1 caching is enabled, the
+  * preceding call had at least one recipient, and the cluster has more members than that call
+  * had recipients.
+  *
+  * @param numCallRecipients number of recipients of the preceding remote call
+  * @param keys              keys to invalidate
+  * @param useFuture         if true, broadcast via a future and return it
+  * @param retval            value handed to the aggregating future when {@code useFuture} is true
+  * @param sync              passed to the broadcast when {@code useFuture} is false
+  * @return the future used for the broadcast, or null when no future was requested or no
+  *         invalidation was sent
+  */
+ private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, boolean useFuture, Object retval, boolean sync) {
+    final boolean shouldInvalidate = isL1CacheEnabled && numCallRecipients > 0 && rpcManager.getTransport().getMembers().size() > numCallRecipients;
+    if (!shouldInvalidate) return null;
+
+    if (trace) log.trace("Invalidating L1 caches");
+    InvalidateCommand invalidation = cf.buildInvalidateFromL1Command(keys);
+    if (!useFuture) {
+       rpcManager.broadcastRpcCommand(invalidation, sync);
+       return null;
+    }
+    // NOTE(review): the 2 is presumably the number of results the aggregate waits for - confirm.
+    NotifyingNotifiableFuture<Object> aggregate = new AggregatingNotifyingFutureImpl(retval, 2);
+    rpcManager.broadcastRpcCommandInFuture(invalidation, aggregate);
+    return aggregate;
+ }
+
/**
* If we are within one transaction we won't do any replication as replication would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication either.
@@ -245,17 +277,7 @@
if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
boolean useFuture = ctx.isUseFutureReturnType();
boolean sync = isSynchronous(ctx);
- NotifyingNotifiableFuture<Object> future = null;
- // if L1 caching is used make sure we broadcast an invalidate message
- if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
- InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
- if (useFuture) {
- future = new AggregatingNotifyingFutureImpl(returnValue, 2);
- rpcManager.broadcastRpcCommandInFuture(ic, future);
- } else {
- rpcManager.broadcastRpcCommand(ic, sync);
- }
- }
+ NotifyingNotifiableFuture<Object> future = flushL1Cache(rec == null ? 0 : rec.size(), recipientGenerator.getKeys(), useFuture, returnValue, sync);
if (useFuture) {
if (future == null) future = new NotifyingFutureImpl(returnValue);
Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -7,7 +7,6 @@
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
-import org.infinispan.commands.write.EvictCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
@@ -148,11 +147,6 @@
}
@Override
- public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
- return invokeNextInterceptor(ctx, command);
- }
-
- @Override
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable {
return enlistWriteAndInvokeNext(ctx, invalidateCommand);
}
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
@Test(groups = "functional", testName = "distribution.BaseDistFunctionalTest")
@@ -45,6 +46,8 @@
configuration.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
}
if (tx) configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ configuration.setSyncReplTimeout(60, TimeUnit.SECONDS);
+ configuration.setLockAcquisitionTimeout(45, TimeUnit.SECONDS);
caches = createClusteredCaches(4, cacheName, configuration);
reorderBasedOnCHPositions();
Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-08-21 01:52:39 UTC (rev 714)
@@ -96,7 +96,6 @@
* More complex - init some state. Start a new transaction, and midway trigger a rehash. Then complete transaction
* and test results.
*/
- @Test(enabled = false, description = "Enable after releasing Beta1")
public void testTransactional() throws Exception {
final List<MagicKey> keys = init();
final CountDownLatch l = new CountDownLatch(1);
More information about the infinispan-commits mailing list