[jbosscache-commits] JBoss Cache SVN: r7163 - in core/trunk/src/test/java/org/jboss/cache: buddyreplication and 10 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Nov 19 06:57:48 EST 2008


Author: mircea.markus
Date: 2008-11-19 06:57:48 -0500 (Wed, 19 Nov 2008)
New Revision: 7163

Added:
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Removed:
   core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Modified:
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
   core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java
   core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java
   core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java
Log:
enhanced tests

Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -2,7 +2,6 @@
 
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
-import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.lock.NodeLock;
@@ -38,42 +37,40 @@
     */
    public void testNoOpWhenLockedAnd0msTimeout() throws Exception
    {
-      PutForExternalReadTestBaseTL threadCfg = threadLocal.get();
       // create the parent node first ...
-      threadCfg.replListener2 = new ReplicationListener(threadCfg.cache2);
-      threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, false);
-      threadCfg.cache1.put(parentFqn, key, value);
-      threadCfg.replListener2.waitForReplicationToOccur(10000);
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.put(parentFqn, key, value);
+      replListener2.waitForReplicationToOccur(10000);
 
-      threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, true);
-      threadCfg.tm1.begin();
-      threadCfg.cache1.put(parentFqn, key, value2);
+      replListener2.expectWithTx(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.put(parentFqn, key, value2);
 
-      Transaction t = threadCfg.tm1.suspend();
+      Transaction t = tm1.suspend();
 
-      assertLocked(parentFqn, threadCfg.cache1, true);
+      assertLocked(parentFqn, cache1, true);
 
       // parentFqn should be write-locked.
       long startTime = System.currentTimeMillis();
-      threadCfg.cache1.putForExternalRead(fqn, key, value);
+      cache1.putForExternalRead(fqn, key, value);
 
       long waited = System.currentTimeMillis() - startTime;
       // crappy way to test that pFER does not block, but it is effective.
-      assertTrue("Should not wait " + waited + " millis for lock timeout, should attempt to acquire lock with 0ms!", waited < threadCfg.cache1.getConfiguration().getLockAcquisitionTimeout());
+      assertTrue("Should not wait " + waited + " millis for lock timeout, should attempt to acquire lock with 0ms!", waited < cache1.getConfiguration().getLockAcquisitionTimeout());
       // should not block.
 
-      threadCfg.tm1.resume(t);
-      threadCfg.tm1.commit();
+      tm1.resume(t);
+      tm1.commit();
 
-      threadCfg.replListener2.waitForReplicationToOccur(1000);
+      replListener2.waitForReplicationToOccur(1000);
 
-      assertEquals("Parent node write should have succeeded", value2, threadCfg.cache1.get(parentFqn, key));
+      assertEquals("Parent node write should have succeeded", value2, cache1.get(parentFqn, key));
       if (isUsingInvalidation())
-         assertNull("Parent node write should have invalidated", threadCfg.cache2.get(parentFqn, key));
+         assertNull("Parent node write should have invalidated", cache2.get(parentFqn, key));
       else
-         assertEquals("Parent node write should have replicated", value2, threadCfg.cache2.get(parentFqn, key));
+         assertEquals("Parent node write should have replicated", value2, cache2.get(parentFqn, key));
 
-      assertNull("PFER should have been a no-op", threadCfg.cache1.get(fqn, key));
-      assertNull("PFER should have been a no-op", threadCfg.cache2.get(fqn, key));
+      assertNull("PFER should have been a no-op", cache1.get(fqn, key));
+      assertNull("PFER should have been a no-op", cache2.get(fqn, key));
    }
 }
\ No newline at end of file

Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -2,12 +2,11 @@
 
 import org.easymock.EasyMock;
 import static org.easymock.EasyMock.*;
-import org.jboss.cache.Cache;
-import org.jboss.cache.CacheFactory;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.RPCManager;
+import org.jboss.cache.*;
 import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.factories.ComponentRegistry;
@@ -16,7 +15,7 @@
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.OptimisticTransactionContext;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import org.jgroups.Address;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
@@ -28,100 +27,92 @@
 import javax.transaction.TransactionManager;
 import java.util.List;
 import java.util.Vector;
-import org.jboss.cache.UnitTestCacheFactory;
 
 @Test(groups = {"functional", "jgroups", "transaction"})
 public abstract class PutForExternalReadTestBase
 {
-   // TODO: Refactor this test so it isn't bound to using Thread.sleep() calls.
-
    protected Configuration.CacheMode cacheMode;
    protected NodeLockingScheme nodeLockingScheme;
    protected final Fqn fqn = Fqn.fromString("/one/two");
    protected final Fqn parentFqn = fqn.getParent();
    protected final String key = "k", value = "v", value2 = "v2";
-   
-   protected class PutForExternalReadTestBaseTL {
-      protected CacheSPI<String, String> cache1, cache2;
 
-      ReplicationListener replListener1;
-      ReplicationListener replListener2;
+   protected CacheSPI<String, String> cache1, cache2;
 
-      protected TransactionManager tm1, tm2;
+   ReplicationListener replListener1;
+   ReplicationListener replListener2;
 
-      protected boolean useTx;
-   }
-   
-   
-   ThreadLocal<PutForExternalReadTestBaseTL> threadLocal = new ThreadLocal<PutForExternalReadTestBaseTL>();
+   protected TransactionManager tm1, tm2;
 
+   protected boolean useTx;
+
+
    @BeforeMethod(alwaysRun = true)
    public void setUp()
    {
-      PutForExternalReadTestBaseTL tl = new PutForExternalReadTestBaseTL();
-      threadLocal.set(tl);
-      
+
       CacheFactory<String, String> cf = new UnitTestCacheFactory<String, String>();
 
-      tl.cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
-      tl.cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
-      tl.cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
-      tl.cache1.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
+      cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
+      cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
+      cache1.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
 
-      tl.cache1.start();
-      tl.tm1 = tl.cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+      cache1.start();
+      tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
 
-      tl.cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
-      tl.cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
-      tl.cache2.getConfiguration().setSerializationExecutorPoolSize(0); //this is very important for async tests!
-      tl.cache2.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
+      cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
+      cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      cache2.getConfiguration().setSerializationExecutorPoolSize(0); //this is very important for async tests!
+      cache2.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
 
-      tl.cache2.start();
-      tl.tm2 = tl.cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+      cache2.start();
+      tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+      replListener1 = ReplicationListener.getReplicationListener(cache1);
+      replListener2 = ReplicationListener.getReplicationListener(cache2);
 
-      TestingUtil.blockUntilViewsReceived(10000, tl.cache1, tl.cache2);
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
    }
 
    @AfterMethod(alwaysRun = true)
    public void tearDown()
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
-      if (tl != null) {
-         TestingUtil.killCaches(tl.cache1, tl.cache2);
-         threadLocal.set(null);
-      }
+         TestingUtil.killCaches(cache1, cache2);
    }
 
    public void testNoOpWhenNodePresent()
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
-      tl.cache1.putForExternalRead(fqn, key, value);
-      asyncWait();
+      replListener2.expect(PutForExternalReadCommand.class);
+      cache1.putForExternalRead(fqn, key, value);
+      replListener2.waitForReplicationToOccur();
 
-      assertEquals("PFER should have succeeded", value, tl.cache1.get(fqn, key));
+
+      assertEquals("PFER should have succeeded", value, cache1.get(fqn, key));
       if (isUsingInvalidation())
-         assertNull("PFER should not have effected cache2", tl.cache2.get(fqn, key));
+         assertNull("PFER should not have effected cache2", cache2.get(fqn, key));
       else
-         assertEquals("PFER should have replicated", value, tl.cache2.get(fqn, key));
+         assertEquals("PFER should have replicated", value, cache2.get(fqn, key));
 
       // reset
-      tl.cache1.removeNode(fqn);
-      asyncWait();
+      replListener2.expect(RemoveNodeCommand.class);
+      cache1.removeNode(fqn);
+      replListener2.waitForReplicationToOccur();
 
-      assertFalse("Should have reset", tl.cache1.getRoot().hasChild(fqn));
-      assertFalse("Should have reset", tl.cache2.getRoot().hasChild(fqn));
+      assertFalse("Should have reset", cache1.getRoot().hasChild(fqn));
+      assertFalse("Should have reset", cache2.getRoot().hasChild(fqn));
 
-      tl.cache1.put(fqn, key, value);
-      asyncWait();
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.put(fqn, key, value);
+      replListener2.waitForReplicationToOccur();
 
       // now this pfer should be a no-op
-      tl.cache1.putForExternalRead(fqn, key, value2);
+      cache1.putForExternalRead(fqn, key, value2);
 
-      assertEquals("PFER should have been a no-op", value, tl.cache1.get(fqn, key));
+      assertEquals("PFER should have been a no-op", value, cache1.get(fqn, key));
       if (isUsingInvalidation())
-         assertNull("PFER should have been a no-op", tl.cache2.get(fqn, key));
+         assertNull("PFER should have been a no-op", cache2.get(fqn, key));
       else
-         assertEquals("PFER should have been a no-op", value, tl.cache2.get(fqn, key));
+         assertEquals("PFER should have been a no-op", value, cache2.get(fqn, key));
    }
 
    private Vector<Address> anyAddresses()
@@ -132,72 +123,72 @@
 
    public void testAsyncForce() throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
       RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
-      RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
       List<Address> memberList = originalRpcManager.getMembers();
       expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
       // inject a mock RPC manager so that we can test whether calls made are sync or async.
-      ComponentRegistry cr = TestingUtil.extractComponentRegistry(tl.cache1);
+      ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
       cr.registerComponent(rpcManager, RPCManager.class);
       cr.rewire();
 
       // invalidations will not trigger any rpc calls for PFER
       if (!isUsingInvalidation())
       {
-         // specify what we expect called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
-         // setting the mock object to expect the "sync" param to be false.
+         // specify what we expect called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
+         // setting the mock object to expect the "sync" param to be false.
          expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
       }
 
       replay(rpcManager);
 
       // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
-      tl.cache1.putForExternalRead(fqn, key, value);
+      cache1.putForExternalRead(fqn, key, value);
       verify(rpcManager);
 
       // cleanup
-      TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(originalRpcManager, RPCManager.class);
-      tl.cache1.removeNode(fqn);
+      TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
+      cache1.removeNode(fqn);
    }
 
    public void testTxSuspension() throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
       // create parent node first
-      tl.cache1.put(parentFqn, key, value);
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.put(parentFqn, key, value);
+      replListener2.waitForReplicationToOccur();
 
       // start a tx and do some stuff.
-      tl.tm1.begin();
-      tl.cache1.get(parentFqn, key);
-      tl.cache1.putForExternalRead(fqn, key, value); // should have happened in a separate tx and have committed already.
-      Transaction t = tl.tm1.suspend();
+      replListener2.expect(PutForExternalReadCommand.class);
+      tm1.begin();
+      cache1.get(parentFqn, key);
+      cache1.putForExternalRead(fqn, key, value); // should have happened in a separate tx and have committed already.
+      Transaction t = tm1.suspend();
 
-      asyncWait();
+      replListener2.waitForReplicationToOccur();
 
-      assertLocked(parentFqn, tl.cache1, false);
+      assertLocked(parentFqn, cache1, false);
 
-      assertEquals("PFER should have completed", value, tl.cache1.get(fqn, key));
+      assertEquals("PFER should have completed", value, cache1.get(fqn, key));
       if (isUsingInvalidation())
-         assertNull("PFER should not have effected cache2", tl.cache2.get(fqn, key));
+         assertNull("PFER should not have effected cache2", cache2.get(fqn, key));
       else
-         assertEquals("PFER should have completed", value, tl.cache2.get(fqn, key));
+         assertEquals("PFER should have completed", value, cache2.get(fqn, key));
 
-      tl.tm1.resume(t);
-      tl.tm1.commit();
+      tm1.resume(t);
+      tm1.commit();
 
-      assertEquals("parent fqn tx should have completed", value, tl.cache1.get(parentFqn, key));
+      assertEquals("parent fqn tx should have completed", value, cache1.get(parentFqn, key));
       if (isUsingInvalidation())
-         assertNull("parent fqn tx should have invalidated cache2", tl.cache2.get(parentFqn, key));
+         assertNull("parent fqn tx should have invalidated cache2", cache2.get(parentFqn, key));
       else
-         assertEquals("parent fqn tx should have completed", value, tl.cache2.get(parentFqn, key));
+         assertEquals("parent fqn tx should have completed", value, cache2.get(parentFqn, key));
    }
 
    public void testExceptionSuppression() throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
       RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
-      RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
       try
       {
          List<Address> memberList = originalRpcManager.getMembers();
@@ -206,13 +197,13 @@
          expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
          replay(barfingRpcManager);
 
-         TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(barfingRpcManager, RPCManager.class);
-         tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
-         TestingUtil.extractComponentRegistry(tl.cache1).rewire();
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);
+         cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+         TestingUtil.extractComponentRegistry(cache1).rewire();
 
          try
          {
-            tl.cache1.put(fqn, key, value);
+            cache1.put(fqn, key, value);
             if (!isOptimistic()) fail("Should have barfed");
          }
          catch (RuntimeException re)
@@ -222,14 +213,13 @@
          if (isOptimistic() && !isUsingInvalidation())
          {
             // proves that the put did, in fact, barf.  Doesn't work for invalidations since the inability to invalidate will not cause a rollback.
-            assertNull(tl.cache1.get(fqn, key));
-         }
-         else
+            assertNull(cache1.get(fqn, key));
+         } else
          {
             // clean up any indeterminate state left over
             try
             {
-               tl.cache1.removeNode(fqn);
+               cache1.removeNode(fqn);
                // as above, the inability to invalidate will not cause an exception
                if (!isUsingInvalidation()) fail("Should have barfed");
             }
@@ -238,36 +228,35 @@
             }
          }
 
-         assertNull("Should have cleaned up", tl.cache1.get(fqn, key));
+         assertNull("Should have cleaned up", cache1.get(fqn, key));
 
          // should not barf
-         tl.cache1.putForExternalRead(fqn, key, value);
+         cache1.putForExternalRead(fqn, key, value);
       }
       finally
       {
-         TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(originalRpcManager, RPCManager.class);
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
       }
    }
 
    public void testBasicPropagation() throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
-      assert !tl.cache1.exists(fqn);
-      assert !tl.cache2.exists(fqn);
+      assert !cache1.exists(fqn);
+      assert !cache2.exists(fqn);
 
-      tl.cache1.putForExternalRead(fqn, key, value);
+      replListener2.expect(PutForExternalReadCommand.class);
+      cache1.putForExternalRead(fqn, key, value);
+      replListener2.waitForReplicationToOccur();
 
-      asyncWait();
-
-      assertEquals("PFER updated cache1", value, tl.cache1.get(fqn, key));
+      assertEquals("PFER updated cache1", value, cache1.get(fqn, key));
       Object expected = isUsingInvalidation() ? null : value;
-      assertEquals("PFER propagated to cache2 as expected", expected, tl.cache2.get(fqn, key));
+      assertEquals("PFER propagated to cache2 as expected", expected, cache2.get(fqn, key));
 
       // replication to cache 1 should NOT happen.
-      tl.cache2.putForExternalRead(fqn, key, value);
+      cache2.putForExternalRead(fqn, key, value);
 
-      assertEquals("PFER updated cache2", value, tl.cache2.get(fqn, key));
-      assertEquals("Cache1 should be unaffected", value, tl.cache1.get(fqn, key));
+      assertEquals("PFER updated cache2", value, cache2.get(fqn, key));
+      assertEquals("Cache1 should be unaffected", value, cache1.get(fqn, key));
    }
 
    /**
@@ -297,56 +286,59 @@
     */
    public void testMemLeakOnSuspendedTransactions() throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
       Fqn fqn2 = Fqn.fromString("/fqn/two");
 
-      tl.tm1.begin();
-      tl.cache1.putForExternalRead(fqn, key, value);
-      tl.tm1.commit();
+      replListener2.expect(PutForExternalReadCommand.class);
+      tm1.begin();
+      cache1.putForExternalRead(fqn, key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur(10000);
 
-      TestingUtil.sleepThread(500);
 
-      assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
-      assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
-      assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
-      assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+      assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
 
-      tl.tm1.begin();
-      tl.cache1.putForExternalRead(fqn, key, value);
-      tl.cache1.put(fqn2, key, value);
-      tl.tm1.commit();
+      System.out.println("PutForExternalReadTestBase.testMemLeakOnSuspendedTransactions");
+      //do not expect a PFER replication, as the node already exists so this is a no-op
+      replListener2.expectWithTx(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.putForExternalRead(fqn, key, value);
+      cache1.put(fqn2, key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur();
 
-      asyncWait();
+      assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
 
-      assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
-      assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
-      assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
-      assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+      replListener2.expectWithTx(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.put(fqn2, key, value);
+      cache1.putForExternalRead(fqn, key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur(60000);
 
-      tl.tm1.begin();
-      tl.cache1.put(fqn2, key, value);
-      tl.cache1.putForExternalRead(fqn, key, value);
-      tl.tm1.commit();
+      assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
 
-      asyncWait();
+      //do not expect a PFER replication, as the node already exists so this is a no-op
+      replListener2.expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.put(fqn2, key, value);
+      cache1.putForExternalRead(fqn, key, value);
+      cache1.put(fqn2, key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur(60000);
 
-      assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
-      assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
-      assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
-      assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
-
-      tl.tm1.begin();
-      tl.cache1.put(fqn2, key, value);
-      tl.cache1.putForExternalRead(fqn, key, value);
-      tl.cache1.put(fqn2, key, value);
-      tl.tm1.commit();
-
-      asyncWait();
-
-      assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
-      assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
-      assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
-      assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+      assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
    }
 
    /**
@@ -357,30 +349,29 @@
     */
    private void cacheModeLocalTest(boolean transactional) throws Exception
    {
-      PutForExternalReadTestBaseTL tl = threadLocal.get();
       RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
-      RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
 
       // inject a mock RPC manager so that we can test whether calls made are sync or async.
-      tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+      cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
 
-      // specify that we expect nothing will be called on the mock Rpc Manager.
+      // specify that we expect nothing will be called on the mock Rpc Manager.
       replay(rpcManager);
 
       // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
       if (transactional)
-         tl.tm1.begin();
+         tm1.begin();
 
-      tl.cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
-      tl.cache1.putForExternalRead(fqn, key, value);
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.putForExternalRead(fqn, key, value);
 
       if (transactional)
-         tl.tm1.commit();
+         tm1.commit();
 
       verify(rpcManager);
       // cleanup
-      tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
-      tl.cache1.removeNode(fqn);
+      cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
+      cache1.removeNode(fqn);
    }
 
    protected abstract void assertLocked(Fqn fqn, CacheSPI cache, boolean writeLocked);
@@ -416,10 +407,4 @@
    {
       return nodeLockingScheme == NodeLockingScheme.OPTIMISTIC;
    }
-
-   protected void asyncWait()
-   {
-      // always needs to do this since PFER will force async comms.
-      TestingUtil.sleepThread(500);
-   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -101,6 +101,8 @@
       cachesTL.set(caches);
       assertNoLocks(caches);
 
+      TestingUtil.sleepThread(getSleepTimeout());
+
       System.out.println("*** Testing cache 0");
       waitForBuddy(caches.get(0), caches.get(1), false);
       waitForBuddy(caches.get(0), caches.get(2), false);

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -70,6 +70,7 @@
          System.out.println("***** About to kill original data owner (" + caches.get(0).getLocalAddress() + ").  *****");
          // forcefully kill data owner.
          killChannel(caches.get(0), caches.get(1), 2);
+         waitForSingleBuddy(caches.get(1), caches.get(2));
 
          System.out.println("Killed.  Testing backup roots.");
          TestingUtil.dumpCacheContents(caches);
@@ -217,7 +218,7 @@
       assertNoLocks(caches);
 
       // gravitate to 2:
-      caches.get(2).getNode(fqn);  // expect entire subtree to gravitate.
+      caches.get(2).getNode(fqn);  // expect entire subtree to gravitate.
       delay(); // cleanup commands are async
 
       Fqn newBackupFqn = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn);

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -324,6 +324,20 @@
       waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);   
    }
 
+   /**
+    * Calls waitForBuddy (onlyBuddy = true) for each cache and its successor, wrapping from the last to the first.
+    * Each call gets a timeout of 60 secs + 10 secs * caches.length; order the caches as per the underlying view.
+    */
+   public void waitForSingleBuddy(Cache... caches) throws Exception
+   {
+      long timeout = 60000 + 10000L * caches.length; // was 60000 + caches.length, i.e. only a few extra millis
+      for (int i = 0; i < caches.length - 1; i++)
+      {
+         waitForBuddy(caches[i], caches[i+1], true, timeout);
+      }
+      waitForBuddy(caches[caches.length - 1], caches[0], true, timeout);
+   }
+
    public void waitForBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy, long timeout) throws Exception
    {
       long start = System.currentTimeMillis();

Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -4,8 +4,7 @@
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationQueueNotifier;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -46,8 +45,8 @@
 
    public void testTransactionalReplication() throws Exception
    {
-      ReplicationListener cache1Listener = new ReplicationListener(cache);
-      ReplicationListener cache2Listener = new ReplicationListener(cache2);
+      ReplicationListener cache1Listener = ReplicationListener.getReplicationListener(cache);
+      ReplicationListener cache2Listener = ReplicationListener.getReplicationListener(cache2);
 
       cache2Listener.expect(PutKeyValueCommand.class);
       // outside of tx scope

Modified: core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -72,19 +72,22 @@
 
    public void testUpdateToFuture() throws Exception
    {
+      future = System.currentTimeMillis() + 2500;
+      past = System.currentTimeMillis() - 1000;
       try
       {
+         EvictionController evictionController = new EvictionController(cache);
          log.info("update 1 from future to past");
          cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future);
-         TestingUtil.sleepThread(200);
-         new EvictionController(cache).startEviction();
+         TestingUtil.sleepThread(1000);
+         evictionController.startEviction();
          assertNotNull(cache.getNode(fqn1));
-         cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future + 250);
-         TestingUtil.sleepThread(500);
-         new EvictionController(cache).startEviction();
+         cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future + 1200);
+         TestingUtil.sleepThread(2000);
+         evictionController.startEviction();
          assertNotNull(cache.getNode(fqn1));
-         TestingUtil.sleepThread(100);
-         new EvictionController(cache).startEviction();
+         TestingUtil.sleepThread(1000);
+         evictionController.startEviction();
          assertNull(cache.getNode(fqn1));
       }
       finally

Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -13,12 +13,13 @@
 import org.jboss.cache.Node;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.CacheLoaderConfig;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.optimistic.DefaultDataVersion;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -118,18 +119,18 @@
       cache2.start();
 
       Fqn fqn = Fqn.fromString("/a/b");
-      ReplicationListener replListener1 = new ReplicationListener(cache1);
-      ReplicationListener replListener2 = new ReplicationListener(cache2);
+      ReplicationListener replListener1 = ReplicationListener.getReplicationListener(cache1);
+      ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, "key", "value");
-      replListener2.waitForReplicationToOccur(500);
+      replListener2.waitForReplicationToOccur();
 //      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
       // test that this has NOT replicated, but rather has been invalidated:
       assertEquals("value", cache1.get(fqn, "key"));
       assertNull("Should NOT have replicated!", cache2.getNode(fqn));
 
-      replListener1.expectAny();
+      replListener1.expect(PutKeyValueCommand.class);
       // now make sure cache2 is in sync with cache1:
       cache2.put(fqn, "key", "value");
 //      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
@@ -140,7 +141,7 @@
       assertHasBeenInvalidated(n, "Should have been invalidated");
       assertEquals("value", cache2.get(fqn, "key"));
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       // now test the invalidation:
       cache1.put(fqn, "key2", "value2");
       assertEquals("value2", cache1.get(fqn, "key2"));

Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,11 +12,12 @@
 import org.jboss.cache.UnitTestCacheFactory;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.transaction.DummyTransactionManagerLookup;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNull;
 import org.testng.annotations.AfterMethod;
@@ -202,7 +203,7 @@
    public void testPessAsyncRepl() throws Exception
    {
       createCaches(false, false);
-      ReplicationListener replListener = new ReplicationListener(cache2);
+      ReplicationListener replListener = ReplicationListener.getReplicationListener(cache2);
 
       mgr1.begin();
       cache1.put(fqn, key, "value");
@@ -212,9 +213,9 @@
       assertNull(loader1.get(fqn));
       assertNull(loader2.get(fqn));
 
-      replListener.expect(PrepareCommand.class);
+      replListener.expectWithTx(PutKeyValueCommand.class);
       mgr1.commit();
-      replListener.waitForReplicationToOccur(500);
+      replListener.waitForReplicationToOccur();
 
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));
@@ -278,10 +279,10 @@
    {
       createCaches(false, true);
 
-      ReplicationListener replListener = new ReplicationListener(cache2);
+      ReplicationListener replListener = ReplicationListener.getReplicationListener(cache2);
 
       mgr1.begin();
-      replListener.expect(CommitCommand.class);
+      replListener.expectWithTx(PutKeyValueCommand.class);
       cache1.put(fqn, key, "value");
 
       assertEquals("value", cache1.get(fqn, key));
@@ -291,7 +292,7 @@
 
       mgr1.commit();
 
-      replListener.waitForReplicationToOccur(500);
+      replListener.waitForReplicationToOccur();
 
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,29 +12,23 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
-import org.jboss.cache.commands.legacy.write.PessPutKeyValueCommand;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.marshall.data.Address;
 import org.jboss.cache.marshall.data.Person;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.transaction.*;
 import java.lang.reflect.Method;
-import org.jboss.cache.UnitTestCacheFactory;
-import org.jboss.cache.config.Configuration;
 
 /**
  * Test marshalling for async mode.
@@ -66,8 +60,8 @@
 
       cache2 = createCache("TestCache");
 
-      replListener1 = new ReplicationListener(cache1);
-      replListener2 = new ReplicationListener(cache2);
+      replListener1 = ReplicationListener.getReplicationListener(cache1);
+      replListener2 = ReplicationListener.getReplicationListener(cache2);
       addr = new Address();
       addr.setCity("San Jose");
       ben = new Person();
@@ -121,11 +115,11 @@
       }
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
-      replListener2.expect(PessPutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(aop, "person", ben);
       replListener2.waitForReplicationToOccur(500);
 
-      replListener2.expect(PessPutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/alias"), "person", ben);
       replListener2.waitForReplicationToOccur(500);
 
@@ -150,7 +144,7 @@
       // Set it back to the cache
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
-      replListener1.expect(PessPutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.put(aop, "person", ben2);
       replListener1.waitForReplicationToOccur(1000);
       if (useMarshalledValues) resetContextClassLoader();
@@ -180,11 +174,11 @@
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
 
-      replListener2.expect(PessPutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/aop/1"), "person", ben);
       replListener2.waitForReplicationToOccur(1000);
 
-      replListener2.expect(PessPutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
       replListener2.waitForReplicationToOccur(1000);
 
@@ -204,7 +198,7 @@
 
    public void testTxPut() throws Exception
    {
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       beginTransaction();
       cache1.put(aop, "person", ben);
       commit();
@@ -228,7 +222,7 @@
       }
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       beginTransaction();
       cache1.put(aop, "person", ben);
       commit();
@@ -254,7 +248,7 @@
       // Set it back to the cache
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
-      replListener1.expectAny();
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.put(aop, "person", ben2);
       if (useMarshalledValues) resetContextClassLoader();
       replListener1.waitForReplicationToOccur(100);
@@ -282,7 +276,7 @@
 
       Fqn base = Fqn.fromString("/aop");
       Fqn fqn = Fqn.fromRelativeElements(base, custom1);
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, "key", "value");
       replListener2.waitForReplicationToOccur(10000);
 

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -4,11 +4,12 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
 import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -50,7 +51,7 @@
       c1.start();
 
       c2 = new UnitTestCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone());
-      replListener2 = new ReplicationListener(c2);
+      replListener2 = ReplicationListener.getReplicationListener(c2);
 
       TestingUtil.blockUntilViewsReceived(60000, c1, c2);
    }
@@ -69,7 +70,7 @@
       c1.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
       c2.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       // write something; will cause a stale region to be stored in C2's cache marshaller
       c1.put(fqn, "k", "v");
       assert c1.get(fqn, "k").equals("v");

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,7 +12,7 @@
 import org.jboss.cache.transaction.OptimisticTransactionContext;
 import org.jboss.cache.transaction.TransactionTable;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.Test;
 
@@ -175,7 +175,7 @@
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
-      ReplicationListener replListener2 = new ReplicationListener(cache2);
+      ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
       LockManager lockManager = TestingUtil.extractLockManager(cache);
       LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
 
@@ -189,11 +189,11 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       cache.put("/one/two", "key1", pojo);
 
       mgr.commit();
-      replListener2.waitForReplicationToOccur(1000);
+      replListener2.waitForReplicationToOccur();
 
       // cache asserts
       assertNull(mgr.getTransaction());
@@ -237,7 +237,7 @@
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
-      ReplicationListener replListener2 = new ReplicationListener(cache2);
+      ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
       LockManager lockManager = TestingUtil.extractLockManager(cache);
       LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
 
@@ -251,7 +251,7 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
-      replListener2.expect(PutKeyValueCommand.class, CommitCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache.put("/one/two", "key1", pojo);
 
       mgr.commit();
@@ -291,9 +291,9 @@
       assertNotNull(cache2.get(Fqn.fromString("/one/two"), "key1"));
 
       replListener2.expect(RemoveNodeCommand.class);
-      replListener2.expect(CommitCommand.class);
+      replListener2.expect();
       cache.removeNode("/one/two");
-      replListener2.waitForReplicationToOccur(1000);
+      replListener2.waitForReplicationToOccur();
 
       assertEquals(false, cache.exists("/one/two"));
       assertEquals(null, cache.get("/one/two", "key1"));
@@ -307,7 +307,7 @@
    {
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
-      ReplicationListener replListener = new ReplicationListener(cache);
+      ReplicationListener replListener = ReplicationListener.getReplicationListener(cache);
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
       LockManager lockManager = TestingUtil.extractLockManager(cache);
 

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -21,14 +21,4 @@
         nodeLockingScheme = "OPTIMISTIC";
         isInvalidation = true;
     }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-    {
-        TestingUtil.sleepThread(500);
-    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -21,19 +21,4 @@
       nodeLockingScheme = "PESSIMISTIC";
       isInvalidation = true;
    }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-   {
-      TestingUtil.sleepThread(500);
-   }
-
-   public void testPutKeyValueViaNodeAPI() throws Exception
-   {
-      super.testPutDataViaNodeAPI();
-   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -22,75 +22,4 @@
       cacheMode = Configuration.CacheMode.REPL_ASYNC;
       nodeLockingScheme = "OPTIMISTIC";
    }
-
-   protected void verifyReplication()
-   {
-      TestingUtil.sleepThread(500);
-   }
-
-   public void testRemoveKey() throws Exception
-   {
-      super.testRemoveKey();
-   }
-
-   public void testPutKeyValue() throws Exception
-   {
-      super.testPutKeyValue();
-   }
-
-   public void testPutKeyValueViaNodeAPI() throws Exception
-   {
-      super.testPutKeyValueViaNodeAPI();
-   }
-
-   public void testPutData() throws Exception
-   {
-      super.testPutData();
-   }
-
-   public void testPutDataViaNodeAPI() throws Exception
-   {
-      super.testPutDataViaNodeAPI();
-   }
-
-   public void testRemoveNode() throws Exception
-   {
-      super.testRemoveNode();
-   }
-
-   public void testRemoveNodeViaNodeAPI() throws Exception
-   {
-      super.testRemoveNodeViaNodeAPI();
-   }
-
-   public void testRemoveKeyViaNodeAPI() throws Exception
-   {
-      super.testRemoveKeyViaNodeAPI();
-   }
-
-   public void testTransactionalBehaviourCommit() throws Exception
-   {
-      super.testTransactionalBehaviourCommit();
-   }
-
-   public void testTransactionalBehaviourRollback() throws Exception
-   {
-      super.testTransactionalBehaviourRollback();
-   }
-
-   public void testTransactionalBehaviourViaNodeAPI() throws Exception
-   {
-      super.testTransactionalBehaviourViaNodeAPI();
-   }
-
-   public void testAddChild() throws Exception
-   {
-      super.testAddChild();
-   }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -7,7 +7,7 @@
 package org.jboss.cache.options.cachemodelocal;
 
 import org.jboss.cache.config.Configuration;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.legacy.write.PessPutDataMapCommand;
 import org.jboss.cache.commands.legacy.write.PessPutKeyValueCommand;
@@ -27,7 +27,6 @@
 @Test(groups = {"functional", "jgroups"}, testName = "options.cachemodelocal.AsyncReplPessLocksTest")
 public class AsyncReplPessLocksTest extends CacheModeLocalTestBase
 {
-
    ReplicationListener current;
 
    Map<Cache, ReplicationListener> cache2Listener = new HashMap<Cache, ReplicationListener>(2);
@@ -37,29 +36,4 @@
       cacheMode = Configuration.CacheMode.REPL_ASYNC;
       nodeLockingScheme = "PESSIMISTIC";
    }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache, Class<? extends ReplicableCommand> what)
-   {
-      if (what == PutDataMapCommand.class) what = PessPutDataMapCommand.class;
-      if (what == PutKeyValueCommand.class) what = PessPutKeyValueCommand.class;
-      if (what == RemoveKeyCommand.class) what = PessRemoveKeyCommand.class;
-      if (what == RemoveNodeCommand.class) what = PessRemoveNodeCommand.class;
-      getCacheListener(cache).expect(what);
-   }
-
-   protected void verifyReplication()
-   {
-      current.waitForReplicationToOccur(2000);
-   }
-
-   public ReplicationListener getCacheListener(Cache cache)
-   {
-      current = cache2Listener.get(cache);
-      if (current == null)
-      {
-         current = new ReplicationListener(cache);
-         cache2Listener.put(cache, current);
-      }
-      return current;
-   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -19,6 +19,7 @@
 import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -45,12 +46,15 @@
     * set this to true if the implementing class plans to use an invalidating cache mode *
     */
    protected boolean isInvalidation;
+   CacheSPI<String, String> cache1;
+   CacheSPI<String, String> cache2;
 
-   private ThreadLocal<CacheSPI<String, String>> cache1TL = new ThreadLocal<CacheSPI<String, String>>();
-   private ThreadLocal<CacheSPI<String, String>> cache2TL = new ThreadLocal<CacheSPI<String, String>>();
-   private ThreadLocal<NodeSPI<String, String>> root1TL = new ThreadLocal<NodeSPI<String, String>>();
-   private ThreadLocal<NodeSPI<String, String>> root2TL = new ThreadLocal<NodeSPI<String, String>>();
+   NodeSPI<String, String> root1;
+   NodeSPI<String, String> root2;
 
+   ReplicationListener replListener1;
+   ReplicationListener replListener2;
+
    private final Fqn fqn = Fqn.fromString("/a");
    private final String key = "key";
 
@@ -58,13 +62,6 @@
    public void setUp() throws Exception
    {
 
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
-      // force a tear down if the test runner didn't run one before (happens in IDEA)
-      if (cache1 != null || cache2 != null)
-         tearDown();
-
       CacheFactory<String, String> instance = new UnitTestCacheFactory<String, String>();
 
       Configuration c = new Configuration();
@@ -89,29 +86,22 @@
       cache2 = (CacheSPI<String, String>) instance.createCache(c, false);
       cache2.start();
 
-      cache1TL.set(cache1);
-      cache2TL.set(cache2);
 
-      root1TL.set(cache1.getRoot());
-      root2TL.set(cache2.getRoot());
+      root1 = cache1.getRoot();
+      root2 = cache2.getRoot();
+
+      replListener1 = ReplicationListener.getReplicationListener(cache1);
+      replListener2 = ReplicationListener.getReplicationListener(cache2);
    }
 
    @AfterMethod(alwaysRun = true)
    public void tearDown()
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       TestingUtil.killCaches(cache1, cache2);
-      cache1TL.set(null);
-      cache2TL.set(null);
    }
 
    public void testPutKeyValue() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       cache1.put(fqn, key, "value");
       Thread.sleep(500);
@@ -123,10 +113,10 @@
       assertNull("Should be null", cache2.get(fqn, key));
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.getInvocationContext().getOptionOverrides().reset();
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
 
@@ -147,10 +137,10 @@
       assertEquals("value2", cache2.get(fqn, key));
       assertEquals("value", cache1.get(fqn, key));
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.getInvocationContext().getOptionOverrides().reset();
       cache2.put(fqn, key, "value2");
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value2", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -164,11 +154,6 @@
 
    public void testPutKeyValueViaNodeAPI() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
-
       Node node1 = root1.addChild(fqn);
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       node1.put(key, "value");
@@ -180,10 +165,10 @@
       assertNull("Should be null", cache2.get(fqn, key));
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.getInvocationContext().getOptionOverrides().reset();
       node1.put(key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
 
@@ -207,10 +192,10 @@
       assertEquals("value2", cache2.get(fqn, key));
       assertEquals("value", cache1.get(fqn, key));
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.getInvocationContext().getOptionOverrides().reset();
       node2.put(key, "value2");
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value2", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -224,9 +209,6 @@
 
    public void testPutData() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       Map<String, String> map = new HashMap<String, String>();
       map.put(key, "value");
 
@@ -239,10 +221,10 @@
       assertNull("Should be null", cache2.get(fqn, key));
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, PutDataMapCommand.class);
+      replListener2.expect(PutDataMapCommand.class);
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
       cache1.put(fqn, map);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
       // cache 2 should as well
@@ -264,10 +246,10 @@
       assertEquals("value2", cache2.get(fqn, key));
       assertEquals("value", cache1.get(fqn, key));
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.getInvocationContext().getOptionOverrides().reset();
       cache2.put(fqn, key, "value2");
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value2", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -281,11 +263,6 @@
 
    public void testPutDataViaNodeAPI() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
-
       Map<String, String> map = new HashMap<String, String>();
       map.put(key, "value");
 
@@ -299,10 +276,10 @@
       assertNull("Should be null", cache2.get(fqn, key));
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, PutDataMapCommand.class);
+      replListener2.expect(PutDataMapCommand.class);
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
       node1.putAll(map);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
       // cache 2 should as well
@@ -326,10 +303,10 @@
       assertEquals("value2", cache2.get(fqn, key));
       assertEquals("value", cache1.get(fqn, key));
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.getInvocationContext().getOptionOverrides().reset();
       node2.put(key, "value2");
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value2", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -343,15 +320,12 @@
 
    public void testRemoveNode() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       // put some stuff in the cache first
       // make sure we cleanup thread local vars.
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.getInvocationContext().setOptionOverrides(null);
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -379,9 +353,9 @@
       }
 
       // replace cache entries
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -393,10 +367,10 @@
       }
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, RemoveNodeCommand.class);
+      replListener2.expect(RemoveNodeCommand.class);
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
       cache1.removeNode(fqn);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
 
       // both should be null
       assertNull("should be null", cache1.get(fqn, key));
@@ -405,18 +379,14 @@
 
    public void testRemoveNodeViaNodeAPI() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
 
       // put some stuff in the cache first
       // make sure we cleanup thread local vars.
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.getInvocationContext().setOptionOverrides(null);
       cache1.put(fqn, key, "value");
       assertEquals("value", cache1.get(fqn, key));
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       if (isInvalidation)
       {
          assertNull("Should be null", cache2.get(fqn, key));
@@ -443,9 +413,9 @@
       }
 
       // replace cache entries
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -456,11 +426,11 @@
          assertEquals("value", cache2.get(fqn, key));
       }
 
-      registerReplicationCommand(cache2, RemoveNodeCommand.class);
+      replListener2.expect(RemoveNodeCommand.class);
       // now try again with passing the default options
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
       root1.removeChild(fqn);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
 
       // both should be null
       assertNull("should be null", cache1.get(fqn, key));
@@ -469,14 +439,11 @@
 
    public void testRemoveKey() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect((Class<? extends ReplicableCommand>) PutKeyValueCommand.class);
       // put some stuff in the cache first
       cache1.getInvocationContext().setOptionOverrides(null);
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -503,10 +470,10 @@
          assertEquals("value", cache2.get(fqn, key));
       }
 
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       // replace cache entries
       cache1.put(fqn, key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -517,11 +484,11 @@
          assertEquals("value", cache2.get(fqn, key));
       }
 
-      registerReplicationCommand(cache2, RemoveKeyCommand.class);
+      replListener2.expect(RemoveKeyCommand.class);
       // now try again with passing the default options
       cache1.getInvocationContext().getOptionOverrides().reset();
       cache1.remove(fqn, key);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
 
       // both should be null
       assertNull("should be null", cache1.get(fqn, key));
@@ -530,17 +497,12 @@
 
    public void testRemoveKeyViaNodeAPI() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
-
       // put some stuff in the cache first
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       Node node1 = root1.addChild(fqn);
       cache1.getInvocationContext().setOptionOverrides(null);
       node1.put(key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -568,9 +530,9 @@
       }
 
       // replace cache entries
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expect(PutKeyValueCommand.class);
       node1.put(key, "value");
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       assertEquals("value", cache1.get(fqn, key));
       if (isInvalidation)
       {
@@ -582,10 +544,10 @@
       }
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, RemoveKeyCommand.class);
+      replListener2.expect(RemoveKeyCommand.class);
       cache1.getInvocationContext().getOptionOverrides().reset();
       node1.remove(key);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
 
       // both should be null
       assertNull("should be null", cache1.get(fqn, key));
@@ -594,18 +556,15 @@
 
    public void testTransactionalBehaviourCommit() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       TransactionManager mgr = cache1.getTransactionManager();
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expectWithTx(PutKeyValueCommand.class);
       mgr.begin();
       cache1.getInvocationContext().getOptionOverrides().reset();
       cache1.put(fqn, key, "value1");
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       cache1.put(fqn, key, "value2");
       mgr.commit();
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value2", cache1.get(fqn, key));
 
@@ -618,7 +577,7 @@
          assertNull(cache2.get(fqn, key));
       }
 
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expectWithTx(PutKeyValueCommand.class);
       // now try again with passing the default options
       mgr.begin();
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
@@ -626,7 +585,7 @@
       cache1.getInvocationContext().getOptionOverrides().reset();
       cache1.put(fqn, key, "value");
       mgr.commit();
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
 
@@ -641,7 +600,7 @@
       }
 
       // now cache2
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expect(PutKeyValueCommand.class);
       mgr = cache2.getTransactionManager();
       mgr.begin();
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
@@ -649,7 +608,7 @@
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       cache2.put(fqn, key, "value2");
       mgr.commit();
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
 
       assertEquals("value2", cache2.get(fqn, key));
 
@@ -662,14 +621,14 @@
          assertNull(cache1.get(fqn, key));
       }
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expectWithTx(PutKeyValueCommand.class);
       mgr.begin();
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       cache2.put(fqn, key, "value2");
       cache2.getInvocationContext().getOptionOverrides().reset();
       cache2.put(fqn, key, "value4");
       mgr.commit();
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value4", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -684,15 +643,12 @@
 
    public void testTransactionalBehaviourRollback() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-
       TransactionManager mgr = cache1.getTransactionManager();
 
-      // create these first ...
+      replListener2.expect(PutKeyValueCommand.class, PutKeyValueCommand.class);
       cache1.put("/a", key, "old");
       cache1.put("/b", key, "old");
-      Thread.sleep(500);
+      replListener2.waitForReplicationToOccur();
 
 
       mgr.begin();
@@ -720,13 +676,8 @@
 
    public void testTransactionalBehaviourViaNodeAPI() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
-
       Node node1 = root1.addChild(fqn);
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expectWithTx(PutKeyValueCommand.class);
       TransactionManager mgr = cache1.getTransactionManager();
       mgr.begin();
       cache1.getInvocationContext().getOptionOverrides().reset();
@@ -734,7 +685,8 @@
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       node1.put(key, "value2");
       mgr.commit();
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
+
       // cache1 should still have this
       assertEquals("value2", cache1.get(fqn, key));
 
@@ -748,14 +700,14 @@
       }
 
       // now try again with passing the default options
-      registerReplicationCommand(cache2, PutKeyValueCommand.class);
+      replListener2.expectWithTx(PutKeyValueCommand.class);
       mgr.begin();
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       node1.put(key, "value3");
       cache1.getInvocationContext().getOptionOverrides().reset();
       node1.put(key, "value");
       mgr.commit();
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
       // cache1 should still have this
       assertEquals("value", cache1.get(fqn, key));
 
@@ -772,14 +724,14 @@
       // now cache2
       Node node2 = root2.addChild(fqn);
       mgr = cache2.getTransactionManager();
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expectWithTx(PutKeyValueCommand.class);
       mgr.begin();
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
       node2.put(key, "value3");
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       node2.put(key, "value2");
       mgr.commit();
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
 
       assertEquals("value2", cache2.get(fqn, key));
 
@@ -792,14 +744,14 @@
          assertNull(cache1.get(fqn, key));
       }
 
-      registerReplicationCommand(cache1, PutKeyValueCommand.class);
+      replListener1.expectWithTx(PutKeyValueCommand.class);
       mgr.begin();
       cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       node2.put(key, "value2");
       cache2.getInvocationContext().getOptionOverrides().reset();
       node2.put(key, "value4");
       mgr.commit();
-      verifyReplication();
+      replListener1.waitForReplicationToOccur();
       assertEquals("value4", cache2.get(fqn, key));
       if (!isInvalidation)
       {
@@ -814,14 +766,9 @@
 
    public void testAddChild() throws Exception
    {
-      CacheSPI<String, String> cache1 = cache1TL.get();
-      CacheSPI<String, String> cache2 = cache2TL.get();
-      NodeSPI<String, String> root1 = root1TL.get();
-      NodeSPI<String, String> root2 = root2TL.get();
-
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
       root1.addChild(fqn);
-      Thread.sleep(1000);
+
       // cache1 should still have this
       assertTrue(root1.hasChild(fqn));
       // cache 2 should not
@@ -829,12 +776,15 @@
       assertTrue("Should be null", node2 == null || (isInvalidation && !node2.isValid()));
 
       // now try again with passing the default options
+      replListener2.expect(RemoveNodeCommand.class);
       root1.removeChild(fqn);
+      replListener2.waitForReplicationToOccur();
+
       cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
 
-      registerReplicationCommand(cache2, PutDataMapCommand.class);
+      replListener2.expect(PutDataMapCommand.class);
       root1.addChild(fqn);
-      verifyReplication();
+      replListener2.waitForReplicationToOccur();
 
       // cache1 should still have this
       assertTrue(root1.hasChild(fqn));
@@ -848,19 +798,4 @@
          assertTrue("Should be null", node2 == null || !node2.isValid());
       }
    }
-
-   /**
-    * Register a command that should be received by the specified cache as the result of a replication taking place.
-    * @param cache the cache on which the given command should be replicated.
-    * @param what the command that should be replicated
-    * @see org.jboss.cache.options.cachemodelocal.AsyncReplPessLocksTest#registerReplicationCommand(org.jboss.cache.CacheSPI, Class)
-    */
-   protected abstract void registerReplicationCommand(CacheSPI<String, String> cache, Class<? extends ReplicableCommand> what);
-
-
-   /**
-    * Wait a while until the given command gets replicated.
-    * @see AsyncReplPessLocksTest#verifyReplication() 
-    */
-   protected abstract void verifyReplication();
 }
\ No newline at end of file

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,13 +20,4 @@
         nodeLockingScheme = "OPTIMISTIC";
         isInvalidation = true;
     }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-    {
-    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,13 +20,4 @@
         nodeLockingScheme = "PESSIMISTIC";
         isInvalidation = true;
     }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-    {
-    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,15 +20,4 @@
         cacheMode = Configuration.CacheMode.REPL_SYNC;
         nodeLockingScheme = "OPTIMISTIC";
     }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-    {
-        TestingUtil.sleepThread(250);
-    }
-
 }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -19,13 +19,4 @@
         cacheMode = Configuration.CacheMode.REPL_SYNC;
         nodeLockingScheme = "PESSIMISTIC";
     }
-
-   protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
-   {
-      //do nothing
-   }
-
-   protected void verifyReplication()
-    {
-    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -14,7 +14,7 @@
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -22,6 +22,7 @@
 
 import javax.transaction.TransactionManager;
 import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration;
 
 /**
@@ -47,11 +48,11 @@
       threadLocal.set(tl);
       log("creating cache1");
       tl.cache1 = createCache("CacheGroup");
-      tl.replListener1 = new ReplicationListener(tl.cache1);
+      tl.replListener1 = ReplicationListener.getReplicationListener(tl.cache1);
 
       log("creating cache2");
       tl.cache2 = createCache("CacheGroup");
-      tl.replListener2 = new ReplicationListener(tl.cache2);
+      tl.replListener2 = ReplicationListener.getReplicationListener(tl.cache2);
    }
 
 
@@ -116,25 +117,24 @@
       Fqn fqn = Fqn.fromString("/a");
       String key = "key";
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, key, "value1");
       // allow for replication
-      replListener2.waitForReplicationToOccur(500);
+      replListener2.waitForReplicationToOccur();
       assertEquals("value1", cache1.get(fqn, key));
       assertEquals("value1", cache2.get(fqn, key));
 
       TransactionManager mgr = cache1.getTransactionManager();
       mgr.begin();
 
-      replListener2.expectAny();
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(fqn, key, "value2");
       assertEquals("value2", cache1.get(fqn, key));
       assertEquals("value1", cache2.get(fqn, key));
 
       mgr.commit();
+      replListener2.waitForReplicationToOccur();
 
-      replListener2.waitForReplicationToOccur(500);
-
       assertEquals("value2", cache1.get(fqn, key));
       assertEquals("value2", cache2.get(fqn, key));
 
@@ -162,7 +162,7 @@
       {
          cache3 = createCache("DifferentGroup");
          cache4 = createCache("DifferentGroup");
-         replListener2.expectAny();
+         replListener2.expect(PutKeyValueCommand.class);
          cache1.put("/a/b/c", "age", 38);
          // because we use async repl, modification may not yet have been propagated to cache2, so
          // we have to wait a little

Modified: core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -61,7 +61,7 @@
    public void setUp() throws Exception
    {
       t1_ex = t2_ex = null;
-      lock = new Semaphore(1);
+      lock = new Semaphore(1, true);
    }
 
    @AfterMethod(alwaysRun = true)
@@ -938,6 +938,11 @@
    {
       initCaches(Configuration.CacheMode.REPL_SYNC);
       final CacheSPI<Object, Object> c1 = this.cache1;
+
+      final Semaphore threadOneFirstPart = new Semaphore(0);
+      final Semaphore threadTwoFirstPart = new Semaphore(0);
+      final Semaphore threadOneSecondPart = new Semaphore(0);
+
       Thread t1 = new Thread()
       {
          public void run()
@@ -946,14 +951,12 @@
 
             try
             {
-               lock.acquire();
                tm = beginTransaction();
                c1.put("/a/b/c", "age", 38);
                c1.put("/a/b/c", "age", 39);
-               lock.release();
+               threadOneFirstPart.release();
 
-               TestingUtil.sleepThread(300);
-               lock.acquire();
+               threadTwoFirstPart.acquire();
                try
                {
                   tm.commit();
@@ -961,10 +964,9 @@
                catch (RollbackException ex)
                {
                   System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
-               }
-               finally
+               } finally
                {
-                  lock.release();
+                  threadOneSecondPart.release();   
                }
             }
             catch (Throwable ex)
@@ -972,10 +974,6 @@
                ex.printStackTrace();
                t1_ex = ex;
             }
-            finally
-            {
-               lock.release();
-            }
          }
       };
 
@@ -987,21 +985,17 @@
 
             try
             {
-               sleep(200);
-               Thread.yield();
-               lock.acquire();
+               threadOneFirstPart.acquire();
                tm = beginTransaction();
                assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
                cache2.put("/a/b/c", "age", 40);
-               lock.release();
 
-               TestingUtil.sleepThread(300);
-               lock.acquire();
+               threadTwoFirstPart.release();
+
+               threadOneSecondPart.acquire();
                assertEquals(40, cache2.get("/a/b/c", "age"));// must not be null
                tm.commit();
-               lock.release();
 
-               TestingUtil.sleepThread(1000);
                tm = beginTransaction();
                assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
                tm.commit();

Deleted: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -1,336 +0,0 @@
-package org.jboss.cache.util.internals;
-
-import org.jboss.cache.Cache;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.RPCManager;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.legacy.write.*;
-import org.jboss.cache.commands.write.*;
-import org.jboss.cache.commands.remote.ReplicateCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.factories.ComponentRegistry;
-import org.jboss.cache.io.ByteBuffer;
-import org.jboss.cache.marshall.AbstractMarshaller;
-import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.RegionalizedMethodCall;
-import org.jboss.cache.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
- * Especially useful for avaoiding Thread.sleep() statements.
- * <p/>
- * Usage:
- * <pre>
- *   Cache c1, c2; //these being two async caches
- *   AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
- *   listener2.expect(PutKeyValueCommand.class);
- *   c1.put(fqn, key, value);
- *   listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
- * </pre>
- * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
- * non-initialized state and *can* be reused through expect-wait cycle.
- * <b>Note</b>:  this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
- * async replication
- *
- * @author Mircea.Markus at jboss.com
- * @since 2.2
- */
-public class ReplicationListener
-{
-   private CountDownLatch latch = new CountDownLatch(1);
-   private Set<Class<? extends ReplicableCommand>> expectedCommands;
-   private Configuration config;
-   private static Map <Class<? extends WriteCommand>, Class<? extends WriteCommand>> mvcc2PessMap =
-         new HashMap<Class<? extends WriteCommand>, Class<? extends WriteCommand>>();
-   static
-   {
-      mvcc2PessMap.put(ClearDataCommand.class, PessClearDataCommand.class);
-      mvcc2PessMap.put(MoveCommand.class, PessMoveCommand.class);
-      mvcc2PessMap.put(PutDataMapCommand.class, PessPutDataMapCommand.class);
-      mvcc2PessMap.put(PutForExternalReadCommand.class, PessPutForExternalReadCommand.class);
-      mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
-      mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
-      mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
-   }
-
-   /**
-    * Builds a listener that will observe the given cache for recieving replication commands.
-    */
-   public ReplicationListener(Cache cache)
-   {
-      ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
-      RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
-      CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
-      RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
-      RpcDispatcher.Marshaller2 delegate = null;
-      if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
-      {
-         throw new RuntimeException("Illegal state");
-      }
-      if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
-         delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
-      else
-         delegate = new MarshallerDelegate(realMarshaller);
-      realDispatcher.setMarshaller(delegate);
-      realDispatcher.setRequestMarshaller(delegate);
-      realDispatcher.setResponseMarshaller(delegate);
-      this.config = cache.getConfiguration();
-   }
-
-   /**
-    * Based on cache's configuration, will know for what specific commands to expect to be replicated.
-    * E.g. async replication with a tx, would expect only a PrepareCommand (async is 1PC). sync repl with tx would expect
-    * a prepare and a commit (sync is 2pc).
-    * @param inTx do you expect replication to occur as result of a tx.commit?
-    */
-   public void smartExpect(Class<? extends WriteCommand> writeCommand, boolean inTx)
-   {
-      if (config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_ASYNC) || config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_SYNC))
-      {
-         expect(InvalidateCommand.class);
-         return;
-      }
-      if (inTx)
-      {
-         expect(PrepareCommand.class);
-         if (config.getCacheMode().isSynchronous())
-         {
-            expect(CommitCommand.class);
-         }
-         return;
-      }
-      if (config.getNodeLockingScheme().equals(Configuration.NodeLockingScheme.PESSIMISTIC))
-      {
-         expect(getPessCommand(writeCommand));
-      }
-   }
-
-   private Class<? extends ReplicableCommand> getPessCommand(Class<? extends WriteCommand> writeCommand)
-   {
-      Class<? extends ReplicableCommand> result = mvcc2PessMap.get(writeCommand);
-      if (result == null) throw new IllegalStateException("Unknown command: " + writeCommand);
-      return result;
-   }
-
-   private class MarshallerDelegate implements RpcDispatcher.Marshaller2
-   {
-      RpcDispatcher.Marshaller2 marshaller;
-
-      private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
-      {
-         this.marshaller = marshaller;
-      }
-
-      public byte[] objectToByteBuffer(Object obj) throws Exception
-      {
-         return marshaller.objectToByteBuffer(obj);
-      }
-
-      public Object objectFromByteBuffer(byte bytes[]) throws Exception
-      {
-         Object result = marshaller.objectFromByteBuffer(bytes);
-         if (result instanceof ReplicateCommand && expectedCommands != null)
-         {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-
-      public Buffer objectToBuffer(Object o) throws Exception
-      {
-         return marshaller.objectToBuffer(o);
-      }
-
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
-      {
-         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
-         if (result instanceof ReplicateCommand && expectedCommands != null)
-         {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-   }
-
-   /**
-    * We want the notification to be performed only *after* the remote command is executed.
-    */
-   private class ReplicateCommandDelegate extends ReplicateCommand
-   {
-      ReplicateCommand realOne;
-
-      private ReplicateCommandDelegate(ReplicateCommand realOne)
-      {
-         this.realOne = realOne;
-      }
-
-      @Override
-      public Object perform(InvocationContext ctx) throws Throwable
-      {
-         try
-         {
-            return realOne.perform(ctx);
-         }
-         finally
-         {
-            System.out.println("Processed command: " + realOne);
-            Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
-            while (it.hasNext())
-            {
-               Class<? extends ReplicableCommand> replicableCommandClass = it.next();
-               if (realOne.containsCommandType(replicableCommandClass))
-               {
-                  it.remove();
-               }
-               else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
-               {
-                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
-                  if (prepareCommand.containsModificationType(replicableCommandClass))
-                  {
-                     it.remove();
-                  }
-               }
-            }
-            if (expectedCommands.isEmpty())
-            {
-               latch.countDown();
-            }
-         }
-      }
-   }
-
-   /**
-    * Needed for region based marshalling.
-    */
-   private class RegionMarshallerDelegate extends AbstractMarshaller
-   {
-      private Marshaller realOne;
-
-      private RegionMarshallerDelegate(Marshaller realOne)
-      {
-         this.realOne = realOne;
-      }
-
-      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
-      {
-         realOne.objectToObjectStream(obj, out);
-      }
-
-      public Object objectFromObjectStream(ObjectInputStream in) throws Exception
-      {
-         return realOne.objectFromObjectStream(in);
-      }
-
-      public Object objectFromStream(InputStream is) throws Exception
-      {
-         return realOne.objectFromStream(is);
-      }
-
-      public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
-      {
-         realOne.objectToObjectStream(obj, out, region);
-      }
-
-      public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
-      {
-         RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
-         if (result.command instanceof ReplicateCommand && expectedCommands != null)
-         {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
-            result.command = new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-
-      public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
-      {
-         return realOne.regionalizedMethodCallFromObjectStream(in);
-      }
-
-      public ByteBuffer objectToBuffer(Object o) throws Exception
-      {
-         return realOne.objectToBuffer(o);
-      }
-
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
-      {
-         return realOne.objectFromByteBuffer(bytes, i, i1);
-      }
-   }
-
-   /**
-    * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
-    * if replication does not occur in the give timeout then an exception is being thrown.
-    */
-   public void waitForReplicationToOccur(long timeoutMillis)
-   {
-      System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
-      waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
-      System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
-   }
-
-   /**
-    * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time units.
-    *
-    * @param timeout  the maximum time to wait
-    * @param timeUnit the time unit of the <tt>timeout</tt> argument.
-    */
-   public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
-   {
-      assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expect(...) before calling this method";
-      try
-      {
-         if (!latch.await(timeout, timeUnit))
-         {
-            assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
-         }
-      }
-      catch (InterruptedException e)
-      {
-         throw new IllegalStateException("unexpected", e);
-      }
-      finally
-      {
-         expectedCommands = null;
-         latch = new CountDownLatch(1);
-      }
-   }
-
-   /**
-    * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated
-    * to this cache. The method can be called several times with various arguments.
-    */
-   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
-   {
-      if (this.expectedCommands == null)
-      {
-         this.expectedCommands = new HashSet<Class<? extends ReplicableCommand>>();
-      }
-      this.expectedCommands.addAll(Arrays.asList(expectedCommands));
-   }
-
-   /**
-    * Waits untill first command is replicated.
-    */
-   public void expectAny()
-   {
-      expect();
-   }
-}

Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,56 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.legacy.write.VersionedInvalidateCommand;
+import org.jboss.cache.commands.write.InvalidateCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.Cache;
+import org.jboss.cache.config.Configuration;
+
+/**
+ * Specialization of ReplicationListener for caches that use invalidation.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class InvalidationReplicationListener extends ReplicationListener
+{
+
+   Class<? extends InvalidateCommand> base;
+
+   public InvalidationReplicationListener(Cache cache)
+   {
+      super(cache);
+      if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC))
+      {
+         base = VersionedInvalidateCommand.class;
+      } else
+      {
+         base = InvalidateCommand.class;
+      }
+   }
+
+   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      expectInvalidations(expectedCommands);
+   }
+
+   public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+   {
+      expectInvalidations(writeCommands);
+   }
+
+   private void expectInvalidations(Class<? extends ReplicableCommand>... commands)
+   {
+      for (Class<? extends ReplicableCommand> command : commands)
+      {
+         if (command.equals(PutForExternalReadCommand.class))
+         {
+            internalExpect();//so that the map won't be empty
+         }
+         else 
+         {
+            internalExpect(base);
+         }
+      }
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,34 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+
+/**
+ * Specialization of ReplicationListener for mvcc caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class MvccReplicationListener extends ReplicationListener
+{
+   public MvccReplicationListener(Cache cache)
+   {
+      super(cache);
+   }
+
+   /**
+    * all the commands should be received at the other end.
+    */
+   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      internalExpect(expectedCommands);
+   }
+
+   public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+   {
+      internalExpect(PrepareCommand.class);
+      //this is because for async replication we have an 1pc transaction
+      if (config.getCacheMode().isSynchronous()) internalExpect(CommitCommand.class);
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,80 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Specialization of ReplicationListener for optimistic caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class OptimisticReplicationListener extends ReplicationListener
+{
+   public OptimisticReplicationListener(Cache cache)
+   {
+      super(cache);
+   }
+
+   /**
+    * For each command, expect an OptimisticPrepare and a commit.
+    */
+   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      //in the case of optimistic replication, a prepare and a commit are expected for each node
+      for (Class<? extends ReplicableCommand> command : expectedCommands)
+      {
+         internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+      }
+   }
+
+   /**
+    * For all given commands expect a single prepare, and a single commit.
+    */
+   public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+   {
+      internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+   }
+
+   protected void postCommandExecution(ReplicateCommand realOne)
+   {
+      int initialExpectations = expectedCommands.size();
+      List<ReplicableCommand> mods = getAllModifications(realOne);
+      for (Iterator<Class<? extends ReplicableCommand>> typeIt = expectedCommands.iterator(); typeIt.hasNext();)
+      {
+         Class<? extends ReplicableCommand> commadType = typeIt.next();
+         Iterator<ReplicableCommand> instanceIt = mods.iterator();
+         while (instanceIt.hasNext())
+         {
+            ReplicableCommand replicableCommand = instanceIt.next();
+            if (replicableCommand.getClass().equals(commadType))
+            {
+               instanceIt.remove();
+               typeIt.remove();
+            }
+         }
+      }
+      int remainingExpectations = expectedCommands.size();
+      System.out.println("Processed command: " + realOne + ". " + (initialExpectations - remainingExpectations) + " identified, remaining " + remainingExpectations);
+   }
+
+   private List<ReplicableCommand> getAllModifications(ReplicateCommand realOne)
+   {
+      List<ReplicableCommand> result = new ArrayList<ReplicableCommand>();
+      if (realOne.isSingleCommand())
+      {
+         result.add(realOne.getSingleModification());
+      }
+      else
+      {
+         result.addAll(realOne.getModifications());
+      }
+      return result;
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,71 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.legacy.write.*;
+import org.jboss.cache.commands.write.*;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Specialization of ReplicationListener for pessimistic caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class PessReplicationListener extends ReplicationListener
+{
+   private static Map<Class<? extends ReplicableCommand>, Class<? extends ReplicableCommand>> mvcc2PessMap =
+         new HashMap<Class<? extends ReplicableCommand>, Class<? extends ReplicableCommand>>();
+
+   static
+   {
+      mvcc2PessMap.put(ClearDataCommand.class, PessClearDataCommand.class);
+      mvcc2PessMap.put(MoveCommand.class, PessMoveCommand.class);
+      mvcc2PessMap.put(PutDataMapCommand.class, PessPutDataMapCommand.class);
+      mvcc2PessMap.put(PutForExternalReadCommand.class, PessPutForExternalReadCommand.class);
+      mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
+      mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
+      mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
+   }
+
+
+   public PessReplicationListener(Cache cache)
+   {
+      super(cache);
+   }
+
+   /**
+    * In this scenario, all the commands should be replaced with their pessimistic counterparts.
+    */
+   public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      for (Class<? extends ReplicableCommand> command: expectedCommands)
+      {
+         super.internalExpect(getPessCommand(command));
+      }
+   }
+
+   public void expectWithTx(Class<? extends ReplicableCommand>... commands)
+   {
+      for (Class<? extends ReplicableCommand> command : commands)
+      {
+         if (command.equals(PessPutForExternalReadCommand.class))
+         {
+            throw new IllegalArgumentException("PFER are not part of tx, use no-tx .expect()");
+         }
+      }
+      internalExpect(PrepareCommand.class);
+      if (config.getCacheMode().isSynchronous())
+      {
+         internalExpect(CommitCommand.class);
+      }
+   }
+
+   private Class<? extends ReplicableCommand> getPessCommand(Class<? extends ReplicableCommand> command)
+   {
+      return mvcc2PessMap.get((Class<? extends ReplicableCommand>) command);
+   }
+}

Copied: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java (from rev 7147, core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java)
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,336 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.*;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.marshall.AbstractMarshaller;
+import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.util.TestingUtil;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
+ * Especially useful for avoiding Thread.sleep() statements.
+ * <p/>
+ * Usage:
+ * <pre>
+ *   no tx:
+ *       Cache c1, c2; //these being two async caches
+ *       ReplicationListener listener2 = ReplicationListener.getReplicationListener(c2);
+ *       listener2.expect(PutKeyValueCommand.class);
+ *       c1.put(fqn, key, value);
+ *       listener2.waitForReplicationToOccur(1000); // -this will block here until c2 receives the PutKeyValueCommand command
+ *   with tx: (difference is that expectWithTx is used instead of expect)
+ *       Cache c1, c2; //these being two async caches
+ *       ReplicationListener listener2 = ReplicationListener.getReplicationListener(c2);
+ *       listener2.expectWithTx(PutKeyValueCommand.class);
+ *       txManager.begin();
+ *       c1.put(fqn, key, value);
+ *       txManager.commit();
+ *       listener2.waitForReplicationToOccur(1000); // -this will block here until c2 receives the PutKeyValueCommand command
+ * </pre>
+ *
+ *
+ * Lifecycle - after being used (i.e. waitForReplicationToOccur returns successfully) the object returns to the
+ * non-initialized state and *can* be reused through an expect-wait cycle.
+ * <b>Note</b>:  this class might be used as well for sync caches, e.g. a test could have subclasses which use sync and
+ * async replication
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+abstract public class ReplicationListener
+{
+
+   public static final long DEFAULT_TIMEOUT = 10000;
+   private CountDownLatch latch = new CountDownLatch(1);
+   protected List<Class<? extends ReplicableCommand>> expectedCommands;
+   protected Configuration config;
+
+   /**
+    * Builds a listener that will observe the given cache for receiving replication commands.
+    */
+   protected ReplicationListener(Cache cache)
+   {
+      ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
+      RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
+      CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
+      RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
+      RpcDispatcher.Marshaller2 delegate = null;
+      if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
+      {
+         throw new RuntimeException("Illegal state");
+      }
+      if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
+         delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
+      else
+         delegate = new MarshallerDelegate(realMarshaller);
+      realDispatcher.setMarshaller(delegate);
+      realDispatcher.setRequestMarshaller(delegate);
+      realDispatcher.setResponseMarshaller(delegate);
+      this.config = cache.getConfiguration();
+   }
+
+   protected ReplicationListener()
+   {
+   }
+
+   abstract public void expect(Class<? extends ReplicableCommand>... expectedCommands);
+
+   /**
+    * Based on cache's configuration, will know for what specific commands to expect to be replicated.
+    * E.g. async replication with a tx, would expect only a PrepareCommand (async is 1PC). sync repl with tx would expect
+    * a prepare and a commit (sync is 2pc).
+    */
+   abstract public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands);
+
+   /**
+    * Factory method, to be used in order to obtain a replication listener based on a cache config.
+    */
+   public static ReplicationListener getReplicationListener(Cache cache)
+   {
+      if (cache.getConfiguration().getCacheMode().isInvalidation())
+      {
+         return new InvalidationReplicationListener(cache);
+      }
+      if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC))
+      {
+         return new OptimisticReplicationListener(cache);
+      }
+      else if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.PESSIMISTIC))
+      {
+         return new PessReplicationListener(cache);
+      } else
+      {
+         return new MvccReplicationListener(cache);
+      }
+   }
+
+   private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+   {
+      RpcDispatcher.Marshaller2 marshaller;
+
+      private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+      {
+         this.marshaller = marshaller;
+      }
+
+      public byte[] objectToByteBuffer(Object obj) throws Exception
+      {
+         return marshaller.objectToByteBuffer(obj);
+      }
+
+      public Object objectFromByteBuffer(byte bytes[]) throws Exception
+      {
+         Object result = marshaller.objectFromByteBuffer(bytes);
+         if (result instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            return new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+
+      public Buffer objectToBuffer(Object o) throws Exception
+      {
+         return marshaller.objectToBuffer(o);
+      }
+
+      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+      {
+         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+         if (result instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            return new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+   }
+
+   /**
+    * We want the notification to be performed only *after* the remote command is executed.
+    */
+   private class ReplicateCommandDelegate extends ReplicateCommand
+   {
+      ReplicateCommand realOne;
+
+      private ReplicateCommandDelegate(ReplicateCommand realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      @Override
+      public Object perform(InvocationContext ctx) throws Throwable
+      {
+         try
+         {
+            return realOne.perform(ctx);
+         }
+         finally
+         {
+            postCommandExecution(realOne);
+            if (expectedCommands.isEmpty())
+            {
+               latch.countDown();
+            }
+         }
+      }
+   }
+
+
+   protected void postCommandExecution(ReplicateCommand realOne)
+   {
+      System.out.println("Processed command: " + realOne + "; expecting - " + expectedCommands.size() + " commands");
+      Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
+      while (it.hasNext())
+      {
+         Class<? extends ReplicableCommand> replicableCommandClass = it.next();
+         if (realOne.containsCommandType(replicableCommandClass))
+         {
+            it.remove();
+         } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+         {
+            PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
+            if (prepareCommand.containsModificationType(replicableCommandClass))
+            {
+               it.remove();
+            }
+         }
+      }
+      System.out.println("After process expecing : " + expectedCommands.size() + " commands");
+   }
+
+   /**
+    * Needed for region based marshalling.
+    */
+   private class RegionMarshallerDelegate extends AbstractMarshaller
+   {
+      private Marshaller realOne;
+
+      private RegionMarshallerDelegate(Marshaller realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
+      {
+         realOne.objectToObjectStream(obj, out);
+      }
+
+      public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+      {
+         return realOne.objectFromObjectStream(in);
+      }
+
+      public Object objectFromStream(InputStream is) throws Exception
+      {
+         return realOne.objectFromStream(is);
+      }
+
+      public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
+      {
+         realOne.objectToObjectStream(obj, out, region);
+      }
+
+      public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
+      {
+         RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
+         if (result.command instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
+            result.command = new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+
+      public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+      {
+         return realOne.regionalizedMethodCallFromObjectStream(in);
+      }
+
+      public ByteBuffer objectToBuffer(Object o) throws Exception
+      {
+         return realOne.objectToBuffer(o);
+      }
+
+      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+      {
+         return realOne.objectFromByteBuffer(bytes, i, i1);
+      }
+   }
+
+   /**
+    * Blocks until the commands registered through {@link #expect(Class[])} invocations have been replicated in
+    * this cache. If replication does not occur within the given timeout, the wait fails.
+    * Delegates to {@link #waitForReplicationToOccur(long, TimeUnit)} with milliseconds as the time unit and
+    * prints a trace line before and after the wait; the "exit" line is only printed when the wait returns normally.
+    *
+    * @param timeoutMillis the maximum time to wait, in milliseconds
+    */
+   public void waitForReplicationToOccur(long timeoutMillis)
+   {
+      final TimeUnit millis = TimeUnit.MILLISECONDS;
+      System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
+      waitForReplicationToOccur(timeoutMillis, millis);
+      System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
+   }
+
+   /**
+    * Convenience overload of {@link #waitForReplicationToOccur(long)} that waits for at most
+    * {@link #DEFAULT_TIMEOUT}.
+    */
+   public void waitForReplicationToOccur()
+   {
+      final long timeoutMillis = DEFAULT_TIMEOUT;
+      waitForReplicationToOccur(timeoutMillis);
+   }
+
+   /**
+    * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time units.
+    * <p/>
+    * If expectations are still pending, waits on the latch for at most the given time and fails an assertion
+    * listing the commands that did not replicate. Whatever the outcome, the expectations are cleared and a fresh
+    * latch is installed before this method returns, so expectations must be registered again before the next wait.
+    *
+    * @param timeout  the maximum time to wait
+    * @param timeUnit the time unit of the <tt>timeout</tt> argument.
+    * @throws IllegalStateException if the calling thread is interrupted while waiting; the thread's interrupt
+    *                               flag is restored before the exception is thrown
+    */
+   public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
+   {
+      assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expectWithTx(...) before calling this method";
+      try
+      {
+         // nothing to wait for if every expected command has already been processed
+         if (!expectedCommands.isEmpty() && !latch.await(timeout, timeUnit))
+         {
+            assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
+         }
+      }
+      catch (InterruptedException e)
+      {
+         // await() clears the interrupt flag when it throws; restore it so code further up the stack can still
+         // observe that this thread was interrupted
+         Thread.currentThread().interrupt();
+         throw new IllegalStateException("unexpected", e);
+      }
+      finally
+      {
+         // reset state so the listener can be reused for the next round of expectations
+         expectedCommands = null;
+         latch = new CountDownLatch(1);
+      }
+   }
+
+   /**
+    * Appends the given command classes to the pending expectations, creating the expectation list on first use.
+    * {@link #waitForReplicationToOccur(long)} will block until all the commands registered here have been
+    * replicated to this cache. The method can be called several times with various arguments; each call adds
+    * to what is already expected.
+    *
+    * @param expectedCommands the command classes whose replication should be awaited
+    */
+   protected void internalExpect(Class<? extends ReplicableCommand>... expectedCommands)
+   {
+      if (this.expectedCommands == null)
+      {
+         this.expectedCommands = new ArrayList<Class<? extends ReplicableCommand>>();
+      }
+      for (Class<? extends ReplicableCommand> commandClass : expectedCommands)
+      {
+         this.expectedCommands.add(commandClass);
+      }
+   }
+}




More information about the jbosscache-commits mailing list