[jbosscache-commits] JBoss Cache SVN: r6974 - in core/branches/flat/src: main/java/org/jboss/starobrno/interceptors and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Oct 16 17:12:47 EDT 2008


Author: mircea.markus
Date: 2008-10-16 17:12:47 -0400 (Thu, 16 Oct 2008)
New Revision: 6974

Added:
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
Log:
added replication tests

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-16 06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -74,7 +74,6 @@
    protected TransactionManager transactionManager;
    protected RPCManager rpcManager;
 
-
    @Inject
    private void injectDependencies(InvocationContextContainer invocationContextContainer,
                                    CommandsFactory commandsFactory,
@@ -252,47 +251,47 @@
 
    public List<CommandInterceptor> getInterceptorChain()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public Marshaller getMarshaller()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public void addInterceptor(CommandInterceptor i, int position)
    {
-      //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor)
    {
-      //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public void removeInterceptor(int position)
    {
-      //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public void removeInterceptor(Class<? extends CommandInterceptor> interceptorType)
    {
-      //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public CacheLoaderManager getCacheLoaderManager()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public BuddyManager getBuddyManager()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public TransactionTable getTransactionTable()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public RPCManager getRPCManager()
@@ -302,32 +301,32 @@
 
    public StateTransferManager getStateTransferManager()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public Notifier getNotifier()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      return notifier;
    }
 
    public String getClusterName()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public GlobalTransaction getCurrentTransaction(Transaction tx, boolean createIfNotExists)
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public GlobalTransaction getCurrentTransaction()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public GravitateResult gravitateData(K key, boolean searchBuddyBackupSubtrees, InvocationContext ctx)
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      throw new IllegalStateException();//todo Implement me properly
    }
 
    public ComponentRegistry getComponentRegistry()

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java	2008-10-16 06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CallInterceptor.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -50,14 +50,6 @@
  */
 public class CallInterceptor extends CommandInterceptor
 {
-   private boolean notOptimisticLocking;
-
-   @Start
-   protected void start()
-   {
-      notOptimisticLocking = false;
-   }
-
    @Override
    public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
    {
@@ -135,7 +127,7 @@
          throws Throwable
    {
       Object result = invokeCommand(ctx, command);
-      if (notOptimisticLocking && ctx.isValidTransaction())
+      if (ctx.isValidTransaction())
       {
          GlobalTransaction gtx = ctx.getGlobalTransaction();
          if (gtx == null)

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	2008-10-16 06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -40,7 +40,6 @@
 
 /**
  * Acts as a base for all RPC calls - subclassed by
- * {@link org.jboss.cache.interceptors.ReplicationInterceptor} and {@link OptimisticReplicationInterceptor}.
  *
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  */

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-10-16 06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -287,7 +287,8 @@
          // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
          // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
          if (retval == null)
-            throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + command.getClass().getSimpleName() + " not being serializable.");
+            throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for "
+                  + command.getClass().getSimpleName() + " not being serializable.");
          return retval;
       }
    }

Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/replication/AsyncReplTest.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,232 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.starobrno.replication;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.util.internals.ReplicationListener;
+import org.jboss.starobrno.util.TestingUtil;
+
+/**
+ * Unit test for replicated async CacheSPI. Use locking and multiple threads to test
+ * concurrent access to the tree.
+ */
+ at Test(groups = {"functional", "jgroups"})
+public class AsyncReplTest
+{
+   private Configuration configuration;
+
+   private class AsyncReplTestTL {
+      private CacheSPI<Object, Object> cache1, cache2;
+      private CacheManager cacheManager1, cacheManager2;
+      private ReplicationListener replListener1, replListener2;
+   }
+
+   private ThreadLocal<AsyncReplTestTL> threadLocal = new ThreadLocal<AsyncReplTestTL>();
+
+   @BeforeMethod(alwaysRun = true)
+   public void setUp() throws Exception
+   {
+      AsyncReplTestTL tl = new AsyncReplTestTL();
+      threadLocal.set(tl);
+
+      configuration = new Configuration();
+      configuration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+
+      log("creating cache1");
+      tl.cacheManager1 = new CacheManager(configuration);
+      tl.cache1 = (CacheSPI) tl.cacheManager1.createCache("testCache");
+      tl.replListener1 = new ReplicationListener(tl.cache1);
+
+      log("creating cache2");
+      tl.cacheManager2 = new CacheManager(configuration);
+      tl.cache2 = (CacheSPI) tl.cacheManager2.createCache("testCache");
+      tl.replListener2 = new ReplicationListener(tl.cache2);
+   }
+
+   /**
+    * Provides a hook for multiplexer integration. This default implementation
+    * is a no-op; subclasses that test mux integration would override
+    * to integrate the given cache with a multiplexer.
+    * <p/>
+    * param cache a cache that has been configured but not yet created.
+    */
+   protected void configureMultiplexer(Cache cache) throws Exception
+   {
+      // default does nothing
+   }
+
+   /**
+    * Provides a hook to check that the cache's channel came from the
+    * multiplexer, or not, as expected.  This default impl asserts that
+    * the channel did not come from the multiplexer.
+    *
+    * @param cache a cache that has already been started
+    */
+   protected void validateMultiplexer(Cache cache)
+   {
+      assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
+   }
+
+   @AfterMethod(alwaysRun = true)
+   public void tearDown() throws Exception
+   {
+      AsyncReplTestTL tl = threadLocal.get();
+      TestingUtil.killCaches(tl.cache1, tl.cache2);
+      threadLocal.set(null);
+   }
+
+   public void testTxCompletion() throws Exception
+   {
+      AsyncReplTestTL tl = threadLocal.get();
+      CacheSPI<Object, Object> cache1 = tl.cache1;
+      CacheSPI<Object, Object> cache2 = tl.cache2;
+      ReplicationListener replListener1 = tl.replListener1;
+      ReplicationListener replListener2 = tl.replListener2;
+
+      String key = "key";
+
+      replListener2.expectAny();
+      cache1.put(key, "value1");
+      // allow for replication
+      replListener2.waitForReplicationToOccur(500);
+      assertEquals("value1", cache1.get( key));
+      assertEquals("value1", cache2.get(key));
+
+      TransactionManager mgr = cache1.getTransactionManager();
+      mgr.begin();
+
+      replListener2.expectAny();
+      cache1.put(key, "value2");
+      assertEquals("value2", cache1.get(key));
+      assertEquals("value1", cache2.get(key));
+
+      mgr.commit();
+
+      replListener2.waitForReplicationToOccur(500);
+
+      assertEquals("value2", cache1.get(key));
+      assertEquals("value2", cache2.get(key));
+
+      mgr.begin();
+      cache1.put(key, "value3");
+      assertEquals("value3", cache1.get(key));
+      assertEquals("value2", cache2.get(key));
+
+      mgr.rollback();
+
+      assertEquals("value2", cache1.get(key));
+      assertEquals("value2", cache2.get(key));
+   }
+
+   public void testPutShouldNotReplicateToDifferentCluster()
+   {
+      AsyncReplTestTL tl = threadLocal.get();
+      CacheSPI<Object, Object> cache1 = tl.cache1;
+      CacheSPI<Object, Object> cache2 = tl.cache2;
+      ReplicationListener replListener1 = tl.replListener1;
+      ReplicationListener replListener2 = tl.replListener2;
+
+      CacheSPI<Object, Object> cache3 = null, cache4 = null;
+      try
+      {
+         configuration.setClusterName("otherTest");
+         cache3 = (CacheSPI<Object, Object>) new CacheManager(configuration).createCache("testCache");
+         cache4 = (CacheSPI<Object, Object>) new CacheManager(configuration).createCache("testCache");
+         replListener2.expectAny();
+         cache1.put("age", 38);
+         // because we use async repl, modfication may not yet have been propagated to cache2, so
+         // we have to wait a little
+         replListener2.waitForReplicationToOccur(500);
+         assertNull("Should not have replicated", cache3.get("age"));
+      }
+      catch (Exception e)
+      {
+         fail(e.toString());
+      }
+      finally
+      {
+         if (cache3 != null)
+         {
+            cache3.stop();
+         }
+         if (cache4 != null)
+         {
+            cache4.stop();
+         }
+      }
+   }
+
+   public void testAsyncReplDelay()
+   {
+      Integer age;
+      AsyncReplTestTL tl = threadLocal.get();
+      CacheSPI<Object, Object> cache1 = tl.cache1;
+      CacheSPI<Object, Object> cache2 = tl.cache2;
+      ReplicationListener replListener1 = tl.replListener1;
+      ReplicationListener replListener2 = tl.replListener2;
+
+      try
+      {
+         cache1.put("age", 38);
+
+         // value on cache2 may be 38 or not yet replicated
+         age = (Integer) cache2.get("age");
+         log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
+         assertTrue("should be either null or 38", age == null || age == 38);
+      }
+      catch (Exception e)
+      {
+         fail(e.toString());
+      }
+   }
+
+   public void testAsyncReplTxDelay()
+   {
+      Integer age;
+      AsyncReplTestTL tl = threadLocal.get();
+      CacheSPI<Object, Object> cache1 = tl.cache1;
+      CacheSPI<Object, Object> cache2 = tl.cache2;
+      ReplicationListener replListener1 = tl.replListener1;
+      ReplicationListener replListener2 = tl.replListener2;
+
+      try
+      {
+         TransactionManager tm = cache1.getTransactionManager();
+         tm.begin();
+         cache1.put( "age", 38);
+         tm.commit();
+
+         // value on cache2 may be 38 or not yet replicated
+         age = (Integer) cache2.get("age");
+         log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
+         assertTrue("should be either null or 38", age == null || age == 38);
+      }
+      catch (Exception e)
+      {
+         fail(e.toString());
+      }
+   }
+
+   private void log(String msg)
+   {
+      System.out.println("-- [" + Thread.currentThread() + "]: " + msg);
+   }
+}

Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/replication/ExceptionTest.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,98 @@
+package org.jboss.starobrno.replication;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.lock.TimeoutException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+
+/**
+ * Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl Timeouts
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ */
+ at Test(groups = {"functional"}, sequential = true)
+public class ExceptionTest
+{
+   private Cache cache1;
+   private Cache cache2;
+
+   private static String DELAYED_CLUSTER_CONFIG =
+            "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
+            "    mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
+            "PING(timeout=1000;num_initial_members=2):" +
+            "MERGE2(min_interval=5000;max_interval=10000):" +
+            "FD_SOCK:" +
+            "VERIFY_SUSPECT(timeout=1500):" +
+            "pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):" +
+            "UNICAST(timeout=600,1200,2400,4800):" +
+            "pbcast.STABLE(desired_avg_gossip=20000):" +
+            "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
+            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
+            "shun=false;print_local_addr=true):" +
+            "pbcast.STATE_TRANSFER:" +
+            "DELAY(in_delay=100;out_delay=100)";
+
+   private Cache createCache(String jgroupsConfig)
+   {
+      Configuration c = new Configuration();
+      c.setSyncCommitPhase(true);
+      c.setSyncRollbackPhase(true);
+      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      if (jgroupsConfig != null) c.setClusterConfig(jgroupsConfig);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      Cache cache = new CacheManager(c).createCache("testCache");
+      return cache;
+   }
+
+   @AfterMethod
+   public void tearDown()
+   {
+      TestingUtil.killCaches(cache1, cache2);
+      cache1 = null;
+      cache2 = null;
+   }
+
+   @Test(groups = {"functional"}, expectedExceptions = {TimeoutException.class})
+   public void testSyncReplTimeout()
+   {
+      cache1 = createCache(DELAYED_CLUSTER_CONFIG);
+      cache2 = createCache(DELAYED_CLUSTER_CONFIG);
+
+      cache1.getConfiguration().setSyncReplTimeout(1); // 1ms.  this is *bound* to fail.
+      cache2.getConfiguration().setSyncReplTimeout(1);
+
+      cache1.start();
+      cache2.start();
+
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+      cache1.put("k", "v");
+   }
+
+   @Test(groups = {"functional"}, expectedExceptions = {TimeoutException.class})
+   public void testLockAcquisitionTimeout() throws Exception
+   {
+      cache1 = createCache(null);
+      cache2 = createCache(null);
+      cache2.getConfiguration().setLockAcquisitionTimeout(1);
+
+      cache1.start();
+      cache2.start();
+
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+      // get a lock on cache 2 and hold on to it.
+      TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+      tm.begin();
+      cache2.put("block", "block");
+      Transaction t = tm.suspend();
+      cache1.put("block", "v");
+   }
+}

Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/replication/ReplicationExceptionTest.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,175 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.fail;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.cache.transaction.DummyTransactionManager;
+import org.jboss.cache.lock.IsolationLevel;
+
+import javax.naming.Context;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import java.io.NotSerializableException;
+import java.io.Serializable;
+
+/**
+ * Teting of replication exception for a Nonerislizable object
+ *
+ * @author Ben Wang
+ * @version $Revision: 6905 $
+ */
+ at Test(groups = {"functional"}, sequential = true)
+public class ReplicationExceptionTest
+{
+   private CacheSPI<String, ContainerData> cache1, cache2;
+
+   //String old_factory = null;
+   final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
+
+   @BeforeMethod(alwaysRun = true)
+   public void setUp() throws Exception
+   {
+      //old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
+      System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
+   }
+
+   @AfterMethod(alwaysRun = true)
+   public void tearDown() throws Exception
+   {
+      // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
+      TestingUtil.killTransaction(DummyTransactionManager.getInstance());
+      destroyCaches();
+      /*
+      if (old_factory != null)
+      {
+         System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
+         old_factory = null;
+      }
+      */
+   }
+
+   private TransactionManager beginTransaction() throws SystemException, NotSupportedException
+   {
+      TransactionManager mgr = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+      mgr.begin();
+      return mgr;
+   }
+
+   private void initCaches(Configuration.CacheMode cachingMode)
+   {
+      Configuration conf1 = new Configuration();
+      Configuration conf2 = new Configuration();
+
+      conf1.setCacheMode(cachingMode);
+      conf2.setCacheMode(cachingMode);
+      conf1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+      conf2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+
+      conf1.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      conf2.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      /*
+       cache1.setTransactionManagerLookupClass("org.jboss.cache.transaction.GenericTransactionManagerLookup");
+       cache2.setTransactionManagerLookupClass("org.jboss.cache.transaction.GenericTransactionManagerLookup");
+       */
+      conf1.setLockAcquisitionTimeout(5000);
+      conf2.setLockAcquisitionTimeout(5000);
+
+      cache1 = (CacheSPI) new CacheManager(conf1).createCache("testCache");
+      cache2 = (CacheSPI) new CacheManager(conf2).createCache("testCache");
+
+
+      cache1.start();
+      cache2.start();
+   }
+
+   void destroyCaches() throws Exception
+   {
+      TestingUtil.killCaches(cache1, cache2);
+      cache1 = null;
+      cache2 = null;
+   }
+
+   public void testNonSerializableRepl() throws Exception
+   {
+      try
+      {
+         initCaches(Configuration.CacheMode.REPL_SYNC);
+
+         cache1.put("test", new ContainerData());
+
+         // We should not come here.
+         assertNotNull("NonSerializableData should not be null on cache2", cache2.get("test"));
+      }
+      catch (RuntimeException runtime)
+      {
+         Throwable t = runtime.getCause();
+         if (t instanceof NotSerializableException)
+         {
+            System.out.println("received NotSerializableException - as expected");
+         }
+         else
+         {
+            throw runtime;
+         }
+      }
+   }
+
+   public void testNonSerializableReplWithTx() throws Exception
+   {
+      TransactionManager tm;
+
+      try
+      {
+         initCaches(Configuration.CacheMode.REPL_SYNC);
+
+         tm = beginTransaction();
+         cache1.put("test", new ContainerData());
+         tm.commit();
+
+         // We should not come here.
+         assertNotNull("NonSerializableData should not be null on cache2", cache2.get("test"));
+      }
+      catch (RollbackException rollback)
+      {
+         System.out.println("received RollbackException - as expected");
+      }
+      catch (Exception e)
+      {
+         // We should also examine that it is indeed throwing a NonSerilaizable exception.
+         fail(e.toString());
+      }
+   }
+
+   static class NonSerializabeData
+   {
+      int i;
+   }
+
+   static class ContainerData implements Serializable
+   {
+      int i;
+      NonSerializabeData non_serializable_data;
+      private static final long serialVersionUID = -8322197791060897247L;
+
+      public ContainerData()
+      {
+         i = 99;
+         non_serializable_data = new NonSerializabeData();
+      }
+   }
+}

Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncCacheListenerTest.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,256 @@
+/*
+ *
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.lock.IsolationLevel;
+import org.jboss.cache.transaction.DummyTransactionManager;
+import org.jboss.cache.transaction.DummyTransactionManagerLookup;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.notifications.annotation.CacheEntryModified;
+import org.jboss.starobrno.notifications.annotation.CacheEntryRemoved;
+import org.jboss.starobrno.notifications.annotation.CacheListener;
+import org.jboss.starobrno.notifications.event.Event;
+import org.jboss.starobrno.notifications.event.TransactionalEvent;
+import org.jboss.starobrno.util.TestingUtil;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.naming.Context;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test out the TreeCacheListener
+ *
+ * @version $Revision: 6905 $
+ */
+ at Test(groups = {"functional"}, sequential = true)
+public class SyncCacheListenerTest
+{
+   private CacheSPI<Object, Object> cache1, cache2;
+   private final static Log log_ = LogFactory.getLog(SyncCacheListenerTest.class);
+   //private String old_factory = null;
+   private final static String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
+
+   @BeforeMethod(alwaysRun = true)
+   public void setUp() throws Exception
+   {
+      System.out.println("*** starting setUp()");
+      //old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
+      System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
+
+      initCaches();
+      System.out.println("*** finished setUp()");
+   }
+
+   @AfterMethod(alwaysRun = true)
+   public void tearDown() throws Exception
+   {
+      System.out.println("*** starting tearDown()");
+      // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
+      TestingUtil.killTransaction(DummyTransactionManager.getInstance());
+      destroyCaches();
+      /*
+      if (old_factory != null)
+      {
+         System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
+         old_factory = null;
+      }
+       */
+      System.out.println("*** finished tearDown()");
+   }
+
+   private void initCaches()
+   {
+      Configuration conf = new Configuration();
+      conf.setSyncCommitPhase(true);
+      conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      conf.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+      conf.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      conf.setLockAcquisitionTimeout(5000);
+
+      cache1 = (CacheSPI) new CacheManager(conf).createCache("firstCache");
+      cache2 = (CacheSPI) new CacheManager(conf).createCache("firstCache");
+      TestingUtil.blockUntilViewReceived(cache2, 2, 1000);
+   }
+
+   private void destroyCaches()
+   {
+      TestingUtil.killCaches(cache1, cache2);
+      cache1 = null;
+      cache2 = null;
+   }
+
+   public void testSyncTxRepl() throws Exception
+   {
+      Integer age;
+      TransactionManager tm = cache1.getTransactionManager();
+
+      tm.begin();
+      Transaction tx = tm.getTransaction();
+      Listener lis = new Listener();
+      cache1.getNotifier().addCacheListener(lis);
+      lis.put("age", 38);
+
+      tm.suspend();
+      assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("age"));
+      tm.resume(tx);
+      tm.commit();
+
+      // value on cache2 must be 38
+      age = (Integer) cache2.get("age");
+      assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
+      assertTrue("\"age\" must be 38", age == 38);
+   }
+
+   public void testRemoteCacheListener() throws Exception
+   {
+      Integer age;
+      RemoteListener lis = new RemoteListener();
+      cache2.getNotifier().addCacheListener(lis);
+      cache1.put("age", 38);
+
+      // value on cache2 must be 38
+      age = (Integer) cache2.get("age");
+      assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
+      assertTrue("\"age\" must be 38", age == 38);
+      cache1.remove("age");
+   }
+
+   public void testSyncRepl() throws Exception
+   {
+      Integer age;
+      Listener lis = new Listener();
+      cache1.addCacheListener(lis);
+      lis.put("age", 38);
+
+      // value on cache2 must be 38
+      age = (Integer) cache2.get("age");
+      assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
+      assertTrue("\"age\" must be 38", age == 38);
+   }
+
+
+   public void simpleReplicationTest() throws Exception
+   {
+      TransactionManager tm = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+      tm.begin();
+      cache1.put("key","value");
+      tm.commit();
+
+      assert cache2.get("key").equals("value");
+
+   }
+
+   public void testSyncTxReplMap() throws Exception
+   {
+      Integer age;
+      TransactionManager tm = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+      tm.begin();
+      Transaction tx = tm.getTransaction();
+      Listener lis = new Listener();
+
+      cache1.put("age", 38);
+      lis.put("name", "Ben");
+
+      assert cache1.get("age").equals(38);
+      tm.suspend();
+      assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("age"));
+      assertNull("age on cache1 must be null as the TX has been resumed", cache1.get("age"));
+      tm.resume(tx);
+      assertNotNull("age on cache1 must be not be null", cache1.get("age"));
+      tm.commit();
+      assertNotNull("age on cache1 must be not be null", cache1.get("age"));
+
+      System.out.println("  ********************** ");
+      // value on cache2 must be 38
+      age = (Integer) cache2.get("age");
+      assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
+      assertTrue("\"age\" must be 38", age == 38);
+   }
+
+   public void testSyncReplMap() throws Exception
+   {
+      Integer age;
+
+      Listener lis = new Listener();
+      cache1.getNotifier().addCacheListener(lis);
+      lis.put("age", 38);
+
+      cache1.put("name", "Ben");
+      // value on cache2 must be 38
+      age = (Integer) cache2.get("age");
+      assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
+      assertTrue("\"age\" must be 38", age == 38);
+   }
+
+   @CacheListener
+   public class Listener
+   {
+      Object key = null;
+
+      public void put(Object key, Object val)
+      {
+         this.key = key;
+         cache1.put(key, val);
+      }
+
+      public void put(String fqn, Map map)
+      {
+         if (map.size() == 0)
+            fail("put(): map size can't be 0");
+         Set<String> set = map.keySet();
+         key = set.iterator().next();// take anyone
+         cache1.put(fqn, map);
+      }
+
+      @CacheEntryModified
+      public void nodeModified(Event ne)
+      {
+         if (!ne.isPre())
+         {
+            log_.debug("nodeModified visited with fqn: " + key);
+            try
+            {
+               // test out if we can get the read lock since there is a write lock going as well.
+               cache1.get(key);
+            }
+            catch (CacheException e)
+            {
+               e.printStackTrace();//To change body of catch statement use File | Settings | File Templates.
+               fail("nodeModified: test failed with exception: " + e);
+            }
+         }
+      }
+
+   }
+
+   @CacheListener
+   public class RemoteListener
+   {
+
+      @CacheEntryRemoved
+      @CacheEntryModified
+      public void callback(TransactionalEvent e)
+      {
+         System.out.println("Callback got event " + e);
+         log_.debug("Callback got event " + e);
+         assertFalse("node was removed on remote cache so isLocal should be false", e.isOriginLocal());
+      }
+   }
+}

Added: core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/replication/SyncReplTest.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.starobrno.replication;
+
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.config.Configuration;
+
+/**
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+ at Test(groups = {"functional", "jgroups"})
+public class SyncReplTest
+{
+   private ThreadLocal<Cache<Object, Object>[]> cachesTL = new ThreadLocal<Cache<Object, Object>[]>();
+
+   @BeforeMethod(alwaysRun = true)
+   public void setUp()
+   {
+      System.out.println("*** In setUp()");
+      Cache<Object, Object>[] caches = new Cache[2];
+      Configuration configuration = new Configuration();
+      configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      caches[0] = new CacheManager(configuration).createCache("test");
+      caches[1] = new CacheManager(configuration).createCache("test");
+      cachesTL.set(caches);
+      TestingUtil.blockUntilViewsReceived(caches, 5000);
+      System.out.println("*** Finished setUp()");
+   }
+
+   @AfterMethod(alwaysRun = true)
+   public void tearDown()
+   {
+      Cache<Object, Object>[] caches = cachesTL.get();
+      if (caches != null) TestingUtil.killCaches(caches);
+      cachesTL.set(null);
+   }
+
+   public void testBasicOperation()
+   {
+      Cache<Object, Object>[] caches = cachesTL.get();
+      assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
+
+      String k = "key", v = "value";
+
+      assertNull("Should be null", caches[0].get(k));
+      assertNull("Should be null", caches[1].get(k));
+
+      caches[0].put(k, v);
+
+      assertEquals(v, caches[0].get(k));
+      assertEquals("Should have replicated", v, caches[1].get(k));
+   }
+
+   @SuppressWarnings("unchecked")
+   public void testSyncRepl()
+   {
+      Cache<Object, Object>[] caches = cachesTL.get();
+      assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
+
+      caches[0].getConfiguration().setSyncCommitPhase(true);
+      caches[1].getConfiguration().setSyncCommitPhase(true);
+
+      caches[0].put("age", 38);
+      assertEquals("Value should be set", 38, caches[0].get("age"));
+      assertEquals("Value should have replicated", 38, caches[1].get("age"));
+   }
+
+   private void assertClusterSize(String message, int size)
+   {
+      Cache<Object, Object>[] caches = cachesTL.get();
+      for (Cache c : caches)
+      {
+         assertClusterSize(message, size, c);
+      }
+   }
+
+   private void assertClusterSize(String message, int size, Cache c)
+   {
+      assertEquals(message, size, c.getMembers().size());
+   }
+}

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-10-16 06:10:36 UTC (rev 6973)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -19,6 +19,7 @@
 import org.jboss.starobrno.lock.LockManager;
 import org.jgroups.JChannel;
 
+import javax.transaction.TransactionManager;
 import java.io.File;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -463,6 +464,21 @@
    }
 
    /**
+    * Clears transaction with the current thread in the given transaction manager.
+    * @param txManager a TransactionManager to be cleared
+    */
+   public static void killTransaction(TransactionManager txManager) {
+      if (txManager != null) {
+         try {
+            txManager.rollback();
+         } catch (Exception e) {
+               // don't care
+         }
+      }
+   }
+   
+
+   /**
     * Clears any associated transactions with the current thread in the caches' transaction managers.
     */
    public static void killTransactions(Cache... caches)

Added: core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java	2008-10-16 21:12:47 UTC (rev 6974)
@@ -0,0 +1,269 @@
+package org.jboss.starobrno.util.internals;
+
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.remote.ReplicateCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.io.ByteBuffer;
+import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
+import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/*
+* Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
+ * Especially useful for avaoiding Thread.sleep() statements.
+ * <p/>
+ * Usage:
+ * <pre>
+ *   Cache c1, c2; //these being two async caches
+ *   AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
+ *   listener2.expect(PutKeyValueCommand.class);
+ *   c1.put(fqn, key, value);
+ *   listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
+ * </pre>
+ * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
+ * non-initialized state and *can* be reused through expect-wait cycle.
+ * <b>Note</b>:  this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
+ * async replication
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class ReplicationListener
+{
+   private CountDownLatch latch = new CountDownLatch(1);
+   private Set<Class<? extends ReplicableCommand>> expectedCommands;
+
+   /**
+    * Builds a listener that will observe the given cache for recieving replication commands.
+    */
+   public ReplicationListener(Cache cache)
+   {
+      ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
+      RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
+      CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
+      RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
+      RpcDispatcher.Marshaller2 delegate = null;
+      delegate = new MarshallerDelegate(realMarshaller);
+      realDispatcher.setMarshaller(delegate);
+      realDispatcher.setRequestMarshaller(delegate);
+      realDispatcher.setResponseMarshaller(delegate);
+   }
+
+   private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+   {
+      RpcDispatcher.Marshaller2 marshaller;
+
+      private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+      {
+         this.marshaller = marshaller;
+      }
+
+      public byte[] objectToByteBuffer(Object obj) throws Exception
+      {
+         return marshaller.objectToByteBuffer(obj);
+      }
+
+      public Object objectFromByteBuffer(byte bytes[]) throws Exception
+      {
+         Object result = marshaller.objectFromByteBuffer(bytes);
+         if (result instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            return new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+
+      public Buffer objectToBuffer(Object o) throws Exception
+      {
+         return marshaller.objectToBuffer(o);
+      }
+
+      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+      {
+         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+         if (result instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            return new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+   }
+
+   /**
+    * We want the notification to be performed only *after* the remote command is executed.
+    */
+   private class ReplicateCommandDelegate extends ReplicateCommand
+   {
+      ReplicateCommand realOne;
+
+      private ReplicateCommandDelegate(ReplicateCommand realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      @Override
+      public Object perform(InvocationContext ctx) throws Throwable
+      {
+         try
+         {
+            return realOne.perform(ctx);
+         }
+         finally
+         {
+            System.out.println("Processed command: " + realOne);
+            Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
+            while (it.hasNext())
+            {
+               Class<? extends ReplicableCommand> replicableCommandClass = it.next();
+               if (realOne.containsCommandType(replicableCommandClass))
+               {
+                  it.remove();
+               } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+               {
+                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
+                  if (prepareCommand.containsModificationType(replicableCommandClass))
+                  {
+                     it.remove();
+                  }
+               }
+            }
+            if (expectedCommands.isEmpty())
+            {
+               latch.countDown();
+            }
+         }
+      }
+   }
+
+   /**
+    * Needed for region based marshalling.
+    */
+   private class RegionMarshallerDelegate extends CacheMarshallerStarobrno
+   {
+      private ExtendedMarshaller realOne;
+
+      private RegionMarshallerDelegate(ExtendedMarshaller realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      @Override
+      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
+      {
+         realOne.objectToObjectStream(obj, out);
+      }
+
+      @Override
+      public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+      {
+         return realOne.objectFromObjectStream(in);
+      }
+
+      @Override
+      public Object objectFromStream(InputStream is) throws Exception
+      {
+         return realOne.objectFromStream(is);
+      }
+
+      public Object objectFromByteBuffer(byte[] bytes) throws Exception
+      {
+         return this.objectFromByteBuffer(bytes, 0, bytes.length);
+      }
+
+
+      public ByteBuffer objectToBuffer(Object o) throws Exception
+      {
+         return realOne.objectToBuffer(o);
+      }
+
+      public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception
+      {
+         Object result =  realOne.objectFromByteBuffer(buffer, i , i1);
+         if (result instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            result = new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+   }
+
+   /**
+    * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
+    * if replication does not occur in the give timeout then an exception is being thrown.
+    */
+   public void waitForReplicationToOccur(long timeoutMillis)
+   {
+      System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
+      waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
+      System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
+   }
+
+   /**
+    * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time units.
+    *
+    * @param timeout  the maximum time to wait
+    * @param timeUnit the time unit of the <tt>timeout</tt> argument.
+    */
+   public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
+   {
+      assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expect(...) before calling this method";
+      try
+      {
+         if (!latch.await(timeout, timeUnit))
+         {
+            assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
+         }
+      }
+      catch (InterruptedException e)
+      {
+         throw new IllegalStateException("unexpected", e);
+      }
+      finally
+      {
+         expectedCommands = null;
+         latch = new CountDownLatch(1);
+      }
+   }
+
+   /**
+    * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated
+    * to this cache. The method can be called several times with various arguments.
+    */
+   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      if (this.expectedCommands == null)
+      {
+         this.expectedCommands = new HashSet<Class<? extends ReplicableCommand>>();
+      }
+      this.expectedCommands.addAll(Arrays.asList(expectedCommands));
+   }
+
+   /**
+    * Waits untill first command is replicated.
+    */
+   public void expectAny()
+   {
+      expect();
+   }
+}
\ No newline at end of file




More information about the jbosscache-commits mailing list