[infinispan-commits] Infinispan SVN: r300 - in trunk/core/src: test/java/org/infinispan/invalidation and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu May 14 11:19:07 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-14 11:19:07 -0400 (Thu, 14 May 2009)
New Revision: 300

Added:
   trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
   trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java
   trunk/core/src/test/java/org/infinispan/test/data/
   trunk/core/src/test/java/org/infinispan/test/data/Key.java
Modified:
   trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
   trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Log:
More async API tests

Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-14 14:19:19 UTC (rev 299)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-14 15:19:07 UTC (rev 300)
@@ -48,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -116,7 +117,7 @@
          List<WriteCommand> mods = Arrays.asList(command.getModifications());
          Transaction runningTransaction = ctx.getRunningTransaction();
          if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction");
-         broadcastInvalidate(mods, runningTransaction, ctx);
+         broadcastInvalidateForPrepare(mods, runningTransaction, ctx);
       } else {
          if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
       }
@@ -127,13 +128,13 @@
       Object retval = invokeNextInterceptor(ctx, command);
       if (command.isSuccessful() && !ctx.isInTxScope()) {
          if (keys != null && keys.length != 0) {
-            invalidateAcrossCluster(isSynchronous(ctx), ctx, keys);
+            return invalidateAcrossCluster(isSynchronous(ctx), ctx, keys, ctx.isUseFutureReturnType(), retval);
          }
       }
       return retval;
    }
 
-   private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
+   private void broadcastInvalidateForPrepare(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
       if (ctx.isInTxScope() && !isLocalModeForced(ctx)) {
          if (modifications == null || modifications.isEmpty()) return;
          InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
@@ -143,7 +144,7 @@
             log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
          } else {
             try {
-               invalidateAcrossCluster(defaultSynchronous, ctx, filterVisitor.result.toArray());
+               invalidateAcrossCluster(defaultSynchronous, ctx, filterVisitor.result.toArray(), false, null);
             }
             catch (Throwable t) {
                log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
@@ -184,16 +185,28 @@
    }
 
 
-   protected void invalidateAcrossCluster(boolean synchronous, InvocationContext ctx, Object[] keys) throws Throwable {
+   protected Object invalidateAcrossCluster(boolean synchronous, InvocationContext ctx, Object[] keys, boolean useFuture,
+                                            final Object retvalForFuture) throws Throwable {
       if (!isLocalModeForced(ctx)) {
          // increment invalidations counter if statistics maintained
          incrementInvalidations();
-         InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
+         final InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
          if (log.isDebugEnabled())
             log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
          // voila, invalidated!
-         rpcManager.broadcastReplicableCommand(command, synchronous);
+         if (useFuture) {
+            return submitRpcCall(new Callable<Object>() {
+               public Object call() throws Exception {
+                  rpcManager.broadcastReplicableCommand(command, true);
+                  return null;
+               }
+            }, retvalForFuture);
+         } else {
+            rpcManager.broadcastReplicableCommand(command, synchronous);
+         }
       }
+
+      return retvalForFuture;
    }
 
    private void incrementInvalidations() {

Added: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java	2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,41 @@
+package org.infinispan.invalidation;
+
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.test.AbstractCacheTest;
+import org.infinispan.test.ReplListener;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "replication.AsyncAPIAsyncInvalTest")
+public class AsyncAPIAsyncInvalTest extends AsyncAPISyncInvalTest {
+   ReplListener rl;
+
+   public AsyncAPIAsyncInvalTest() {
+      cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      super.createCacheManagers();
+      rl = new ReplListener(c2, true);
+   }
+
+   @Override
+   protected boolean sync() {
+      return false;
+   }
+
+   @Override
+   protected void resetListeners() {
+      rl.resetEager();
+   }
+
+   @Override
+   protected void asyncWait(Class<? extends WriteCommand>... cmds) {
+      if (cmds == null || cmds.length == 0)
+         rl.expectAny();
+      else
+         rl.expect(cmds);
+
+      rl.waitForRpc();
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPIAsyncInvalTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Copied: trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java (from rev 299, trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/invalidation/AsyncAPISyncInvalTest.java	2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,199 @@
+package org.infinispan.invalidation;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.context.Flag;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.data.Key;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+ at Test(groups = "functional", testName = "replication.AsyncAPISyncInvalTest")
+public class AsyncAPISyncInvalTest extends MultipleCacheManagersTest {
+
+   Cache<Key, String> c1, c2;
+
+   @SuppressWarnings("unchecked")
+   protected void createCacheManagers() throws Throwable {
+      Configuration c =
+            getDefaultClusteredConfig(sync() ? Configuration.CacheMode.INVALIDATION_SYNC : Configuration.CacheMode.INVALIDATION_ASYNC);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+      c1 = l.get(0);
+      c2 = l.get(1);
+   }
+
+   protected boolean sync() {
+      return true;
+   }
+
+   protected void asyncWait(Class<? extends WriteCommand>... cmds) {
+   }
+
+   protected void resetListeners() {
+   }
+
+   private void assertInvalidated(Key k, String value) {
+      assert Util.safeEquals(c1.get(k), value);
+      assert !c2.containsKey(k);
+   }
+
+   private void initC2(Key k) {
+      c2.getAdvancedCache().put(k, "v", Flag.CACHE_MODE_LOCAL);
+   }
+
+   public void testAsyncMethods() throws ExecutionException, InterruptedException {
+
+      String v = "v";
+      String v2 = "v2";
+      String v3 = "v3";
+      String v4 = "v4";
+      String v5 = "v5";
+      String v6 = "v6";
+      String v_null = "v_nonexistent";
+      Key key = new Key("k", true);
+
+      initC2(key);
+
+      assert !c1.containsKey(key);
+      assert v.equals(c2.get(key));
+
+      // put
+      Future<String> f = c1.putAsync(key, v);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assertInvalidated(key, v);
+
+      initC2(key);
+      f = c1.putAsync(key, v2);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      assert f.isDone();
+      assertInvalidated(key, v2);
+
+      // putAll
+      initC2(key);
+      Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+      assert f2 != null;
+      assert !f2.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f2.isCancelled();
+      assert f2.get() == null;
+      assert f2.isDone();
+      assertInvalidated(key, v3);
+
+      // putIfAbsent
+      initC2(key);
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert c2.get(key).equals(v);
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      assert f.isDone();
+
+      // remove
+      initC2(key);
+      f = c1.removeAsync(key);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      assert f.isDone();
+      assertInvalidated(key, null);
+
+      // putIfAbsent again
+      initC2(key);
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assertInvalidated(key, v4);
+
+      // removecond
+      initC2(key);
+      Future<Boolean> f3 = c1.removeAsync(key, v_null);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      assert c2.get(key).equals(v);
+
+      f3 = c1.removeAsync(key, v4);
+      assert f3 != null;
+      assert !f3.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert f3.isDone();
+      assertInvalidated(key, null);
+
+      // replace
+      initC2(key);
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assert c2.get(key).equals(v);
+
+      key.allowSerialization();
+      resetListeners();
+      c1.put(key, v);
+      asyncWait();
+
+      initC2(key);
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      assert f.isDone();
+      assertInvalidated(key, v5);
+
+      //replace2
+      initC2(key);
+      f3 = c1.replaceAsync(key, v_null, v6);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      assert c2.get(key).equals(v);
+      assert c1.get(key).equals(v5);
+
+      f3 = c1.replaceAsync(key, v5, v6);
+      assert f3 != null;
+      assert !f3.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert f3.isDone();
+      assertInvalidated(key, v6);
+   }
+}
\ No newline at end of file

Modified: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	2009-05-14 14:19:19 UTC (rev 299)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	2009-05-14 15:19:07 UTC (rev 300)
@@ -6,16 +6,12 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.test.MultipleCacheManagersTest;
 import org.infinispan.test.TestingUtil;
+import org.infinispan.test.data.Key;
 import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
 import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.ReclosableLatch;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -333,55 +329,4 @@
       asyncWait(true);
       assertOnAllCaches(key, v6);
    }
-
-   protected static class Key implements Externalizable {
-      String value;
-      ReclosableLatch latch = new ReclosableLatch(false);
-      boolean lockable;
-
-      public Key() {
-      }
-
-      public Key(String value, boolean lockable) {
-         this.value = value;
-         this.lockable = lockable;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-         if (this == o) return true;
-         if (o == null || getClass() != o.getClass()) return false;
-
-         Key k1 = (Key) o;
-
-         if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
-
-         return true;
-      }
-
-      @Override
-      public int hashCode() {
-         return value != null ? value.hashCode() : 0;
-      }
-
-      public void writeExternal(ObjectOutput out) throws IOException {
-         out.writeObject(value);
-         if (lockable) {
-            try {
-               latch.await();
-            } catch (InterruptedException e) {
-               e.printStackTrace();
-            }
-            latch.close();
-         }
-      }
-
-      public void allowSerialization() {
-         if (lockable) latch.open();
-      }
-
-      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-         value = (String) in.readObject();
-      }
-   }
 }

Added: trunk/core/src/test/java/org/infinispan/test/data/Key.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/data/Key.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/test/data/Key.java	2009-05-14 15:19:07 UTC (rev 300)
@@ -0,0 +1,61 @@
+package org.infinispan.test.data;
+
+import org.infinispan.util.concurrent.ReclosableLatch;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.TimeUnit;
+
+public class Key implements Externalizable {
+
+   String value;
+   ReclosableLatch latch = new ReclosableLatch(false);
+   boolean lockable;
+
+   public Key() {
+   }
+
+   public Key(String value, boolean lockable) {
+      this.value = value;
+      this.lockable = lockable;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Key k1 = (Key) o;
+
+      if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      return value != null ? value.hashCode() : 0;
+   }
+
+   public void writeExternal(ObjectOutput out) throws IOException {
+      out.writeObject(value);
+      if (lockable) {
+         try {
+            if (!latch.await(60, TimeUnit.SECONDS)) throw new RuntimeException("Cannot serialize!!");
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+         latch.close();
+      }
+   }
+
+   public void allowSerialization() {
+      if (lockable) latch.open();
+   }
+
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+      value = (String) in.readObject();
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/test/data/Key.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF




More information about the infinispan-commits mailing list