[infinispan-commits] Infinispan SVN: r286 - in trunk/core/src: main/java/org/infinispan/interceptors and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 13 18:57:00 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-13 18:57:00 -0400 (Wed, 13 May 2009)
New Revision: 286
Added:
trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
Modified:
trunk/core/src/main/java/org/infinispan/Cache.java
trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
Log:
[ISPN-72] (Asynchronous Cache API) impl and basic tests
Modified: trunk/core/src/main/java/org/infinispan/Cache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/Cache.java 2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/Cache.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -72,6 +72,9 @@
* especially advantageous if the cache uses distribution and the three keys map to different cache instances in the
* cluster.
* <p/>
+ * Also, the use of async operations when within a transaction return your local value only, as expected. A Future is
+ * still returned though for API consistency.
+ * <p/>
* <h3>Constructing a Cache</h3> An instance of the Cache is usually obtained by using a {@link CacheManager}.
* <pre>
* CacheManager cm = new DefaultCacheManager(); // optionally pass in a default configuration
Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -36,6 +36,8 @@
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
+import java.util.concurrent.Callable;
+
/**
* Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
* rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
@@ -44,7 +46,6 @@
* @since 4.0
*/
public class ReplicationInterceptor extends BaseRpcInterceptor {
-
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
if (!ctx.isInTxScope()) throw new IllegalStateException("This should not be possible!");
@@ -73,7 +74,7 @@
return retVal;
}
-
+
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
if (ctx.isOriginLocal() && !configuration.isOnePhaseCommit()) {
@@ -111,11 +112,20 @@
* If we are within one transaction we won't do any replication as replication would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication either.
*/
- private Object handleCrudMethod(InvocationContext ctx, WriteCommand command) throws Throwable {
+ private Object handleCrudMethod(final InvocationContext ctx, final WriteCommand command) throws Throwable {
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally do we attempt to replicate.
- Object returnValue = invokeNextInterceptor(ctx, command);
+ final Object returnValue = invokeNextInterceptor(ctx, command);
if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
- rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+ if (ctx.isUseFutureReturnType()) {
+ return submitRpcCall(new Callable<Object>() {
+ public Object call() throws Exception {
+ rpcManager.broadcastReplicableCommand(command, true);
+ return null;
+ }
+ }, returnValue);
+ } else {
+ rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+ }
}
return returnValue;
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -24,8 +24,8 @@
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.transaction.TransactionLog;
+import org.infinispan.transaction.xa.TransactionTable;
import org.infinispan.transaction.xa.TransactionXaAdapter;
-import org.infinispan.transaction.xa.TransactionTable;
import javax.transaction.RollbackException;
import javax.transaction.Status;
@@ -36,13 +36,13 @@
import java.util.concurrent.atomic.AtomicLong;
/**
- * Interceptor in charge with handling transaction related operations, e.g enlisting cache as an transaction participant,
- * propagating remotely initiated changes.
+ * Interceptor in charge with handling transaction related operations, e.g enlisting cache as an transaction
+ * participant, propagating remotely initiated changes.
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author Mircea.Markus at jboss.com
+ * @see org.infinispan.transaction.xa.TransactionXaAdapter
* @since 4.0
- * @see org.infinispan.transaction.xa.TransactionXaAdapter
*/
public class TxInterceptor extends CommandInterceptor {
@@ -170,17 +170,21 @@
}
private Object enlistWriteAndInvokeNext(InvocationContext ctx, WriteCommand command) throws Throwable {
+ TransactionXaAdapter xaAdapter = null;
+ boolean shouldAddMod = false;
if (shouldEnlist(ctx)) {
- TransactionXaAdapter xaAdapter = enlist(ctx);
+ xaAdapter = enlist(ctx);
LocalTxInvocationContext localTxContext = (LocalTxInvocationContext) ctx;
if (!isLocalModeForced(ctx)) {
- xaAdapter.addModification(command);
+ shouldAddMod = true;
}
localTxContext.setXaCache(xaAdapter);
}
if (!ctx.isInTxScope())
transactionLog.logNoTxWrite(command);
- return invokeNextInterceptor(ctx, command);
+ Object rv = invokeNextInterceptor(ctx, command);
+ if (command.isSuccessful() && shouldAddMod) xaAdapter.addModification(command);
+ return rv;
}
public TransactionXaAdapter enlist(InvocationContext ctx) throws SystemException, RollbackException {
Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-05-13 16:48:27 UTC (rev 285)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -23,10 +23,19 @@
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.rpc.CacheRpcManager;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
/**
* Acts as a base for all RPC calls - subclassed by
*
@@ -37,11 +46,15 @@
public abstract class BaseRpcInterceptor extends CommandInterceptor {
protected CacheRpcManager rpcManager;
+ protected ExecutorService asyncExecutorService;
@Inject
- public void init(CacheRpcManager rpcManager) {
+ public void init(CacheRpcManager rpcManager,
+ @ComponentName(KnownComponentNames.ASYNC_SERIALIZATION_EXECUTOR) ExecutorService e) {
this.rpcManager = rpcManager;
+ this.asyncExecutorService = e;
}
+
protected boolean defaultSynchronous;
@Start
@@ -65,4 +78,35 @@
}
return false;
}
+
+ protected final <X> Future<X> submitRpcCall(Callable<Object> c, final Object returnValue) {
+ final Future f = asyncExecutorService.submit(c);
+ return new Future<X>() {
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return f.cancel(mayInterruptIfRunning);
+ }
+
+ public boolean isCancelled() {
+ return f.isCancelled();
+ }
+
+ public boolean isDone() {
+ return f.isDone();
+ }
+
+ @SuppressWarnings("unchecked")
+ public X get() throws InterruptedException, ExecutionException {
+ f.get(); // wait for f to complete first
+ return (X) returnValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ f.get(timeout, unit);
+ return (X) returnValue;
+ }
+ };
+ }
+
}
\ No newline at end of file
Added: trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -0,0 +1,117 @@
+package org.infinispan.api;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+ at Test(groups = "functional", testName = "api.AsyncAPITest")
+public class AsyncAPITest extends SingleCacheManagerTest {
+ Cache<String, String> c;
+
+ protected CacheManager createCacheManager() throws Exception {
+ CacheManager cm = TestCacheManagerFactory.createLocalCacheManager();
+ c = cm.getCache();
+ return cm;
+ }
+
+ public void testAsyncMethods() throws ExecutionException, InterruptedException {
+ // put
+ Future<String> f = c.putAsync("k", "v");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert c.get("k").equals("v");
+
+ f = c.putAsync("k", "v2");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get().equals("v");
+ assert c.get("k").equals("v2");
+
+ // putAll
+ Future<Void> f2 = c.putAllAsync(Collections.singletonMap("k", "v3"));
+ assert f2 != null;
+ assert f2.isDone();
+ assert !f2.isCancelled();
+ assert f2.get() == null;
+ assert c.get("k").equals("v3");
+
+ // putIfAbsent
+ f = c.putIfAbsentAsync("k", "v4");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get().equals("v3");
+ assert c.get("k").equals("v3");
+
+ // remove
+ f = c.removeAsync("k");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get().equals("v3");
+ assert c.get("k") == null;
+
+ // putIfAbsent again
+ f = c.putIfAbsentAsync("k", "v4");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert c.get("k").equals("v4");
+
+ // removecond
+ Future<Boolean> f3 = c.removeAsync("k", "v_nonexistent");
+ assert f3 != null;
+ assert f3.isDone();
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert c.get("k").equals("v4");
+
+ f3 = c.removeAsync("k", "v4");
+ assert f3 != null;
+ assert f3.isDone();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert c.get("k") == null;
+
+ // replace
+ f = c.replaceAsync("k", "v5");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert c.get("k") == null;
+
+ c.put("k", "v");
+ f = c.replaceAsync("k", "v5");
+ assert f != null;
+ assert f.isDone();
+ assert !f.isCancelled();
+ assert f.get().equals("v");
+ assert c.get("k").equals("v5");
+
+ //replace2
+ f3 = c.replaceAsync("k", "v_nonexistent", "v6");
+ assert f3 != null;
+ assert f3.isDone();
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert c.get("k").equals("v5");
+
+ f3 = c.replaceAsync("k", "v5", "v6");
+ assert f3 != null;
+ assert f3.isDone();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert c.get("k").equals("v6");
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/api/AsyncAPITest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java 2009-05-13 22:57:00 UTC (rev 286)
@@ -0,0 +1,384 @@
+package org.infinispan.replication;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+ at Test(groups = "functional", testName = "replication.AsyncAPISyncReplTest")
+public class AsyncAPISyncReplTest extends MultipleCacheManagersTest {
+
+ Cache<Key, String> c1, c2;
+
+ @SuppressWarnings("unchecked")
+ protected void createCacheManagers() throws Throwable {
+ Configuration c =
+ getDefaultClusteredConfig(sync() ? Configuration.CacheMode.REPL_SYNC : Configuration.CacheMode.REPL_ASYNC);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ List<Cache<Key, String>> l = createClusteredCaches(2, getClass().getSimpleName(), c);
+ c1 = l.get(0);
+ c2 = l.get(1);
+ }
+
+ protected boolean sync() {
+ return true;
+ }
+
+ protected void asyncWait() {
+ }
+
+ private void assertOnAllCaches(Key k, String v) {
+ Object real;
+ assert Util.safeEquals((real = c1.get(k)), v) : "Error on cache 1. Expected " + v + " and got " + real;
+ assert Util.safeEquals((real = c2.get(k)), v) : "Error on cache 2. Expected " + v + " and got " + real;
+ ;
+ }
+
+ public void testAsyncMethods() throws ExecutionException, InterruptedException {
+
+ String v = "v";
+ String v2 = "v2";
+ String v3 = "v3";
+ String v4 = "v4";
+ String v5 = "v5";
+ String v6 = "v6";
+ String v_null = "v_nonexistent";
+ Key key = new Key("k", true);
+
+ // put
+ Future<String> f = c1.putAsync(key, v);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key) == null;
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assertOnAllCaches(key, v);
+
+ f = c1.putAsync(key, v2);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ assert f.isDone();
+ assertOnAllCaches(key, v2);
+
+ // putAll
+ Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+ assert f2 != null;
+ assert !f2.isDone();
+ assert c2.get(key).equals(v2);
+ key.allowSerialization();
+ assert !f2.isCancelled();
+ assert f2.get() == null;
+ assert f2.isDone();
+ assertOnAllCaches(key, v3);
+
+ // putIfAbsent
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert c2.get(key).equals(v3);
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ assert f.isDone();
+ assertOnAllCaches(key, v3);
+
+ // remove
+ f = c1.removeAsync(key);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v3);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ assert f.isDone();
+ assertOnAllCaches(key, null);
+
+ // putIfAbsent again
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key) == null;
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assertOnAllCaches(key, v4);
+
+ // removecond
+ Future<Boolean> f3 = c1.removeAsync(key, v_null);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ assertOnAllCaches(key, v4);
+
+ f3 = c1.removeAsync(key, v4);
+ assert f3 != null;
+ assert !f3.isDone();
+ assert c2.get(key).equals(v4);
+ key.allowSerialization();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert f3.isDone();
+ assertOnAllCaches(key, null);
+
+ // replace
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ assertOnAllCaches(key, null);
+
+ key.allowSerialization();
+ c1.put(key, v);
+
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert !f.isDone();
+ assert c2.get(key).equals(v);
+ key.allowSerialization();
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ assert f.isDone();
+ assertOnAllCaches(key, v5);
+
+ //replace2
+ f3 = c1.replaceAsync(key, v_null, v6);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ assertOnAllCaches(key, v5);
+
+ f3 = c1.replaceAsync(key, v5, v6);
+ assert f3 != null;
+ assert !f3.isDone();
+ assert c2.get(key).equals(v5);
+ key.allowSerialization();
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ assert f3.isDone();
+ assertOnAllCaches(key, v6);
+ }
+
+ public void testAsyncTxMethods() throws Exception {
+
+ String v = "v";
+ String v2 = "v2";
+ String v3 = "v3";
+ String v4 = "v4";
+ String v5 = "v5";
+ String v6 = "v6";
+ String v_null = "v_nonexistent";
+ Key key = new Key("k", false);
+ TransactionManager tm = TestingUtil.getTransactionManager(c1);
+
+ // put
+ tm.begin();
+ Future<String> f = c1.putAsync(key, v);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key) == null;
+ assert f.get() == null;
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v);
+
+ tm.begin();
+ f = c1.putAsync(key, v2);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key).equals(v);
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v2);
+
+ // putAll
+ tm.begin();
+ Future<Void> f2 = c1.putAllAsync(Collections.singletonMap(key, v3));
+ assert f2 != null;
+ assert f2.isDone();
+ assert c2.get(key).equals(v2);
+ assert !f2.isCancelled();
+ assert f2.get() == null;
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v3);
+
+ // putIfAbsent
+ tm.begin();
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key).equals(v3);
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v3);
+
+ // remove
+ tm.begin();
+ f = c1.removeAsync(key);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key).equals(v3);
+ assert !f.isCancelled();
+ assert f.get().equals(v3);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, null);
+
+ // putIfAbsent again
+ tm.begin();
+ f = c1.putIfAbsentAsync(key, v4);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key) == null;
+ assert !f.isCancelled();
+ assert f.get() == null;
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v4);
+
+ // removecond
+ tm.begin();
+ Future<Boolean> f3 = c1.removeAsync(key, v_null);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v4);
+
+ tm.begin();
+ f3 = c1.removeAsync(key, v4);
+ assert f3 != null;
+ assert f3.isDone();
+ assert c2.get(key).equals(v4);
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, null);
+
+ // replace
+ tm.begin();
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert !f.isCancelled();
+ assert f.get() == null;
+ assert f.isDone();
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, null);
+
+ c1.put(key, v);
+
+ tm.begin();
+ f = c1.replaceAsync(key, v5);
+ assert f != null;
+ assert f.isDone();
+ assert c2.get(key).equals(v);
+ assert !f.isCancelled();
+ assert f.get().equals(v);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v5);
+
+ //replace2
+ tm.begin();
+ f3 = c1.replaceAsync(key, v_null, v6);
+ assert f3 != null;
+ assert !f3.isCancelled();
+ assert f3.get().equals(false);
+ assert f3.isDone();
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v5);
+
+ tm.begin();
+ f3 = c1.replaceAsync(key, v5, v6);
+ assert f3 != null;
+ assert f3.isDone();
+ assert c2.get(key).equals(v5);
+ assert !f3.isCancelled();
+ assert f3.get().equals(true);
+ tm.commit();
+ asyncWait();
+ assertOnAllCaches(key, v6);
+ }
+
+ protected static class Key implements Externalizable {
+ String value;
+ ReclosableLatch latch = new ReclosableLatch(false);
+ boolean lockable;
+
+ public Key() {
+ }
+
+ public Key(String value, boolean lockable) {
+ this.value = value;
+ this.lockable = lockable;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Key k1 = (Key) o;
+
+ if (value != null ? !value.equals(k1.value) : k1.value != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return value != null ? value.hashCode() : 0;
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(value);
+ if (lockable) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ latch.close();
+ }
+ }
+
+ public void allowSerialization() {
+ if (lockable) latch.open();
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ value = (String) in.readObject();
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/replication/AsyncAPISyncReplTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list