[infinispan-commits] Infinispan SVN: r286 - in trunk/core/src: main/java/org/infinispan/interceptors and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed May 13 18:57:00 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-13 18:57:00 -0400 (Wed, 13 May 2009)
New Revision: 286

Added:
   trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
   trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/Cache.java
   trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
Log:
[ISPN-72] (Asynchronous Cache API) impl and basic tests

Modified: trunk/core/src/main/java/org/infinispan/Cache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/Cache.java	2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/Cache.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -72,6 +72,9 @@
  * especially advantageous if the cache uses distribution and the three keys map to different cache instances in the
  * cluster.
  * <p/>
+ * Also, async operations invoked within a transaction return only your local value, as expected.  A Future is
+ * still returned, though, for API consistency.
+ * <p/>
  * <h3>Constructing a Cache</h3> An instance of the Cache is usually obtained by using a {@link CacheManager}.
  * <pre>
  *   CacheManager cm = new DefaultCacheManager(); // optionally pass in a default configuration

Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -36,6 +36,8 @@
 import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.interceptors.base.BaseRpcInterceptor;
 
+import java.util.concurrent.Callable;
+
 /**
  * Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
  * rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
@@ -44,7 +46,6 @@
  * @since 4.0
  */
 public class ReplicationInterceptor extends BaseRpcInterceptor {
-
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
       if (!ctx.isInTxScope()) throw new IllegalStateException("This should not be possible!");
@@ -73,7 +74,7 @@
       return retVal;
    }
 
-   
+
    @Override
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
       if (ctx.isOriginLocal() && !configuration.isOnePhaseCommit()) {
@@ -111,11 +112,20 @@
     * If we are within one transaction we won't do any replication as replication would only be performed at commit
     * time. If the operation didn't originate locally we won't do any replication either.
     */
-   private Object handleCrudMethod(InvocationContext ctx, WriteCommand command) throws Throwable {
+   private Object handleCrudMethod(final InvocationContext ctx, final WriteCommand command) throws Throwable {
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.
-      Object returnValue = invokeNextInterceptor(ctx, command);
+      final Object returnValue = invokeNextInterceptor(ctx, command);
       if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
-         rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+         if (ctx.isUseFutureReturnType()) {
+            return submitRpcCall(new Callable<Object>() {
+               public Object call() throws Exception {
+                  rpcManager.broadcastReplicableCommand(command, true);
+                  return null;
+               }
+            }, returnValue);
+         } else {
+            rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+         }
       }
       return returnValue;
    }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -24,8 +24,8 @@
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
 import org.infinispan.transaction.TransactionLog;
+import org.infinispan.transaction.xa.TransactionTable;
 import org.infinispan.transaction.xa.TransactionXaAdapter;
-import org.infinispan.transaction.xa.TransactionTable;
 
 import javax.transaction.RollbackException;
 import javax.transaction.Status;
@@ -36,13 +36,13 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Interceptor in charge with handling transaction related operations, e.g enlisting cache as an transaction participant,
- * propagating remotely initiated changes.
+ * Interceptor in charge of handling transaction-related operations, e.g. enlisting the cache as a transaction
+ * participant and propagating remotely initiated changes.
  *
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  * @author Mircea.Markus at jboss.com
+ * @see org.infinispan.transaction.xa.TransactionXaAdapter
  * @since 4.0
- * @see org.infinispan.transaction.xa.TransactionXaAdapter
  */
 public class TxInterceptor extends CommandInterceptor {
 
@@ -170,17 +170,21 @@
    }
 
    private Object enlistWriteAndInvokeNext(InvocationContext ctx, WriteCommand command) throws Throwable {
+      TransactionXaAdapter xaAdapter = null;
+      boolean shouldAddMod = false;
       if (shouldEnlist(ctx)) {
-         TransactionXaAdapter xaAdapter = enlist(ctx);
+         xaAdapter = enlist(ctx);
          LocalTxInvocationContext localTxContext = (LocalTxInvocationContext) ctx;
          if (!isLocalModeForced(ctx)) {
-            xaAdapter.addModification(command);
+            shouldAddMod = true;
          }
          localTxContext.setXaCache(xaAdapter);
       }
       if (!ctx.isInTxScope())
          transactionLog.logNoTxWrite(command);
-      return invokeNextInterceptor(ctx, command);
+      Object rv = invokeNextInterceptor(ctx, command);
+      if (command.isSuccessful() && shouldAddMod) xaAdapter.addModification(command);
+      return rv;
    }
 
    public TransactionXaAdapter enlist(InvocationContext ctx) throws SystemException, RollbackException {

Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -23,10 +23,19 @@
 
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.remoting.rpc.CacheRpcManager;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Acts as a base for all RPC calls - subclassed by
  *
@@ -37,11 +46,15 @@
 public abstract class BaseRpcInterceptor extends CommandInterceptor {
 
    protected CacheRpcManager rpcManager;
+   protected ExecutorService asyncExecutorService;
 
    @Inject
-   public void init(CacheRpcManager rpcManager) {
+   public void init(CacheRpcManager rpcManager,
+                    @ComponentName(KnownComponentNames.ASYNC_SERIALIZATION_EXECUTOR) ExecutorService e) {
       this.rpcManager = rpcManager;
+      this.asyncExecutorService = e;
    }
+
    protected boolean defaultSynchronous;
 
    @Start
@@ -65,4 +78,35 @@
       }
       return false;
    }
+
+   protected final <X> Future<X> submitRpcCall(Callable<Object> c, final Object returnValue) {
+      final Future f = asyncExecutorService.submit(c);
+      return new Future<X>() {
+
+         public boolean cancel(boolean mayInterruptIfRunning) {
+            return f.cancel(mayInterruptIfRunning);
+         }
+
+         public boolean isCancelled() {
+            return f.isCancelled();
+         }
+
+         public boolean isDone() {
+            return f.isDone();
+         }
+
+         @SuppressWarnings("unchecked")
+         public X get() throws InterruptedException, ExecutionException {
+            f.get(); // wait for f to complete first
+            return (X) returnValue;
+         }
+
+         @SuppressWarnings("unchecked")
+         public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            f.get(timeout, unit);
+            return (X) returnValue;
+         }
+      };
+   }
+
 }
\ No newline at end of file

Added: trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -0,0 +1,117 @@
+package org.infinispan.api;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+@Test(groups = "functional", testName = "api.AsyncAPITest")
+public class AsyncAPITest extends SingleCacheManagerTest {
+   Cache<String, String> c;
+
+   protected CacheManager createCacheManager() throws Exception {
+      CacheManager cm = TestCacheManagerFactory.createLocalCacheManager();
+      c = cm.getCache();
+      return cm;
+   }
+
+   public void testAsyncMethods() throws ExecutionException, InterruptedException {
+      // put
+      Future<String> f = c.putAsync("k", "v");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert c.get("k").equals("v");
+
+      f = c.putAsync("k", "v2");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get().equals("v");
+      assert c.get("k").equals("v2");
+
+      // putAll
+      Future<Void> f2 = c.putAllAsync(Collections.singletonMap("k", "v3"));
+      assert f2 != null;
+      assert f2.isDone();
+      assert !f2.isCancelled();
+      assert f2.get() == null;
+      assert c.get("k").equals("v3");
+
+      // putIfAbsent
+      f = c.putIfAbsentAsync("k", "v4");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get().equals("v3");
+      assert c.get("k").equals("v3");
+
+      // remove
+      f = c.removeAsync("k");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get().equals("v3");
+      assert c.get("k") == null;
+
+      // putIfAbsent again
+      f = c.putIfAbsentAsync("k", "v4");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert c.get("k").equals("v4");
+
+      // removecond
+      Future<Boolean> f3 = c.removeAsync("k", "v_nonexistent");
+      assert f3 != null;
+      assert f3.isDone();
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert c.get("k").equals("v4");
+
+      f3 = c.removeAsync("k", "v4");
+      assert f3 != null;
+      assert f3.isDone();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert c.get("k") == null;
+
+      // replace
+      f = c.replaceAsync("k", "v5");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert c.get("k") == null;
+
+      c.put("k", "v");
+      f = c.replaceAsync("k", "v5");
+      assert f != null;
+      assert f.isDone();
+      assert !f.isCancelled();
+      assert f.get().equals("v");
+      assert c.get("k").equals("v5");
+
+      //replace2
+      f3 = c.replaceAsync("k", "v_nonexistent", "v6");
+      assert f3 != null;
+      assert f3.isDone();
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert c.get("k").equals("v5");
+
+      f3 = c.replaceAsync("k", "v5", "v6");
+      assert f3 != null;
+      assert f3.isDone();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert c.get("k").equals("v6");
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java	2009-05-13 22:57:00 UTC (rev 286)
@@ -0,0 +1,384 @@
+package org.infinispan.replication;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+@Test(groups = "functional", testName = "replication.AsyncAPISyncReplTest")
+public class AsyncAPISyncReplTest extends MultipleCacheManagersTest {
+
+   Cache<Key, String> c1, c2;
+
+   @SuppressWarnings("unchecked")
+   protected void createCacheManagers() throws Throwable {
+      Configuration c =
+            getDefaultClusteredConfig(sync() ? Configuration.CacheMode.REPL_SYNC : Configuration.CacheMode.REPL_ASYNC);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+      c1 = l.get(0);
+      c2 = l.get(1);
+   }
+
+   protected boolean sync() {
+      return true;
+   }
+
+   protected void asyncWait() {
+   }
+
+   private void assertOnAllCaches(Key k, String v) {
+      Object real;
+      assert Util.safeEquals((real = c1.get(k)), v) : "Error on cache 1.  Expected " + v + " and got " + real;
+      assert Util.safeEquals((real = c2.get(k)), v) : "Error on cache 2.  Expected " + v + " and got " + real;
+      ;
+   }
+
+   public void testAsyncMethods() throws ExecutionException, InterruptedException {
+
+      String v = "v";
+      String v2 = "v2";
+      String v3 = "v3";
+      String v4 = "v4";
+      String v5 = "v5";
+      String v6 = "v6";
+      String v_null = "v_nonexistent";
+      Key key = new Key("k", true);
+
+      // put
+      Future<String> f = c1.putAsync(key, v);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key) == null;
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assertOnAllCaches(key, v);
+
+      f = c1.putAsync(key, v2);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      assert f.isDone();
+      assertOnAllCaches(key, v2);
+
+      // putAll
+      Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+      assert f2 != null;
+      assert !f2.isDone();
+      assert c2.get(key).equals(v2);
+      key.allowSerialization();
+      assert !f2.isCancelled();
+      assert f2.get() == null;
+      assert f2.isDone();
+      assertOnAllCaches(key, v3);
+
+      // putIfAbsent
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert c2.get(key).equals(v3);
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      assert f.isDone();
+      assertOnAllCaches(key, v3);
+
+      // remove
+      f = c1.removeAsync(key);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v3);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      assert f.isDone();
+      assertOnAllCaches(key, null);
+
+      // putIfAbsent again
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key) == null;
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assertOnAllCaches(key, v4);
+
+      // removecond
+      Future<Boolean> f3 = c1.removeAsync(key, v_null);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      assertOnAllCaches(key, v4);
+
+      f3 = c1.removeAsync(key, v4);
+      assert f3 != null;
+      assert !f3.isDone();
+      assert c2.get(key).equals(v4);
+      key.allowSerialization();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert f3.isDone();
+      assertOnAllCaches(key, null);
+
+      // replace
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assertOnAllCaches(key, null);
+
+      key.allowSerialization();
+      c1.put(key, v);
+
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert !f.isDone();
+      assert c2.get(key).equals(v);
+      key.allowSerialization();
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      assert f.isDone();
+      assertOnAllCaches(key, v5);
+
+      //replace2
+      f3 = c1.replaceAsync(key, v_null, v6);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      assertOnAllCaches(key, v5);
+
+      f3 = c1.replaceAsync(key, v5, v6);
+      assert f3 != null;
+      assert !f3.isDone();
+      assert c2.get(key).equals(v5);
+      key.allowSerialization();
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert f3.isDone();
+      assertOnAllCaches(key, v6);
+   }
+
+   public void testAsyncTxMethods() throws Exception {
+
+      String v = "v";
+      String v2 = "v2";
+      String v3 = "v3";
+      String v4 = "v4";
+      String v5 = "v5";
+      String v6 = "v6";
+      String v_null = "v_nonexistent";
+      Key key = new Key("k", false);
+      TransactionManager tm = TestingUtil.getTransactionManager(c1);
+
+      // put
+      tm.begin();
+      Future<String> f = c1.putAsync(key, v);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key) == null;
+      assert f.get() == null;
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v);
+
+      tm.begin();
+      f = c1.putAsync(key, v2);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key).equals(v);
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v2);
+
+      // putAll
+      tm.begin();
+      Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+      assert f2 != null;
+      assert f2.isDone();
+      assert c2.get(key).equals(v2);
+      assert !f2.isCancelled();
+      assert f2.get() == null;
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v3);
+
+      // putIfAbsent
+      tm.begin();
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key).equals(v3);
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v3);
+
+      // remove
+      tm.begin();
+      f = c1.removeAsync(key);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key).equals(v3);
+      assert !f.isCancelled();
+      assert f.get().equals(v3);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, null);
+
+      // putIfAbsent again
+      tm.begin();
+      f = c1.putIfAbsentAsync(key, v4);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key) == null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v4);
+
+      // removecond
+      tm.begin();
+      Future<Boolean> f3 = c1.removeAsync(key, v_null);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v4);
+
+      tm.begin();
+      f3 = c1.removeAsync(key, v4);
+      assert f3 != null;
+      assert f3.isDone();
+      assert c2.get(key).equals(v4);
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, null);
+
+      // replace
+      tm.begin();
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, null);
+
+      c1.put(key, v);
+
+      tm.begin();
+      f = c1.replaceAsync(key, v5);
+      assert f != null;
+      assert f.isDone();
+      assert c2.get(key).equals(v);
+      assert !f.isCancelled();
+      assert f.get().equals(v);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v5);
+
+      //replace2
+      tm.begin();
+      f3 = c1.replaceAsync(key, v_null, v6);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v5);
+
+      tm.begin();
+      f3 = c1.replaceAsync(key, v5, v6);
+      assert f3 != null;
+      assert f3.isDone();
+      assert c2.get(key).equals(v5);
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      tm.commit();
+      asyncWait();
+      assertOnAllCaches(key, v6);
+   }
+
+   protected static class Key implements Externalizable {
+      String value;
+      ReclosableLatch latch = new ReclosableLatch(false);
+      boolean lockable;
+
+      public Key() {
+      }
+
+      public Key(String value, boolean lockable) {
+         this.value = value;
+         this.lockable = lockable;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         Key k1 = (Key) o;
+
+         if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
+
+         return true;
+      }
+
+      @Override
+      public int hashCode() {
+         return value != null ? value.hashCode() : 0;
+      }
+
+      public void writeExternal(ObjectOutput out) throws IOException {
+         out.writeObject(value);
+         if (lockable) {
+            try {
+               latch.await();
+            } catch (InterruptedException e) {
+               e.printStackTrace();
+            }
+            latch.close();
+         }
+      }
+
+      public void allowSerialization() {
+         if (lockable) latch.open();
+      }
+
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+         value = (String) in.readObject();
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF




More information about the infinispan-commits mailing list