Author: mircea.markus
Date: 2008-10-16 17:12:47 -0400 (Thu, 16 Oct 2008)
New Revision: 6974
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/replication/
core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
Log:
added replication tests
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-16
06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -74,7 +74,6 @@
protected TransactionManager transactionManager;
protected RPCManager rpcManager;
-
@Inject
private void injectDependencies(InvocationContextContainer
invocationContextContainer,
CommandsFactory commandsFactory,
@@ -252,47 +251,47 @@
public List<CommandInterceptor> getInterceptorChain()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public Marshaller getMarshaller()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public void addInterceptor(CommandInterceptor i, int position)
{
- //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public void addInterceptor(CommandInterceptor i, Class<? extends
CommandInterceptor> afterInterceptor)
{
- //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public void removeInterceptor(int position)
{
- //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public void removeInterceptor(Class<? extends CommandInterceptor>
interceptorType)
{
- //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public CacheLoaderManager getCacheLoaderManager()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public BuddyManager getBuddyManager()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public TransactionTable getTransactionTable()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public RPCManager getRPCManager()
@@ -302,32 +301,32 @@
public StateTransferManager getStateTransferManager()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public Notifier getNotifier()
{
- return null; //TODO: Autogenerated. Implement me properly
+ return notifier;
}
public String getClusterName()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public GlobalTransaction getCurrentTransaction(Transaction tx, boolean
createIfNotExists)
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public GlobalTransaction getCurrentTransaction()
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public GravitateResult gravitateData(K key, boolean searchBuddyBackupSubtrees,
InvocationContext ctx)
{
- return null; //TODO: Autogenerated. Implement me properly
+ throw new IllegalStateException();//todo Implement me properly
}
public ComponentRegistry getComponentRegistry()
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java 2008-10-16
06:10:36 UTC (rev 6973)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -50,14 +50,6 @@
*/
public class CallInterceptor extends CommandInterceptor
{
- private boolean notOptimisticLocking;
-
- @Start
- protected void start()
- {
- notOptimisticLocking = false;
- }
-
@Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
{
@@ -135,7 +127,7 @@
throws Throwable
{
Object result = invokeCommand(ctx, command);
- if (notOptimisticLocking && ctx.isValidTransaction())
+ if (ctx.isValidTransaction())
{
GlobalTransaction gtx = ctx.getGlobalTransaction();
if (gtx == null)
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-10-16
06:10:36 UTC (rev 6973)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -40,7 +40,6 @@
/**
* Acts as a base for all RPC calls - subclassed by
- * {@link org.jboss.cache.interceptors.ReplicationInterceptor} and {@link
OptimisticReplicationInterceptor}.
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
*/
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-10-16
06:10:36 UTC (rev 6973)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -287,7 +287,8 @@
// JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
// the serialization problem could be on the remote end and this is why we
cannot catch this above, when marshalling.
if (retval == null)
- throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for " + command.getClass().getSimpleName() + " not
being serializable.");
+ throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for "
+ + command.getClass().getSimpleName() + " not being
serializable.");
return retval;
}
}
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,232 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at
gnu.org.
+ */
+
+package org.jboss.starobrno.replication;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.util.internals.ReplicationListener;
+import org.jboss.starobrno.util.TestingUtil;
+
+/**
+ * Unit test for replicated async CacheSPI. Use locking and multiple threads to test
+ * concurrent access to the tree.
+ */
+@Test(groups = {"functional", "jgroups"})
+public class AsyncReplTest
+{
+ private Configuration configuration;
+
+ private class AsyncReplTestTL {
+ private CacheSPI<Object, Object> cache1, cache2;
+ private CacheManager cacheManager1, cacheManager2;
+ private ReplicationListener replListener1, replListener2;
+ }
+
+ private ThreadLocal<AsyncReplTestTL> threadLocal = new
ThreadLocal<AsyncReplTestTL>();
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception
+ {
+ AsyncReplTestTL tl = new AsyncReplTestTL();
+ threadLocal.set(tl);
+
+ configuration = new Configuration();
+ configuration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+
configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+
+ log("creating cache1");
+ tl.cacheManager1 = new CacheManager(configuration);
+ tl.cache1 = (CacheSPI) tl.cacheManager1.createCache("testCache");
+ tl.replListener1 = new ReplicationListener(tl.cache1);
+
+ log("creating cache2");
+ tl.cacheManager2 = new CacheManager(configuration);
+ tl.cache2 = (CacheSPI) tl.cacheManager2.createCache("testCache");
+ tl.replListener2 = new ReplicationListener(tl.cache2);
+ }
+
+ /**
+ * Provides a hook for multiplexer integration. This default implementation
+ * is a no-op; subclasses that test mux integration would override
+ * to integrate the given cache with a multiplexer.
+ * <p/>
+ * param cache a cache that has been configured but not yet created.
+ */
+ protected void configureMultiplexer(Cache cache) throws Exception
+ {
+ // default does nothing
+ }
+
+ /**
+ * Provides a hook to check that the cache's channel came from the
+ * multiplexer, or not, as expected. This default impl asserts that
+ * the channel did not come from the multiplexer.
+ *
+ * @param cache a cache that has already been started
+ */
+ protected void validateMultiplexer(Cache cache)
+ {
+ assertFalse("Cache is not using multiplexer",
cache.getConfiguration().isUsingMultiplexer());
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception
+ {
+ AsyncReplTestTL tl = threadLocal.get();
+ TestingUtil.killCaches(tl.cache1, tl.cache2);
+ threadLocal.set(null);
+ }
+
+ public void testTxCompletion() throws Exception
+ {
+ AsyncReplTestTL tl = threadLocal.get();
+ CacheSPI<Object, Object> cache1 = tl.cache1;
+ CacheSPI<Object, Object> cache2 = tl.cache2;
+ ReplicationListener replListener1 = tl.replListener1;
+ ReplicationListener replListener2 = tl.replListener2;
+
+ String key = "key";
+
+ replListener2.expectAny();
+ cache1.put(key, "value1");
+ // allow for replication
+ replListener2.waitForReplicationToOccur(500);
+ assertEquals("value1", cache1.get( key));
+ assertEquals("value1", cache2.get(key));
+
+ TransactionManager mgr = cache1.getTransactionManager();
+ mgr.begin();
+
+ replListener2.expectAny();
+ cache1.put(key, "value2");
+ assertEquals("value2", cache1.get(key));
+ assertEquals("value1", cache2.get(key));
+
+ mgr.commit();
+
+ replListener2.waitForReplicationToOccur(500);
+
+ assertEquals("value2", cache1.get(key));
+ assertEquals("value2", cache2.get(key));
+
+ mgr.begin();
+ cache1.put(key, "value3");
+ assertEquals("value3", cache1.get(key));
+ assertEquals("value2", cache2.get(key));
+
+ mgr.rollback();
+
+ assertEquals("value2", cache1.get(key));
+ assertEquals("value2", cache2.get(key));
+ }
+
+ public void testPutShouldNotReplicateToDifferentCluster()
+ {
+ AsyncReplTestTL tl = threadLocal.get();
+ CacheSPI<Object, Object> cache1 = tl.cache1;
+ CacheSPI<Object, Object> cache2 = tl.cache2;
+ ReplicationListener replListener1 = tl.replListener1;
+ ReplicationListener replListener2 = tl.replListener2;
+
+ CacheSPI<Object, Object> cache3 = null, cache4 = null;
+ try
+ {
+ configuration.setClusterName("otherTest");
+ cache3 = (CacheSPI<Object, Object>) new
CacheManager(configuration).createCache("testCache");
+ cache4 = (CacheSPI<Object, Object>) new
CacheManager(configuration).createCache("testCache");
+ replListener2.expectAny();
+ cache1.put("age", 38);
+ // because we use async repl, modfication may not yet have been propagated to
cache2, so
+ // we have to wait a little
+ replListener2.waitForReplicationToOccur(500);
+ assertNull("Should not have replicated",
cache3.get("age"));
+ }
+ catch (Exception e)
+ {
+ fail(e.toString());
+ }
+ finally
+ {
+ if (cache3 != null)
+ {
+ cache3.stop();
+ }
+ if (cache4 != null)
+ {
+ cache4.stop();
+ }
+ }
+ }
+
+ public void testAsyncReplDelay()
+ {
+ Integer age;
+ AsyncReplTestTL tl = threadLocal.get();
+ CacheSPI<Object, Object> cache1 = tl.cache1;
+ CacheSPI<Object, Object> cache2 = tl.cache2;
+ ReplicationListener replListener1 = tl.replListener1;
+ ReplicationListener replListener2 = tl.replListener2;
+
+ try
+ {
+ cache1.put("age", 38);
+
+ // value on cache2 may be 38 or not yet replicated
+ age = (Integer) cache2.get("age");
+ log("attr \"age\" of \"/a/b/c\" on cache2=" +
age);
+ assertTrue("should be either null or 38", age == null || age == 38);
+ }
+ catch (Exception e)
+ {
+ fail(e.toString());
+ }
+ }
+
+ public void testAsyncReplTxDelay()
+ {
+ Integer age;
+ AsyncReplTestTL tl = threadLocal.get();
+ CacheSPI<Object, Object> cache1 = tl.cache1;
+ CacheSPI<Object, Object> cache2 = tl.cache2;
+ ReplicationListener replListener1 = tl.replListener1;
+ ReplicationListener replListener2 = tl.replListener2;
+
+ try
+ {
+ TransactionManager tm = cache1.getTransactionManager();
+ tm.begin();
+ cache1.put( "age", 38);
+ tm.commit();
+
+ // value on cache2 may be 38 or not yet replicated
+ age = (Integer) cache2.get("age");
+ log("attr \"age\" of \"/a/b/c\" on cache2=" +
age);
+ assertTrue("should be either null or 38", age == null || age == 38);
+ }
+ catch (Exception e)
+ {
+ fail(e.toString());
+ }
+ }
+
+ private void log(String msg)
+ {
+ System.out.println("-- [" + Thread.currentThread() + "]: " +
msg);
+ }
+}
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,98 @@
+package org.jboss.starobrno.replication;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.lock.TimeoutException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+
+/**
+ * Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl
Timeouts
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ */
+@Test(groups = {"functional"}, sequential = true)
+public class ExceptionTest
+{
+ private Cache cache1;
+ private Cache cache2;
+
+ private static String DELAYED_CLUSTER_CONFIG =
+ "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
+ " mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
+ "PING(timeout=1000;num_initial_members=2):" +
+ "MERGE2(min_interval=5000;max_interval=10000):" +
+ "FD_SOCK:" +
+ "VERIFY_SUSPECT(timeout=1500):" +
+
"pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):"
+
+ "UNICAST(timeout=600,1200,2400,4800):" +
+ "pbcast.STABLE(desired_avg_gossip=20000):" +
+ "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
+ "shun=false;print_local_addr=true):" +
+ "pbcast.STATE_TRANSFER:" +
+ "DELAY(in_delay=100;out_delay=100)";
+
+ private Cache createCache(String jgroupsConfig)
+ {
+ Configuration c = new Configuration();
+ c.setSyncCommitPhase(true);
+ c.setSyncRollbackPhase(true);
+ c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ if (jgroupsConfig != null) c.setClusterConfig(jgroupsConfig);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ Cache cache = new CacheManager(c).createCache("testCache");
+ return cache;
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ cache1 = null;
+ cache2 = null;
+ }
+
+ @Test(groups = {"functional"}, expectedExceptions =
{TimeoutException.class})
+ public void testSyncReplTimeout()
+ {
+ cache1 = createCache(DELAYED_CLUSTER_CONFIG);
+ cache2 = createCache(DELAYED_CLUSTER_CONFIG);
+
+ cache1.getConfiguration().setSyncReplTimeout(1); // 1ms. this is *bound* to fail.
+ cache2.getConfiguration().setSyncReplTimeout(1);
+
+ cache1.start();
+ cache2.start();
+
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+ cache1.put("k", "v");
+ }
+
+ @Test(groups = {"functional"}, expectedExceptions =
{TimeoutException.class})
+ public void testLockAcquisitionTimeout() throws Exception
+ {
+ cache1 = createCache(null);
+ cache2 = createCache(null);
+ cache2.getConfiguration().setLockAcquisitionTimeout(1);
+
+ cache1.start();
+ cache2.start();
+
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+ // get a lock on cache 2 and hold on to it.
+ TransactionManager tm =
cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+ tm.begin();
+ cache2.put("block", "block");
+ Transaction t = tm.suspend();
+ cache1.put("block", "v");
+ }
+}
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,175 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at
gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.fail;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.cache.transaction.DummyTransactionManager;
+import org.jboss.cache.lock.IsolationLevel;
+
+import javax.naming.Context;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import java.io.NotSerializableException;
+import java.io.Serializable;
+
+/**
+ * Teting of replication exception for a Nonerislizable object
+ *
+ * @author Ben Wang
+ * @version $Revision: 6905 $
+ */
+@Test(groups = {"functional"}, sequential = true)
+public class ReplicationExceptionTest
+{
+ private CacheSPI<String, ContainerData> cache1, cache2;
+
+ //String old_factory = null;
+ final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception
+ {
+ //old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
+ System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception
+ {
+ // We just can't kill DummyTransactionManager. We are sharing single instance
in more tests.
+ TestingUtil.killTransaction(DummyTransactionManager.getInstance());
+ destroyCaches();
+ /*
+ if (old_factory != null)
+ {
+ System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
+ old_factory = null;
+ }
+ */
+ }
+
+ private TransactionManager beginTransaction() throws SystemException,
NotSupportedException
+ {
+ TransactionManager mgr =
cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+ mgr.begin();
+ return mgr;
+ }
+
+ private void initCaches(Configuration.CacheMode cachingMode)
+ {
+ Configuration conf1 = new Configuration();
+ Configuration conf2 = new Configuration();
+
+ conf1.setCacheMode(cachingMode);
+ conf2.setCacheMode(cachingMode);
+ conf1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+ conf2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+
+
conf1.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+
conf2.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+ /*
+
cache1.setTransactionManagerLookupClass("org.jboss.cache.transaction.GenericTransactionManagerLookup");
+
cache2.setTransactionManagerLookupClass("org.jboss.cache.transaction.GenericTransactionManagerLookup");
+ */
+ conf1.setLockAcquisitionTimeout(5000);
+ conf2.setLockAcquisitionTimeout(5000);
+
+ cache1 = (CacheSPI) new CacheManager(conf1).createCache("testCache");
+ cache2 = (CacheSPI) new CacheManager(conf2).createCache("testCache");
+
+
+ cache1.start();
+ cache2.start();
+ }
+
+ void destroyCaches() throws Exception
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ cache1 = null;
+ cache2 = null;
+ }
+
+ public void testNonSerializableRepl() throws Exception
+ {
+ try
+ {
+ initCaches(Configuration.CacheMode.REPL_SYNC);
+
+ cache1.put("test", new ContainerData());
+
+ // We should not come here.
+ assertNotNull("NonSerializableData should not be null on cache2",
cache2.get("test"));
+ }
+ catch (RuntimeException runtime)
+ {
+ Throwable t = runtime.getCause();
+ if (t instanceof NotSerializableException)
+ {
+ System.out.println("received NotSerializableException - as
expected");
+ }
+ else
+ {
+ throw runtime;
+ }
+ }
+ }
+
+ public void testNonSerializableReplWithTx() throws Exception
+ {
+ TransactionManager tm;
+
+ try
+ {
+ initCaches(Configuration.CacheMode.REPL_SYNC);
+
+ tm = beginTransaction();
+ cache1.put("test", new ContainerData());
+ tm.commit();
+
+ // We should not come here.
+ assertNotNull("NonSerializableData should not be null on cache2",
cache2.get("test"));
+ }
+ catch (RollbackException rollback)
+ {
+ System.out.println("received RollbackException - as expected");
+ }
+ catch (Exception e)
+ {
+ // We should also examine that it is indeed throwing a NonSerilaizable
exception.
+ fail(e.toString());
+ }
+ }
+
+ static class NonSerializabeData
+ {
+ int i;
+ }
+
+ static class ContainerData implements Serializable
+ {
+ int i;
+ NonSerializabeData non_serializable_data;
+ private static final long serialVersionUID = -8322197791060897247L;
+
+ public ContainerData()
+ {
+ i = 99;
+ non_serializable_data = new NonSerializabeData();
+ }
+ }
+}
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,256 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at
gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.lock.IsolationLevel;
+import org.jboss.cache.transaction.DummyTransactionManager;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.notifications.annotation.CacheEntryModified;
+import org.jboss.starobrno.notifications.annotation.CacheEntryRemoved;
+import org.jboss.starobrno.notifications.annotation.CacheListener;
+import org.jboss.starobrno.notifications.event.Event;
+import org.jboss.starobrno.notifications.event.TransactionalEvent;
+import org.jboss.starobrno.util.TestingUtil;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.naming.Context;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test out the TreeCacheListener
+ *
+ * @version $Revision: 6905 $
+ */
+@Test(groups = {"functional"}, sequential = true)
+public class SyncCacheListenerTest
+{
+ private CacheSPI<Object, Object> cache1, cache2;
+ private final static Log log_ = LogFactory.getLog(SyncCacheListenerTest.class);
+ //private String old_factory = null;
+ private final static String FACTORY =
"org.jboss.cache.transaction.DummyContextFactory";
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception
+ {
+ System.out.println("*** starting setUp()");
+ //old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
+ System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
+
+ initCaches();
+ System.out.println("*** finished setUp()");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception
+ {
+ System.out.println("*** starting tearDown()");
+ // We just can't kill DummyTransactionManager. We are sharing single instance
in more tests.
+ TestingUtil.killTransaction(DummyTransactionManager.getInstance());
+ destroyCaches();
+ /*
+ if (old_factory != null)
+ {
+ System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
+ old_factory = null;
+ }
+ */
+ System.out.println("*** finished tearDown()");
+ }
+
+ private void initCaches()
+ {
+ Configuration conf = new Configuration();
+ conf.setSyncCommitPhase(true);
+ conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ conf.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+
conf.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ conf.setLockAcquisitionTimeout(5000);
+
+ cache1 = (CacheSPI) new CacheManager(conf).createCache("firstCache");
+ cache2 = (CacheSPI) new CacheManager(conf).createCache("firstCache");
+ TestingUtil.blockUntilViewReceived(cache2, 2, 1000);
+ }
+
+ private void destroyCaches()
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ cache1 = null;
+ cache2 = null;
+ }
+
+ public void testSyncTxRepl() throws Exception
+ {
+ Integer age;
+ TransactionManager tm = cache1.getTransactionManager();
+
+ tm.begin();
+ Transaction tx = tm.getTransaction();
+ Listener lis = new Listener();
+ cache1.getNotifier().addCacheListener(lis);
+ lis.put("age", 38);
+
+ tm.suspend();
+ assertNull("age on cache2 must be null as the TX has not yet been
committed", cache2.get("age"));
+ tm.resume(tx);
+ tm.commit();
+
+ // value on cache2 must be 38
+ age = (Integer) cache2.get("age");
+ assertNotNull("\"age\" obtained from cache2 must be non-null ",
age);
+ assertTrue("\"age\" must be 38", age == 38);
+ }
+
+ public void testRemoteCacheListener() throws Exception
+ {
+ Integer age;
+ RemoteListener lis = new RemoteListener();
+ cache2.getNotifier().addCacheListener(lis);
+ cache1.put("age", 38);
+
+ // value on cache2 must be 38
+ age = (Integer) cache2.get("age");
+ assertNotNull("\"age\" obtained from cache2 must be non-null ",
age);
+ assertTrue("\"age\" must be 38", age == 38);
+ cache1.remove("age");
+ }
+
+ public void testSyncRepl() throws Exception
+ {
+ Integer age;
+ Listener lis = new Listener();
+ cache1.addCacheListener(lis);
+ lis.put("age", 38);
+
+ // value on cache2 must be 38
+ age = (Integer) cache2.get("age");
+ assertNotNull("\"age\" obtained from cache2 must be non-null ",
age);
+ assertTrue("\"age\" must be 38", age == 38);
+ }
+
+
+ public void simpleReplicationTest() throws Exception
+ {
+ TransactionManager tm =
cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+ tm.begin();
+ cache1.put("key","value");
+ tm.commit();
+
+ assert cache2.get("key").equals("value");
+
+ }
+
+ public void testSyncTxReplMap() throws Exception
+ {
+ Integer age;
+ TransactionManager tm =
cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+ tm.begin();
+ Transaction tx = tm.getTransaction();
+ Listener lis = new Listener();
+
+ cache1.put("age", 38);
+ lis.put("name", "Ben");
+
+ assert cache1.get("age").equals(38);
+ tm.suspend();
+ assertNull("age on cache2 must be null as the TX has not yet been
committed", cache2.get("age"));
+ assertNull("age on cache1 must be null as the TX has been resumed",
cache1.get("age"));
+ tm.resume(tx);
+ assertNotNull("age on cache1 must be not be null",
cache1.get("age"));
+ tm.commit();
+ assertNotNull("age on cache1 must be not be null",
cache1.get("age"));
+
+ System.out.println(" ********************** ");
+ // value on cache2 must be 38
+ age = (Integer) cache2.get("age");
+ assertNotNull("\"age\" obtained from cache2 must be non-null ",
age);
+ assertTrue("\"age\" must be 38", age == 38);
+ }
+
+ public void testSyncReplMap() throws Exception
+ {
+ Integer age;
+
+ Listener lis = new Listener();
+ cache1.getNotifier().addCacheListener(lis);
+ lis.put("age", 38);
+
+ cache1.put("name", "Ben");
+ // value on cache2 must be 38
+ age = (Integer) cache2.get("age");
+ assertNotNull("\"age\" obtained from cache2 must be non-null ",
age);
+ assertTrue("\"age\" must be 38", age == 38);
+ }
+
+ @CacheListener
+ public class Listener
+ {
+ Object key = null;
+
+ public void put(Object key, Object val)
+ {
+ this.key = key;
+ cache1.put(key, val);
+ }
+
+ public void put(String fqn, Map map)
+ {
+ if (map.size() == 0)
+ fail("put(): map size can't be 0");
+ Set<String> set = map.keySet();
+ key = set.iterator().next();// take anyone
+ cache1.put(fqn, map);
+ }
+
+ @CacheEntryModified
+ public void nodeModified(Event ne)
+ {
+ if (!ne.isPre())
+ {
+ log_.debug("nodeModified visited with fqn: " + key);
+ try
+ {
+ // test out if we can get the read lock since there is a write lock going
as well.
+ cache1.get(key);
+ }
+ catch (CacheException e)
+ {
+ e.printStackTrace();//To change body of catch statement use File |
Settings | File Templates.
+ fail("nodeModified: test failed with exception: " + e);
+ }
+ }
+ }
+
+ }
+
+ @CacheListener
+ public class RemoteListener
+ {
+
+ @CacheEntryRemoved
+ @CacheEntryModified
+ public void callback(TransactionalEvent e)
+ {
+ System.out.println("Callback got event " + e);
+ log_.debug("Callback got event " + e);
+ assertFalse("node was removed on remote cache so isLocal should be
false", e.isOriginLocal());
+ }
+ }
+}
Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at
gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.config.Configuration;
+
+/**
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ */
+@Test(groups = {"functional", "jgroups"})
+public class SyncReplTest
+{
+ private ThreadLocal<Cache<Object, Object>[]> cachesTL = new
ThreadLocal<Cache<Object, Object>[]>();
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp()
+ {
+ System.out.println("*** In setUp()");
+ Cache<Object, Object>[] caches = new Cache[2];
+ Configuration configuration = new Configuration();
+ configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ caches[0] = new CacheManager(configuration).createCache("test");
+ caches[1] = new CacheManager(configuration).createCache("test");
+ cachesTL.set(caches);
+ TestingUtil.blockUntilViewsReceived(caches, 5000);
+ System.out.println("*** Finished setUp()");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown()
+ {
+ Cache<Object, Object>[] caches = cachesTL.get();
+ if (caches != null) TestingUtil.killCaches(caches);
+ cachesTL.set(null);
+ }
+
+ public void testBasicOperation()
+ {
+ Cache<Object, Object>[] caches = cachesTL.get();
+ assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
+
+ String k = "key", v = "value";
+
+ assertNull("Should be null", caches[0].get(k));
+ assertNull("Should be null", caches[1].get(k));
+
+ caches[0].put(k, v);
+
+ assertEquals(v, caches[0].get(k));
+ assertEquals("Should have replicated", v, caches[1].get(k));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testSyncRepl()
+ {
+ Cache<Object, Object>[] caches = cachesTL.get();
+ assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
+
+ caches[0].getConfiguration().setSyncCommitPhase(true);
+ caches[1].getConfiguration().setSyncCommitPhase(true);
+
+ caches[0].put("age", 38);
+ assertEquals("Value should be set", 38, caches[0].get("age"));
+ assertEquals("Value should have replicated", 38,
caches[1].get("age"));
+ }
+
+ private void assertClusterSize(String message, int size)
+ {
+ Cache<Object, Object>[] caches = cachesTL.get();
+ for (Cache c : caches)
+ {
+ assertClusterSize(message, size, c);
+ }
+ }
+
+ private void assertClusterSize(String message, int size, Cache c)
+ {
+ assertEquals(message, size, c.getMembers().size());
+ }
+}
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-10-16
06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -19,6 +19,7 @@
import org.jboss.starobrno.lock.LockManager;
import org.jgroups.JChannel;
+import javax.transaction.TransactionManager;
import java.io.File;
import java.lang.reflect.Field;
import java.util.Arrays;
@@ -463,6 +464,21 @@
}
/**
+ * Clears transaction with the current thread in the given transaction manager.
+ * @param txManager a TransactionManager to be cleared
+ */
+ public static void killTransaction(TransactionManager txManager) {
+ if (txManager != null) {
+ try {
+ txManager.rollback();
+ } catch (Exception e) {
+ // don't care
+ }
+ }
+ }
+
+
+ /**
* Clears any associated transactions with the current thread in the caches'
transaction managers.
*/
public static void killTransactions(Cache... caches)
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java 2008-10-16
21:12:47 UTC (rev 6974)
@@ -0,0 +1,269 @@
+package org.jboss.starobrno.util.internals;
+
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.remote.ReplicateCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.io.ByteBuffer;
+import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
+import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/*
+* Utility class that notifies when certain commands were asynchronously replicated on
secondary cache.
+ * Especially useful for avaoiding Thread.sleep() statements.
+ * <p/>
+ * Usage:
+ * <pre>
+ * Cache c1, c2; //these being two async caches
+ * AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
+ * listener2.expect(PutKeyValueCommand.class);
+ * c1.put(fqn, key, value);
+ * listener2.waitForReplicationToOccur(1000); // -this will block here untill c2
recieves the PutKeyValueCommand command
+ * </pre>
+ * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the
object returns to the
+ * non-initialized state and *can* be reused through expect-wait cycle.
+ * <b>Note</b>: this class might be used aswell for sync caches, e.g. a test
could have subclasses which use sync and
+ * async replication
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+public class ReplicationListener
+{
+ private CountDownLatch latch = new CountDownLatch(1);
+ private Set<Class<? extends ReplicableCommand>> expectedCommands;
+
+ /**
+ * Builds a listener that will observe the given cache for recieving replication
commands.
+ */
+ public ReplicationListener(Cache cache)
+ {
+ ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
+ RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
+ CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher)
TestingUtil.extractField(rpcManager, "rpcDispatcher");
+ RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2)
realDispatcher.getMarshaller();
+ RpcDispatcher.Marshaller2 delegate = null;
+ delegate = new MarshallerDelegate(realMarshaller);
+ realDispatcher.setMarshaller(delegate);
+ realDispatcher.setRequestMarshaller(delegate);
+ realDispatcher.setResponseMarshaller(delegate);
+ }
+
+ private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+ {
+ RpcDispatcher.Marshaller2 marshaller;
+
+ private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+ {
+ this.marshaller = marshaller;
+ }
+
+ public byte[] objectToByteBuffer(Object obj) throws Exception
+ {
+ return marshaller.objectToByteBuffer(obj);
+ }
+
+ public Object objectFromByteBuffer(byte bytes[]) throws Exception
+ {
+ Object result = marshaller.objectFromByteBuffer(bytes);
+ if (result instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+
+ public Buffer objectToBuffer(Object o) throws Exception
+ {
+ return marshaller.objectToBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ {
+ Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+ if (result instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * We want the notification to be performed only *after* the remote command is
executed.
+ */
+ private class ReplicateCommandDelegate extends ReplicateCommand
+ {
+ ReplicateCommand realOne;
+
+ private ReplicateCommandDelegate(ReplicateCommand realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ @Override
+ public Object perform(InvocationContext ctx) throws Throwable
+ {
+ try
+ {
+ return realOne.perform(ctx);
+ }
+ finally
+ {
+ System.out.println("Processed command: " + realOne);
+ Iterator<Class<? extends ReplicableCommand>> it =
expectedCommands.iterator();
+ while (it.hasNext())
+ {
+ Class<? extends ReplicableCommand> replicableCommandClass =
it.next();
+ if (realOne.containsCommandType(replicableCommandClass))
+ {
+ it.remove();
+ } else if (realOne.getSingleModification() instanceof PrepareCommand)
//explicit transaction
+ {
+ PrepareCommand prepareCommand = (PrepareCommand)
realOne.getSingleModification();
+ if (prepareCommand.containsModificationType(replicableCommandClass))
+ {
+ it.remove();
+ }
+ }
+ }
+ if (expectedCommands.isEmpty())
+ {
+ latch.countDown();
+ }
+ }
+ }
+ }
+
+ /**
+ * Needed for region based marshalling.
+ */
+ private class RegionMarshallerDelegate extends CacheMarshallerStarobrno
+ {
+ private ExtendedMarshaller realOne;
+
+ private RegionMarshallerDelegate(ExtendedMarshaller realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ @Override
+ public void objectToObjectStream(Object obj, ObjectOutputStream out) throws
Exception
+ {
+ realOne.objectToObjectStream(obj, out);
+ }
+
+ @Override
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.objectFromObjectStream(in);
+ }
+
+ @Override
+ public Object objectFromStream(InputStream is) throws Exception
+ {
+ return realOne.objectFromStream(is);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes) throws Exception
+ {
+ return this.objectFromByteBuffer(bytes, 0, bytes.length);
+ }
+
+
+ public ByteBuffer objectToBuffer(Object o) throws Exception
+ {
+ return realOne.objectToBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception
+ {
+ Object result = realOne.objectFromByteBuffer(buffer, i , i1);
+ if (result instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ result = new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Blocks for the elements specified through {@link #expect(Class[])} invocations to
be replicated in this cache.
+ * if replication does not occur in the give timeout then an exception is being
thrown.
+ */
+ public void waitForReplicationToOccur(long timeoutMillis)
+ {
+ System.out.println("enter...
ReplicationListener.waitForReplicationToOccur");
+ waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
+ System.out.println("exit...
ReplicationListener.waitForReplicationToOccur");
+ }
+
+ /**
+ * Similar to {@link #waitForReplicationToOccur(long)} except that this method
provides more flexibility in time units.
+ *
+ * @param timeout the maximum time to wait
+ * @param timeUnit the time unit of the <tt>timeout</tt> argument.
+ */
+ public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
+ {
+ assert expectedCommands != null : "there are no replication expectations;
please use AsyncReplicationListener.expect(...) before calling this method";
+ try
+ {
+ if (!latch.await(timeout, timeUnit))
+ {
+ assert false : "waiting for more than " + timeout + " " +
timeUnit + " and following commands did not replicate: " + expectedCommands;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("unexpected", e);
+ }
+ finally
+ {
+ expectedCommands = null;
+ latch = new CountDownLatch(1);
+ }
+ }
+
+ /**
+ * {@link #waitForReplicationToOccur(long)} will block untill all the commands
specified here are being replicated
+ * to this cache. The method can be called several times with various arguments.
+ */
+ public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ if (this.expectedCommands == null)
+ {
+ this.expectedCommands = new HashSet<Class<? extends
ReplicableCommand>>();
+ }
+ this.expectedCommands.addAll(Arrays.asList(expectedCommands));
+ }
+
+ /**
+ * Waits untill first command is replicated.
+ */
+ public void expectAny()
+ {
+ expect();
+ }
+}
\ No newline at end of file