[infinispan-commits] Infinispan SVN: r301 - in trunk/core/src: main/java/org/infinispan/remoting/rpc and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu May 14 12:12:36 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-14 12:12:36 -0400 (Thu, 14 May 2009)
New Revision: 301

Added:
   trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java
   trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
   trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
   trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
   trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Log:
More async API tests

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -30,6 +30,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -181,7 +186,7 @@
 
       if (ctx.isOriginLocal()) {
          List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         if (trace)  log.trace("Multicasting PrepareCommand to recipients : " + recipients);
+         if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
          // this method will return immediately if we're the only member (because exclude_self=true)
          rpcManager.multicastRpcCommand(recipients, command, sync, false);
       }
@@ -231,16 +236,22 @@
             if (ctx.isOriginLocal()) {
                List<Address> rec = recipientGenerator.generateRecipients();
                if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
+               Future<Object> f1 = null, f2;
+               boolean useFuture = ctx.isUseFutureReturnType();
+               boolean sync = isSynchronous(ctx);
+
                // if L1 caching is used make sure we broadcast an invalidate message
-               if (isL1CacheEnabled && rec != null) {
+               if (isL1CacheEnabled && rec != null && rpcManager.getMembers().size() > rec.size()) {
                   InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
-                  rpcManager.broadcastReplicableCommand(ic, isSynchronous(ctx));
+                  f1 = submitRpc(null, ic, sync, useFuture);
                }
-               rpcManager.multicastReplicableCommand(rec, command, isSynchronous(ctx));
+               f2 = submitRpc(rec, command, sync, useFuture);
+
+               if (f2 != null) return new DistributionCommunicationFuture(f1, f2, returnValue);
             }
          } else {
             if (!localModeForced) {
-               ((TxInvocationContext)ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
+               ((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
             } else {
                // add to list of participants
             }
@@ -249,6 +260,68 @@
       return returnValue;
    }
 
+   private class DistributionCommunicationFuture implements Future<Object> {
+      final Future<Object> invalFuture, replFuture;
+      final Object returnValue;
+
+      private DistributionCommunicationFuture(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
+         this.invalFuture = invalFuture;
+         this.replFuture = replFuture;
+         this.returnValue = returnValue;
+      }
+
+      public boolean cancel(boolean mayInterruptIfRunning) {
+         boolean invalCancelled = true;
+         if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
+         return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
+      }
+
+      public boolean isCancelled() {
+         return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
+      }
+
+      public boolean isDone() {
+         return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
+      }
+
+      public Object get() throws InterruptedException, ExecutionException {
+         if (invalFuture != null) invalFuture.get();
+         replFuture.get();
+         return returnValue;
+      }
+
+      public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+         if (invalFuture != null) invalFuture.get(timeout, unit);
+         replFuture.get(timeout, unit);
+         return returnValue;
+      }
+   }
+
+
+   private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
+      if (useFuture) {
+         Callable<Object> c = new Callable<Object>() {
+            public Object call() {
+               if (recipients == null) {
+                  rpcManager.broadcastReplicableCommand(cmd, true);
+               } else {
+                  rpcManager.multicastReplicableCommand(recipients, cmd, true);
+               }
+               return null;
+            }
+         };
+
+         return asyncExecutorService.submit(c);
+      } else {
+         if (recipients == null) {
+            rpcManager.broadcastReplicableCommand(cmd, sync);
+         } else {
+            rpcManager.multicastReplicableCommand(recipients, cmd, sync);
+         }
+         return null;
+      }
+   }
+
    interface RecipientGenerator {
       List<Address> generateRecipients();
 

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java	2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -129,4 +129,8 @@
    public Address getLocalAddress() {
       return rpcManager != null ? rpcManager.getLocalAddress() : null;
    }
+
+   public List<Address> getMembers() {
+      return rpcManager.getTransport().getMembers();
+   }
 }

Copied: trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java (from rev 299, trunk/core/src/test/java/org/infinispan/replication/AsyncAPIAsyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/AsyncAPIAsyncDistTest.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -0,0 +1,50 @@
+package org.infinispan.distribution;
+
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.test.AbstractCacheTest;
+import org.infinispan.test.ReplListener;
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", testName = "distribution.AsyncAPIAsyncDistTest")
+public class AsyncAPIAsyncDistTest extends AsyncAPISyncDistTest {
+
+   ReplListener rl;
+
+   public AsyncAPIAsyncDistTest() {
+      cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      rl = new ReplListener(c2, true);
+   }
+
+   @Override
+   protected boolean sync() {
+      return false;
+   }
+
+   @Override
+   protected void resetListeners() {
+      rl.resetEager();
+   }
+
+   @Override
+   protected void asyncWait(boolean tx, Class<? extends WriteCommand>... cmds) {
+      if (tx) {
+         if (cmds == null || cmds.length == 0)
+            rl.expectAnyWithTx();
+         else
+            rl.expectWithTx(cmds);
+      } else {
+         if (cmds == null || cmds.length == 0)
+            rl.expectAny();
+         else
+            rl.expect(cmds);
+      }
+
+
+      rl.waitForRpc();
+   }
+}
\ No newline at end of file

Copied: trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java (from rev 300, trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/AsyncAPISyncDistTest.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -0,0 +1,34 @@
+package org.infinispan.distribution;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.Flag;
+import org.infinispan.replication.AsyncAPISyncReplTest;
+import org.infinispan.test.data.Key;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+@Test(groups = "functional", testName = "distribution.AsyncAPISyncDistTest")
+public class AsyncAPISyncDistTest extends AsyncAPISyncReplTest {
+
+   @SuppressWarnings("unchecked")
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration c =
+            getDefaultClusteredConfig(sync() ? Configuration.CacheMode.DIST_SYNC : Configuration.CacheMode.DIST_ASYNC);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+      c1 = l.get(0);
+      c2 = l.get(1);
+   }
+
+   @Override
+   protected void assertOnAllCaches(Key k, String v) {
+      Object real;
+      assert Util.safeEquals((real = c1.getAdvancedCache().get(k, Flag.SKIP_REMOTE_LOOKUP)), v) : "Error on cache 1.  Expected " + v + " and got " + real;
+      assert Util.safeEquals((real = c2.getAdvancedCache().get(k, Flag.SKIP_REMOTE_LOOKUP)), v) : "Error on cache 2.  Expected " + v + " and got " + real;
+   }
+}
\ No newline at end of file

Modified: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java	2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -5,7 +5,7 @@
 import org.infinispan.test.ReplListener;
 import org.testng.annotations.Test;
 
-@Test(groups = "functional", testName = "replication.AsyncAPIAsyncInvalTest")
+@Test(groups = "functional", testName = "invalidation.AsyncAPIAsyncInvalTest")
 public class AsyncAPIAsyncInvalTest extends AsyncAPISyncInvalTest {
    ReplListener rl;
 

Modified: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java	2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -15,7 +15,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-@Test(groups = "functional", testName = "replication.AsyncAPISyncInvalTest")
+@Test(groups = "functional", testName = "invalidation.AsyncAPISyncInvalTest")
 public class AsyncAPISyncInvalTest extends MultipleCacheManagersTest {
 
    Cache<Key, String> c1, c2;

Modified: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	2009-05-14 15:19:07 UTC (rev 300)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	2009-05-14 16:12:36 UTC (rev 301)
@@ -20,7 +20,7 @@
 @Test(groups = "functional", testName = "replication.AsyncAPISyncReplTest")
 public class AsyncAPISyncReplTest extends MultipleCacheManagersTest {
 
-   Cache<Key, String> c1, c2;
+   protected Cache<Key, String> c1, c2;
 
    @SuppressWarnings("unchecked")
    protected void createCacheManagers() throws Throwable {
@@ -42,7 +42,7 @@
    protected void resetListeners() {
    }
 
-   private void assertOnAllCaches(Key k, String v) {
+   protected void assertOnAllCaches(Key k, String v) {
       Object real;
       assert Util.safeEquals((real = c1.get(k)), v) : "Error on cache 1.  Expected " + v + " and got " + real;
       assert Util.safeEquals((real = c2.get(k)), v) : "Error on cache 2.  Expected " + v + " and got " + real;
@@ -61,6 +61,7 @@
 
       // put
       Future<String> f = c1.putAsync(key, v);
+      System.out.println("Future is of type " + f.getClass().getName());
       assert f != null;
       assert !f.isDone();
       assert c2.get(key) == null;




More information about the infinispan-commits mailing list