[infinispan-commits] Infinispan SVN: r301 - in trunk/core/src: main/java/org/infinispan/remoting/rpc and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu May 14 12:12:36 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-14 12:12:36 -0400 (Thu, 14 May 2009)
New Revision: 301
Added:
trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java
trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java
Modified:
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Log:
More async API tests
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -30,6 +30,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -181,7 +186,7 @@
if (ctx.isOriginLocal()) {
List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
+ if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
// this method will return immediately if we're the only member (because exclude_self=true)
rpcManager.multicastRpcCommand(recipients, command, sync, false);
}
@@ -231,16 +236,22 @@
if (ctx.isOriginLocal()) {
List<Address> rec = recipientGenerator.generateRecipients();
if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
+ Future<Object> f1 = null, f2;
+ boolean useFuture = ctx.isUseFutureReturnType();
+ boolean sync = isSynchronous(ctx);
+
// if L1 caching is used make sure we broadcast an invalidate message
- if (isL1CacheEnabled && rec != null) {
+ if (isL1CacheEnabled && rec != null && rpcManager.getMembers().size() > rec.size()) {
InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
- rpcManager.broadcastReplicableCommand(ic, isSynchronous(ctx));
+ f1 = submitRpc(null, ic, sync, useFuture);
}
- rpcManager.multicastReplicableCommand(rec, command, isSynchronous(ctx));
+ f2 = submitRpc(rec, command, sync, useFuture);
+
+ if (f2 != null) return new DistributionCommunicationFuture(f1, f2, returnValue);
}
} else {
if (!localModeForced) {
- ((TxInvocationContext)ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
+ ((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
} else {
// add to list of participants
}
@@ -249,6 +260,68 @@
return returnValue;
}
+ private class DistributionCommunicationFuture implements Future<Object> {
+ final Future<Object> invalFuture, replFuture;
+ final Object returnValue;
+
+ private DistributionCommunicationFuture(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
+ this.invalFuture = invalFuture;
+ this.replFuture = replFuture;
+ this.returnValue = returnValue;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean invalCancelled = true;
+ if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
+ return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
+ }
+
+ public boolean isCancelled() {
+ return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
+ }
+
+ public boolean isDone() {
+ return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
+ }
+
+ public Object get() throws InterruptedException, ExecutionException {
+ if (invalFuture != null) invalFuture.get();
+ replFuture.get();
+ return returnValue;
+ }
+
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (invalFuture != null) invalFuture.get(timeout, unit);
+ replFuture.get(timeout, unit);
+ return returnValue;
+ }
+ }
+
+
+ private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
+ if (useFuture) {
+ Callable<Object> c = new Callable<Object>() {
+ public Object call() {
+ if (recipients == null) {
+ rpcManager.broadcastReplicableCommand(cmd, true);
+ } else {
+ rpcManager.multicastReplicableCommand(recipients, cmd, true);
+ }
+ return null;
+ }
+ };
+
+ return asyncExecutorService.submit(c);
+ } else {
+ if (recipients == null) {
+ rpcManager.broadcastReplicableCommand(cmd, sync);
+ } else {
+ rpcManager.multicastReplicableCommand(recipients, cmd, sync);
+ }
+ return null;
+ }
+ }
+
interface RecipientGenerator {
List<Address> generateRecipients();
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java 2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -129,4 +129,8 @@
public Address getLocalAddress() {
return rpcManager != null ? rpcManager.getLocalAddress() : null;
}
+
+ public List<Address> getMembers() {
+ return rpcManager.getTransport().getMembers();
+ }
}
Copied: trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java (from rev 299, trunk/core/src/test/java/org/infinispan/replication/AsyncAPIAsyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -0,0 +1,50 @@
+package org.infinispan.distribution;
+
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.test.AbstractCacheTest;
+import org.infinispan.test.ReplListener;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.AsyncAPIAsyncDistTest")
+public class AsyncAPIAsyncDistTest extends AsyncAPISyncDistTest {
+
+ ReplListener rl;
+
+ public AsyncAPIAsyncDistTest() {
+ cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ rl = new ReplListener(c2, true);
+ }
+
+ @Override
+ protected boolean sync() {
+ return false;
+ }
+
+ @Override
+ protected void resetListeners() {
+ rl.resetEager();
+ }
+
+ @Override
+ protected void asyncWait(boolean tx, Class<? extends WriteCommand>... cmds) {
+ if (tx) {
+ if (cmds == null || cmds.length == 0)
+ rl.expectAnyWithTx();
+ else
+ rl.expectWithTx(cmds);
+ } else {
+ if (cmds == null || cmds.length == 0)
+ rl.expectAny();
+ else
+ rl.expect(cmds);
+ }
+
+
+ rl.waitForRpc();
+ }
+}
\ No newline at end of file
Copied: trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java (from rev 300, trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -0,0 +1,34 @@
+package org.infinispan.distribution;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.Flag;
+import org.infinispan.replication.AsyncAPISyncReplTest;
+import org.infinispan.test.data.Key;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+ at Test(groups = "functional", testName = "distribution.AsyncAPISyncDistTest")
+public class AsyncAPISyncDistTest extends AsyncAPISyncReplTest {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration c =
+ getDefaultClusteredConfig(sync() ? Configuration.CacheMode.DIST_SYNC : Configuration.CacheMode.DIST_ASYNC);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+ c1 = l.get(0);
+ c2 = l.get(1);
+ }
+
+ @Override
+ protected void assertOnAllCaches(Key k, String v) {
+ Object real;
+ assert Util.safeEquals((real = c1.getAdvancedCache().get(k, Flag.SKIP_REMOTE_LOOKUP)), v) : "Error on cache 1. Expected " + v + " and got " + real;
+ assert Util.safeEquals((real = c2.getAdvancedCache().get(k, Flag.SKIP_REMOTE_LOOKUP)), v) : "Error on cache 2. Expected " + v + " and got " + real;
+ }
+}
\ No newline at end of file
Modified: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java 2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -5,7 +5,7 @@
import org.infinispan.test.ReplListener;
import org.testng.annotations.Test;
- at Test(groups = "functional", testName = "replication.AsyncAPIAsyncInvalTest")
+ at Test(groups = "functional", testName = "invalidation.AsyncAPIAsyncInvalTest")
public class AsyncAPIAsyncInvalTest extends AsyncAPISyncInvalTest {
ReplListener rl;
Modified: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java 2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -15,7 +15,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
- at Test(groups = "functional", testName = "replication.AsyncAPISyncInvalTest")
+ at Test(groups = "functional", testName = "invalidation.AsyncAPISyncInvalTest")
public class AsyncAPISyncInvalTest extends MultipleCacheManagersTest {
Cache<Key, String> c1, c2;
Modified: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java 2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java 2009-05-14 16:12:36 UTC (rev 301)
@@ -20,7 +20,7 @@
@Test(groups = "functional", testName = "replication.AsyncAPISyncReplTest")
public class AsyncAPISyncReplTest extends MultipleCacheManagersTest {
- Cache<Key, String> c1, c2;
+ protected Cache<Key, String> c1, c2;
@SuppressWarnings("unchecked")
protected void createCacheManagers() throws Throwable {
@@ -42,7 +42,7 @@
protected void resetListeners() {
}
- private void assertOnAllCaches(Key k, String v) {
+ protected void assertOnAllCaches(Key k, String v) {
Object real;
assert Util.safeEquals((real = c1.get(k)), v) : "Error on cache 1. Expected " + v + " and got " + real;
assert Util.safeEquals((real = c2.get(k)), v) : "Error on cache 2. Expected " + v + " and got " + real;
@@ -61,6 +61,7 @@
// put
Future<String> f = c1.putAsync(key, v);
+ System.out.println("Future is of type " + f.getClass().getName());
assert f != null;
assert !f.isDone();
assert c2.get(key) == null;
More information about the infinispan-commits
mailing list