[infinispan-commits] Infinispan SVN: r300 - in trunk/core/src: test/java/org/infinispan/invalidation and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu May 14 11:19:07 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-14 11:19:07 -0400 (Thu, 14 May 2009)
New Revision: 300
Added:
trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
trunk/core/src/test/java/org/infinispan/test/data/
trunk/core/src/test/java/org/infinispan/test/data/Key.java
Modified:
trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Log:
More async API tests
Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-14 14:19:19 UTC (rev 299)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-14 15:19:07 UTC (rev 300)
@@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
@@ -116,7 +117,7 @@
List<WriteCommand> mods = Arrays.asList(command.getModifications());
Transaction runningTransaction = ctx.getRunningTransaction();
if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction");
- broadcastInvalidate(mods, runningTransaction, ctx);
+ broadcastInvalidateForPrepare(mods, runningTransaction, ctx);
} else {
if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
}
@@ -127,13 +128,13 @@
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && !ctx.isInTxScope()) {
if (keys != null && keys.length != 0) {
- invalidateAcrossCluster(isSynchronous(ctx), ctx, keys);
+ return invalidateAcrossCluster(isSynchronous(ctx), ctx, keys, ctx.isUseFutureReturnType(), retval);
}
}
return retval;
}
- private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
+ private void broadcastInvalidateForPrepare(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
if (ctx.isInTxScope() && !isLocalModeForced(ctx)) {
if (modifications == null || modifications.isEmpty()) return;
InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
@@ -143,7 +144,7 @@
log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
} else {
try {
- invalidateAcrossCluster(defaultSynchronous, ctx, filterVisitor.result.toArray());
+ invalidateAcrossCluster(defaultSynchronous, ctx, filterVisitor.result.toArray(), false, null);
}
catch (Throwable t) {
log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
@@ -184,16 +185,28 @@
}
- protected void invalidateAcrossCluster(boolean synchronous, InvocationContext ctx, Object[] keys) throws Throwable {
+ protected Object invalidateAcrossCluster(boolean synchronous, InvocationContext ctx, Object[] keys, boolean useFuture,
+ final Object retvalForFuture) throws Throwable {
if (!isLocalModeForced(ctx)) {
// increment invalidations counter if statistics maintained
incrementInvalidations();
- InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
+ final InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
if (log.isDebugEnabled())
log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
// voila, invalidated!
- rpcManager.broadcastReplicableCommand(command, synchronous);
+ if (useFuture) {
+ return submitRpcCall(new Callable<Object>() {
+ public Object call() throws Exception {
+ rpcManager.broadcastReplicableCommand(command, true);
+ return null;
+ }
+ }, retvalForFuture);
+ } else {
+ rpcManager.broadcastReplicableCommand(command, synchronous);
+ }
}
+
+ return retvalForFuture;
}
private void incrementInvalidations() {
Added: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java 2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,41 @@
+package org.infinispan.invalidation;
+
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.test.AbstractCacheTest;
+import org.infinispan.test.ReplListener;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "replication.AsyncAPIAsyncInvalTest")
+public class AsyncAPIAsyncInvalTest extends AsyncAPISyncInvalTest {
+ ReplListener rl;
+
+ public AsyncAPIAsyncInvalTest() {
+ cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ rl = new ReplListener(c2, true);
+ }
+
+ @Override
+ protected boolean sync() {
+ return false;
+ }
+
+ @Override
+ protected void resetListeners() {
+ rl.resetEager();
+ }
+
+ @Override
+ protected void asyncWait(Class<? extends WriteCommand>... cmds) {
+ if (cmds == null || cmds.length == 0)
+ rl.expectAny();
+ else
+ rl.expect(cmds);
+
+ rl.waitForRpc();
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java (from rev 299, trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java 2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,199 @@
+package org.infinispan.invalidation;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.Flag;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.data.Key;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+ at Test(groups = "functional", testName = "replication.AsyncAPISyncInvalTest")
+public class AsyncAPISyncInvalTest extends MultipleCacheManagersTest {
+
+ Cache<Key, String> c1, c2;
+
+ @SuppressWarnings("unchecked")
+ protected void createCacheManagers() throws Throwable {
+ Configuration c =
+ getDefaultClusteredConfig(sync() ? Configuration.CacheMode.INVALIDATION_SYNC : Configuration.CacheMode.INVALIDATION_ASYNC);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+ c1 = l.get(0);
+ c2 = l.get(1);
+ }
+
+ protected boolean sync() {
+ return true;
+ }
+
+ protected void asyncWait(Class<? extends WriteCommand>... cmds) {
+ }
+
+ protected void resetListeners() {
+ }
+
+ private void assertInvalidated(Key k, String value) {
+ assert Util.safeEquals(c1.get(k), value);
+ assert !c2.containsKey(k);
+ }
+
+ private void initC2(Key k) {
+ c2.getAdvancedCache().put(k, "v", Flag.CACHE_MODE_LOCAL);
+ }
+
+ public void testAsyncMethods() throws ExecutionException, InterruptedException {
+
+ String v = "v";
+ String v2 = "v2";
+ String v3 = "v3";
+ String v4 = "v4";
+ String v5 = "v5";
+ String v6 = "v6";
+ String v_null = "v_nonexistent";
+ Key key = new Key("k", true);
+
+ initC2(key);
+
+ assert !c1.containsKey(key);
+ assert v.equals(c2.get(key));
+
+ // put
+ Future<String> f = c1.putAsync(key, v);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assertInvalidated(key, v);
+
+ initC2(key);
+ f = c1.putAsync(key, v2);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ assert f.isDone();
+ assertInvalidated(key, v2);
+
+ // putAll
+ initC2(key);
+ Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+ assert f2 != null;
+ assert !f2.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f2.isCancelled();
+ assert f2.get() == null;
+ assert f2.isDone();
+ assertInvalidated(key, v3);
+
+ // putIfAbsent
+ initC2(key);
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert c2.get(key).equals(v);
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ assert f.isDone();
+
+ // remove
+ initC2(key);
+ f = c1.removeAsync(key);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ assert f.isDone();
+ assertInvalidated(key, null);
+
+ // putIfAbsent again
+ initC2(key);
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assertInvalidated(key, v4);
+
+ // removecond
+ initC2(key);
+ Future<Boolean> f3 = c1.removeAsync(key, v_null);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ assert c2.get(key).equals(v);
+
+ f3 = c1.removeAsync(key, v4);
+ assert f3 != null;
+ assert !f3.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert f3.isDone();
+ assertInvalidated(key, null);
+
+ // replace
+ initC2(key);
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assert c2.get(key).equals(v);
+
+ key.allowSerialization();
+ resetListeners();
+ c1.put(key, v);
+ asyncWait();
+
+ initC2(key);
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ assert f.isDone();
+ assertInvalidated(key, v5);
+
+ //replace2
+ initC2(key);
+ f3 = c1.replaceAsync(key, v_null, v6);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ assert c2.get(key).equals(v);
+ assert c1.get(key).equals(v5);
+
+ f3 = c1.replaceAsync(key, v5, v6);
+ assert f3 != null;
+ assert !f3.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert f3.isDone();
+ assertInvalidated(key, v6);
+ }
+}
\ No newline at end of file
Modified: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java 2009-05-14 14:19:19 UTC (rev 299)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java 2009-05-14 15:19:07 UTC (rev 300)
@@ -6,16 +6,12 @@
import org.infinispan.config.Configuration;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
+import org.infinispan.test.data.Key;
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.ReclosableLatch;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -333,55 +329,4 @@
asyncWait(true);
assertOnAllCaches(key, v6);
}
-
- protected static class Key implements Externalizable {
- String value;
- ReclosableLatch latch = new ReclosableLatch(false);
- boolean lockable;
-
- public Key() {
- }
-
- public Key(String value, boolean lockable) {
- this.value = value;
- this.lockable = lockable;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Key k1 = (Key) o;
-
- if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return value != null ? value.hashCode() : 0;
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(value);
- if (lockable) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- latch.close();
- }
- }
-
- public void allowSerialization() {
- if (lockable) latch.open();
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- value = (String) in.readObject();
- }
- }
}
Added: trunk/core/src/test/java/org/infinispan/test/data/Key.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/data/Key.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/test/data/Key.java 2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,61 @@
+package org.infinispan.test.data;
+
+import org.infinispan.util.concurrent.ReclosableLatch;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.TimeUnit;
+
+public class Key implements Externalizable {
+
+ String value;
+ ReclosableLatch latch = new ReclosableLatch(false);
+ boolean lockable;
+
+ public Key() {
+ }
+
+ public Key(String value, boolean lockable) {
+ this.value = value;
+ this.lockable = lockable;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Key k1 = (Key) o;
+
+ if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return value != null ? value.hashCode() : 0;
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(value);
+ if (lockable) {
+ try {
+ if (!latch.await(60, TimeUnit.SECONDS)) throw new RuntimeException("Cannot serialize!!");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ latch.close();
+ }
+ }
+
+ public void allowSerialization() {
+ if (lockable) latch.open();
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ value = (String) in.readObject();
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/test/data/Key.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list