[jbosscache-commits] JBoss Cache SVN: r7163 - in core/trunk/src/test/java/org/jboss/cache: buddyreplication and 10 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Nov 19 06:57:48 EST 2008
Author: mircea.markus
Date: 2008-11-19 06:57:48 -0500 (Wed, 19 Nov 2008)
New Revision: 7163
Added:
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Removed:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Modified:
core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java
core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java
Log:
enhnaced tests
Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -2,7 +2,6 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
-import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.lock.NodeLock;
@@ -38,42 +37,40 @@
*/
public void testNoOpWhenLockedAnd0msTimeout() throws Exception
{
- PutForExternalReadTestBaseTL threadCfg = threadLocal.get();
// create the parent node first ...
- threadCfg.replListener2 = new ReplicationListener(threadCfg.cache2);
- threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, false);
- threadCfg.cache1.put(parentFqn, key, value);
- threadCfg.replListener2.waitForReplicationToOccur(10000);
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.put(parentFqn, key, value);
+ replListener2.waitForReplicationToOccur(10000);
- threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, true);
- threadCfg.tm1.begin();
- threadCfg.cache1.put(parentFqn, key, value2);
+ replListener2.expectWithTx(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.put(parentFqn, key, value2);
- Transaction t = threadCfg.tm1.suspend();
+ Transaction t = tm1.suspend();
- assertLocked(parentFqn, threadCfg.cache1, true);
+ assertLocked(parentFqn, cache1, true);
// parentFqn should be write-locked.
long startTime = System.currentTimeMillis();
- threadCfg.cache1.putForExternalRead(fqn, key, value);
+ cache1.putForExternalRead(fqn, key, value);
long waited = System.currentTimeMillis() - startTime;
// crappy way to test that pFER does not block, but it is effective.
- assertTrue("Should not wait " + waited + " millis for lock timeout, should attempt to acquire lock with 0ms!", waited < threadCfg.cache1.getConfiguration().getLockAcquisitionTimeout());
+ assertTrue("Should not wait " + waited + " millis for lock timeout, should attempt to acquire lock with 0ms!", waited < cache1.getConfiguration().getLockAcquisitionTimeout());
// should not block.
- threadCfg.tm1.resume(t);
- threadCfg.tm1.commit();
+ tm1.resume(t);
+ tm1.commit();
- threadCfg.replListener2.waitForReplicationToOccur(1000);
+ replListener2.waitForReplicationToOccur(1000);
- assertEquals("Parent node write should have succeeded", value2, threadCfg.cache1.get(parentFqn, key));
+ assertEquals("Parent node write should have succeeded", value2, cache1.get(parentFqn, key));
if (isUsingInvalidation())
- assertNull("Parent node write should have invalidated", threadCfg.cache2.get(parentFqn, key));
+ assertNull("Parent node write should have invalidated", cache2.get(parentFqn, key));
else
- assertEquals("Parent node write should have replicated", value2, threadCfg.cache2.get(parentFqn, key));
+ assertEquals("Parent node write should have replicated", value2, cache2.get(parentFqn, key));
- assertNull("PFER should have been a no-op", threadCfg.cache1.get(fqn, key));
- assertNull("PFER should have been a no-op", threadCfg.cache2.get(fqn, key));
+ assertNull("PFER should have been a no-op", cache1.get(fqn, key));
+ assertNull("PFER should have been a no-op", cache2.get(fqn, key));
}
}
\ No newline at end of file
Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -2,12 +2,11 @@
import org.easymock.EasyMock;
import static org.easymock.EasyMock.*;
-import org.jboss.cache.Cache;
-import org.jboss.cache.CacheFactory;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.RPCManager;
+import org.jboss.cache.*;
import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.ComponentRegistry;
@@ -16,7 +15,7 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionContext;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import org.jgroups.Address;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
@@ -28,100 +27,92 @@
import javax.transaction.TransactionManager;
import java.util.List;
import java.util.Vector;
-import org.jboss.cache.UnitTestCacheFactory;
@Test(groups = {"functional", "jgroups", "transaction"})
public abstract class PutForExternalReadTestBase
{
- // TODO: Refactor this test so it isn't bound to using Thread.sleep() calls.
-
protected Configuration.CacheMode cacheMode;
protected NodeLockingScheme nodeLockingScheme;
protected final Fqn fqn = Fqn.fromString("/one/two");
protected final Fqn parentFqn = fqn.getParent();
protected final String key = "k", value = "v", value2 = "v2";
-
- protected class PutForExternalReadTestBaseTL {
- protected CacheSPI<String, String> cache1, cache2;
- ReplicationListener replListener1;
- ReplicationListener replListener2;
+ protected CacheSPI<String, String> cache1, cache2;
- protected TransactionManager tm1, tm2;
+ ReplicationListener replListener1;
+ ReplicationListener replListener2;
- protected boolean useTx;
- }
-
-
- ThreadLocal<PutForExternalReadTestBaseTL> threadLocal = new ThreadLocal<PutForExternalReadTestBaseTL>();
+ protected TransactionManager tm1, tm2;
+ protected boolean useTx;
+
+
@BeforeMethod(alwaysRun = true)
public void setUp()
{
- PutForExternalReadTestBaseTL tl = new PutForExternalReadTestBaseTL();
- threadLocal.set(tl);
-
+
CacheFactory<String, String> cf = new UnitTestCacheFactory<String, String>();
- tl.cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
- tl.cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
- tl.cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
- tl.cache1.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
+ cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
+ cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+ cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
+ cache1.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
- tl.cache1.start();
- tl.tm1 = tl.cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+ cache1.start();
+ tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
- tl.cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
- tl.cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
- tl.cache2.getConfiguration().setSerializationExecutorPoolSize(0); //this is very important for async tests!
- tl.cache2.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
+ cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
+ cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+ cache2.getConfiguration().setSerializationExecutorPoolSize(0); //this is very important for async tests!
+ cache2.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
- tl.cache2.start();
- tl.tm2 = tl.cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+ cache2.start();
+ tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+ replListener1 = ReplicationListener.getReplicationListener(cache1);
+ replListener2 = ReplicationListener.getReplicationListener(cache2);
- TestingUtil.blockUntilViewsReceived(10000, tl.cache1, tl.cache2);
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
}
@AfterMethod(alwaysRun = true)
public void tearDown()
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
- if (tl != null) {
- TestingUtil.killCaches(tl.cache1, tl.cache2);
- threadLocal.set(null);
- }
+ TestingUtil.killCaches(cache1, cache2);
}
public void testNoOpWhenNodePresent()
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
- tl.cache1.putForExternalRead(fqn, key, value);
- asyncWait();
+ replListener2.expect(PutForExternalReadCommand.class);
+ cache1.putForExternalRead(fqn, key, value);
+ replListener2.waitForReplicationToOccur();
- assertEquals("PFER should have succeeded", value, tl.cache1.get(fqn, key));
+
+ assertEquals("PFER should have succeeded", value, cache1.get(fqn, key));
if (isUsingInvalidation())
- assertNull("PFER should not have effected cache2", tl.cache2.get(fqn, key));
+ assertNull("PFER should not have effected cache2", cache2.get(fqn, key));
else
- assertEquals("PFER should have replicated", value, tl.cache2.get(fqn, key));
+ assertEquals("PFER should have replicated", value, cache2.get(fqn, key));
// reset
- tl.cache1.removeNode(fqn);
- asyncWait();
+ replListener2.expect(RemoveNodeCommand.class);
+ cache1.removeNode(fqn);
+ replListener2.waitForReplicationToOccur();
- assertFalse("Should have reset", tl.cache1.getRoot().hasChild(fqn));
- assertFalse("Should have reset", tl.cache2.getRoot().hasChild(fqn));
+ assertFalse("Should have reset", cache1.getRoot().hasChild(fqn));
+ assertFalse("Should have reset", cache2.getRoot().hasChild(fqn));
- tl.cache1.put(fqn, key, value);
- asyncWait();
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.put(fqn, key, value);
+ replListener2.waitForReplicationToOccur();
// now this pfer should be a no-op
- tl.cache1.putForExternalRead(fqn, key, value2);
+ cache1.putForExternalRead(fqn, key, value2);
- assertEquals("PFER should have been a no-op", value, tl.cache1.get(fqn, key));
+ assertEquals("PFER should have been a no-op", value, cache1.get(fqn, key));
if (isUsingInvalidation())
- assertNull("PFER should have been a no-op", tl.cache2.get(fqn, key));
+ assertNull("PFER should have been a no-op", cache2.get(fqn, key));
else
- assertEquals("PFER should have been a no-op", value, tl.cache2.get(fqn, key));
+ assertEquals("PFER should have been a no-op", value, cache2.get(fqn, key));
}
private Vector<Address> anyAddresses()
@@ -132,72 +123,72 @@
public void testAsyncForce() throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
- RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
List<Address> memberList = originalRpcManager.getMembers();
expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
// inject a mock RPC manager so that we can test whether calls made are sync or async.
- ComponentRegistry cr = TestingUtil.extractComponentRegistry(tl.cache1);
+ ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
cr.registerComponent(rpcManager, RPCManager.class);
cr.rewire();
// invalidations will not trigger any rpc calls for PFER
if (!isUsingInvalidation())
{
- // specify what we expect called on the mock Rpc Manager. For params we don't care about, just use ANYTHING.
- // setting the mock object to expect the "sync" param to be false.
+ // specify what we expectWithTx called on the mock Rpc Manager. For params we don't care about, just use ANYTHING.
+ // setting the mock object to expectWithTx the "sync" param to be false.
expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
}
replay(rpcManager);
// now try a simple replication. Since the RPCManager is a mock object it will not actually replicate anything.
- tl.cache1.putForExternalRead(fqn, key, value);
+ cache1.putForExternalRead(fqn, key, value);
verify(rpcManager);
// cleanup
- TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(originalRpcManager, RPCManager.class);
- tl.cache1.removeNode(fqn);
+ TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
+ cache1.removeNode(fqn);
}
public void testTxSuspension() throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
// create parent node first
- tl.cache1.put(parentFqn, key, value);
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.put(parentFqn, key, value);
+ replListener2.waitForReplicationToOccur();
// start a tx and do some stuff.
- tl.tm1.begin();
- tl.cache1.get(parentFqn, key);
- tl.cache1.putForExternalRead(fqn, key, value); // should have happened in a separate tx and have committed already.
- Transaction t = tl.tm1.suspend();
+ replListener2.expect(PutForExternalReadCommand.class);
+ tm1.begin();
+ cache1.get(parentFqn, key);
+ cache1.putForExternalRead(fqn, key, value); // should have happened in a separate tx and have committed already.
+ Transaction t = tm1.suspend();
- asyncWait();
+ replListener2.waitForReplicationToOccur();
- assertLocked(parentFqn, tl.cache1, false);
+ assertLocked(parentFqn, cache1, false);
- assertEquals("PFER should have completed", value, tl.cache1.get(fqn, key));
+ assertEquals("PFER should have completed", value, cache1.get(fqn, key));
if (isUsingInvalidation())
- assertNull("PFER should not have effected cache2", tl.cache2.get(fqn, key));
+ assertNull("PFER should not have effected cache2", cache2.get(fqn, key));
else
- assertEquals("PFER should have completed", value, tl.cache2.get(fqn, key));
+ assertEquals("PFER should have completed", value, cache2.get(fqn, key));
- tl.tm1.resume(t);
- tl.tm1.commit();
+ tm1.resume(t);
+ tm1.commit();
- assertEquals("parent fqn tx should have completed", value, tl.cache1.get(parentFqn, key));
+ assertEquals("parent fqn tx should have completed", value, cache1.get(parentFqn, key));
if (isUsingInvalidation())
- assertNull("parent fqn tx should have invalidated cache2", tl.cache2.get(parentFqn, key));
+ assertNull("parent fqn tx should have invalidated cache2", cache2.get(parentFqn, key));
else
- assertEquals("parent fqn tx should have completed", value, tl.cache2.get(parentFqn, key));
+ assertEquals("parent fqn tx should have completed", value, cache2.get(parentFqn, key));
}
public void testExceptionSuppression() throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
- RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
try
{
List<Address> memberList = originalRpcManager.getMembers();
@@ -206,13 +197,13 @@
expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
replay(barfingRpcManager);
- TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(barfingRpcManager, RPCManager.class);
- tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
- TestingUtil.extractComponentRegistry(tl.cache1).rewire();
+ TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+ TestingUtil.extractComponentRegistry(cache1).rewire();
try
{
- tl.cache1.put(fqn, key, value);
+ cache1.put(fqn, key, value);
if (!isOptimistic()) fail("Should have barfed");
}
catch (RuntimeException re)
@@ -222,14 +213,13 @@
if (isOptimistic() && !isUsingInvalidation())
{
// proves that the put did, in fact, barf. Doesn't work for invalidations since the inability to invalidate will not cause a rollback.
- assertNull(tl.cache1.get(fqn, key));
- }
- else
+ assertNull(cache1.get(fqn, key));
+ } else
{
// clean up any indeterminate state left over
try
{
- tl.cache1.removeNode(fqn);
+ cache1.removeNode(fqn);
// as above, the inability to invalidate will not cause an exception
if (!isUsingInvalidation()) fail("Should have barfed");
}
@@ -238,36 +228,35 @@
}
}
- assertNull("Should have cleaned up", tl.cache1.get(fqn, key));
+ assertNull("Should have cleaned up", cache1.get(fqn, key));
// should not barf
- tl.cache1.putForExternalRead(fqn, key, value);
+ cache1.putForExternalRead(fqn, key, value);
}
finally
{
- TestingUtil.extractComponentRegistry(tl.cache1).registerComponent(originalRpcManager, RPCManager.class);
+ TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
}
}
public void testBasicPropagation() throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
- assert !tl.cache1.exists(fqn);
- assert !tl.cache2.exists(fqn);
+ assert !cache1.exists(fqn);
+ assert !cache2.exists(fqn);
- tl.cache1.putForExternalRead(fqn, key, value);
+ replListener2.expect(PutForExternalReadCommand.class);
+ cache1.putForExternalRead(fqn, key, value);
+ replListener2.waitForReplicationToOccur();
- asyncWait();
-
- assertEquals("PFER updated cache1", value, tl.cache1.get(fqn, key));
+ assertEquals("PFER updated cache1", value, cache1.get(fqn, key));
Object expected = isUsingInvalidation() ? null : value;
- assertEquals("PFER propagated to cache2 as expected", expected, tl.cache2.get(fqn, key));
+ assertEquals("PFER propagated to cache2 as expected", expected, cache2.get(fqn, key));
// replication to cache 1 should NOT happen.
- tl.cache2.putForExternalRead(fqn, key, value);
+ cache2.putForExternalRead(fqn, key, value);
- assertEquals("PFER updated cache2", value, tl.cache2.get(fqn, key));
- assertEquals("Cache1 should be unaffected", value, tl.cache1.get(fqn, key));
+ assertEquals("PFER updated cache2", value, cache2.get(fqn, key));
+ assertEquals("Cache1 should be unaffected", value, cache1.get(fqn, key));
}
/**
@@ -297,56 +286,59 @@
*/
public void testMemLeakOnSuspendedTransactions() throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
Fqn fqn2 = Fqn.fromString("/fqn/two");
- tl.tm1.begin();
- tl.cache1.putForExternalRead(fqn, key, value);
- tl.tm1.commit();
+ replListener2.expect(PutForExternalReadCommand.class);
+ tm1.begin();
+ cache1.putForExternalRead(fqn, key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur(10000);
- TestingUtil.sleepThread(500);
- assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
- assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
- assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
- assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+ assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+ assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+ assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+ assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
- tl.tm1.begin();
- tl.cache1.putForExternalRead(fqn, key, value);
- tl.cache1.put(fqn2, key, value);
- tl.tm1.commit();
+ System.out.println("PutForExternalReadTestBase.testMemLeakOnSuspendedTransactions");
+ //do not expectWithTx a PFER replication, as the node already exists so this is a no-op
+ replListener2.expectWithTx(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.putForExternalRead(fqn, key, value);
+ cache1.put(fqn2, key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur();
- asyncWait();
+ assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+ assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+ assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+ assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
- assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
- assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
- assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
- assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+ replListener2.expectWithTx(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.put(fqn2, key, value);
+ cache1.putForExternalRead(fqn, key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur(60000);
- tl.tm1.begin();
- tl.cache1.put(fqn2, key, value);
- tl.cache1.putForExternalRead(fqn, key, value);
- tl.tm1.commit();
+ assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+ assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+ assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+ assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
- asyncWait();
+ //do not expectWithTx a PFER replication, as the node already exists so this is a no-op
+ replListener2.expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.put(fqn2, key, value);
+ cache1.putForExternalRead(fqn, key, value);
+ cache1.put(fqn2, key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur(60000);
- assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
- assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
- assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
- assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
-
- tl.tm1.begin();
- tl.cache1.put(fqn2, key, value);
- tl.cache1.putForExternalRead(fqn, key, value);
- tl.cache1.put(fqn2, key, value);
- tl.tm1.commit();
-
- asyncWait();
-
- assert tl.cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
- assert tl.cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
- assert tl.cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
- assert tl.cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+ assert cache1.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+ assert cache1.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+ assert cache2.getTransactionTable().getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+ assert cache2.getTransactionTable().getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
}
/**
@@ -357,30 +349,29 @@
*/
private void cacheModeLocalTest(boolean transactional) throws Exception
{
- PutForExternalReadTestBaseTL tl = threadLocal.get();
RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
- RPCManager originalRpcManager = tl.cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
// inject a mock RPC manager so that we can test whether calls made are sync or async.
- tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
- // specify that we expect nothing will be called on the mock Rpc Manager.
+ // specify that we expectWithTx nothing will be called on the mock Rpc Manager.
replay(rpcManager);
// now try a simple replication. Since the RPCManager is a mock object it will not actually replicate anything.
if (transactional)
- tl.tm1.begin();
+ tm1.begin();
- tl.cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
- tl.cache1.putForExternalRead(fqn, key, value);
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.putForExternalRead(fqn, key, value);
if (transactional)
- tl.tm1.commit();
+ tm1.commit();
verify(rpcManager);
// cleanup
- tl.cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
- tl.cache1.removeNode(fqn);
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
+ cache1.removeNode(fqn);
}
protected abstract void assertLocked(Fqn fqn, CacheSPI cache, boolean writeLocked);
@@ -416,10 +407,4 @@
{
return nodeLockingScheme == NodeLockingScheme.OPTIMISTIC;
}
-
- protected void asyncWait()
- {
- // always needs to do this since PFER will force async comms.
- TestingUtil.sleepThread(500);
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -101,6 +101,8 @@
cachesTL.set(caches);
assertNoLocks(caches);
+ TestingUtil.sleepThread(getSleepTimeout());
+
System.out.println("*** Testing cache 0");
waitForBuddy(caches.get(0), caches.get(1), false);
waitForBuddy(caches.get(0), caches.get(2), false);
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -70,6 +70,7 @@
System.out.println("***** About to kill original data owner (" + caches.get(0).getLocalAddress() + "). *****");
// forcefully kill data owner.
killChannel(caches.get(0), caches.get(1), 2);
+ waitForSingleBuddy(caches.get(1), caches.get(2));
System.out.println("Killed. Testing backup roots.");
TestingUtil.dumpCacheContents(caches);
@@ -217,7 +218,7 @@
assertNoLocks(caches);
// gravitate to 2:
- caches.get(2).getNode(fqn); // expect entire subtree to gravitate.
+ caches.get(2).getNode(fqn); // expectWithTx entire subtree to gravitate.
delay(); // cleanup commands are async
Fqn newBackupFqn = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn);
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -324,6 +324,20 @@
waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);
}
+ /**
+ * Will wait for 60 secs + 10sec * caches.length for the given caches to become buddys.
+ * The caches should be ordered as per underlying view.
+ */
+ public void waitForSingleBuddy(Cache... caches) throws Exception
+ {
+ long timeout = 60000 + caches.length;
+ for (int i = 0; i < caches.length - 1; i++)
+ {
+ waitForBuddy(caches[i], caches[i+1], true, timeout);
+ }
+ waitForBuddy(caches[caches.length - 1], caches[0], true, timeout);
+ }
+
public void waitForBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy, long timeout) throws Exception
{
long start = System.currentTimeMillis();
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -4,8 +4,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationQueueNotifier;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -46,8 +45,8 @@
public void testTransactionalReplication() throws Exception
{
- ReplicationListener cache1Listener = new ReplicationListener(cache);
- ReplicationListener cache2Listener = new ReplicationListener(cache2);
+ ReplicationListener cache1Listener = ReplicationListener.getReplicationListener(cache);
+ ReplicationListener cache2Listener = ReplicationListener.getReplicationListener(cache2);
cache2Listener.expect(PutKeyValueCommand.class);
// outside of tx scope
Modified: core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/ExpirationPolicyTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -72,19 +72,22 @@
public void testUpdateToFuture() throws Exception
{
+ future = System.currentTimeMillis() + 2500;
+ past = System.currentTimeMillis() - 1000;
try
{
+ EvictionController evictionController = new EvictionController(cache);
log.info("update 1 from future to past");
cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future);
- TestingUtil.sleepThread(200);
- new EvictionController(cache).startEviction();
+ TestingUtil.sleepThread(1000);
+ evictionController.startEviction();
assertNotNull(cache.getNode(fqn1));
- cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future + 250);
- TestingUtil.sleepThread(500);
- new EvictionController(cache).startEviction();
+ cache.put(fqn1, ExpirationAlgorithmConfig.EXPIRATION_KEY, future + 1200);
+ TestingUtil.sleepThread(2000);
+ evictionController.startEviction();
assertNotNull(cache.getNode(fqn1));
- TestingUtil.sleepThread(100);
- new EvictionController(cache).startEviction();
+ TestingUtil.sleepThread(1000);
+ evictionController.startEviction();
assertNull(cache.getNode(fqn1));
}
finally
Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -13,12 +13,13 @@
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -118,18 +119,18 @@
cache2.start();
Fqn fqn = Fqn.fromString("/a/b");
- ReplicationListener replListener1 = new ReplicationListener(cache1);
- ReplicationListener replListener2 = new ReplicationListener(cache2);
+ ReplicationListener replListener1 = ReplicationListener.getReplicationListener(cache1);
+ ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, "key", "value");
- replListener2.waitForReplicationToOccur(500);
+ replListener2.waitForReplicationToOccur();
// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
// test that this has NOT replicated, but rather has been invalidated:
assertEquals("value", cache1.get(fqn, "key"));
assertNull("Should NOT have replicated!", cache2.getNode(fqn));
- replListener1.expectAny();
+ replListener1.expect(PutKeyValueCommand.class);
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
@@ -140,7 +141,7 @@
assertHasBeenInvalidated(n, "Should have been invalidated");
assertEquals("value", cache2.get(fqn, "key"));
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
assertEquals("value2", cache1.get(fqn, "key2"));
Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,11 +12,12 @@
import org.jboss.cache.UnitTestCacheFactory;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.transaction.DummyTransactionManagerLookup;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;
import org.testng.annotations.AfterMethod;
@@ -202,7 +203,7 @@
public void testPessAsyncRepl() throws Exception
{
createCaches(false, false);
- ReplicationListener replListener = new ReplicationListener(cache2);
+ ReplicationListener replListener = ReplicationListener.getReplicationListener(cache2);
mgr1.begin();
cache1.put(fqn, key, "value");
@@ -212,9 +213,9 @@
assertNull(loader1.get(fqn));
assertNull(loader2.get(fqn));
- replListener.expect(PrepareCommand.class);
+ replListener.expectWithTx(PutKeyValueCommand.class);
mgr1.commit();
- replListener.waitForReplicationToOccur(500);
+ replListener.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
@@ -278,10 +279,10 @@
{
createCaches(false, true);
- ReplicationListener replListener = new ReplicationListener(cache2);
+ ReplicationListener replListener = ReplicationListener.getReplicationListener(cache2);
mgr1.begin();
- replListener.expect(CommitCommand.class);
+ replListener.expectWithTx(PutKeyValueCommand.class);
cache1.put(fqn, key, "value");
assertEquals("value", cache1.get(fqn, key));
@@ -291,7 +292,7 @@
mgr1.commit();
- replListener.waitForReplicationToOccur(500);
+ replListener.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,29 +12,23 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
-import org.jboss.cache.commands.legacy.write.PessPutKeyValueCommand;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.marshall.data.Address;
import org.jboss.cache.marshall.data.Person;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.transaction.*;
import java.lang.reflect.Method;
-import org.jboss.cache.UnitTestCacheFactory;
-import org.jboss.cache.config.Configuration;
/**
* Test marshalling for async mode.
@@ -66,8 +60,8 @@
cache2 = createCache("TestCache");
- replListener1 = new ReplicationListener(cache1);
- replListener2 = new ReplicationListener(cache2);
+ replListener1 = ReplicationListener.getReplicationListener(cache1);
+ replListener2 = ReplicationListener.getReplicationListener(cache2);
addr = new Address();
addr.setCity("San Jose");
ben = new Person();
@@ -121,11 +115,11 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
- replListener2.expect(PessPutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(aop, "person", ben);
replListener2.waitForReplicationToOccur(500);
- replListener2.expect(PessPutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/alias"), "person", ben);
replListener2.waitForReplicationToOccur(500);
@@ -150,7 +144,7 @@
// Set it back to the cache
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
- replListener1.expect(PessPutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.put(aop, "person", ben2);
replListener1.waitForReplicationToOccur(1000);
if (useMarshalledValues) resetContextClassLoader();
@@ -180,11 +174,11 @@
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
- replListener2.expect(PessPutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/1"), "person", ben);
replListener2.waitForReplicationToOccur(1000);
- replListener2.expect(PessPutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
replListener2.waitForReplicationToOccur(1000);
@@ -204,7 +198,7 @@
public void testTxPut() throws Exception
{
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
beginTransaction();
cache1.put(aop, "person", ben);
commit();
@@ -228,7 +222,7 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
beginTransaction();
cache1.put(aop, "person", ben);
commit();
@@ -254,7 +248,7 @@
// Set it back to the cache
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
- replListener1.expectAny();
+ replListener1.expect(PutKeyValueCommand.class);
cache2.put(aop, "person", ben2);
if (useMarshalledValues) resetContextClassLoader();
replListener1.waitForReplicationToOccur(100);
@@ -282,7 +276,7 @@
Fqn base = Fqn.fromString("/aop");
Fqn fqn = Fqn.fromRelativeElements(base, custom1);
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, "key", "value");
replListener2.waitForReplicationToOccur(10000);
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -4,11 +4,12 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -50,7 +51,7 @@
c1.start();
c2 = new UnitTestCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone());
- replListener2 = new ReplicationListener(c2);
+ replListener2 = ReplicationListener.getReplicationListener(c2);
TestingUtil.blockUntilViewsReceived(60000, c1, c2);
}
@@ -69,7 +70,7 @@
c1.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
c2.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
// write something; will cause a stale region to be stored in C2's cache marshaller
c1.put(fqn, "k", "v");
assert c1.get(fqn, "k").equals("v");
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -12,7 +12,7 @@
import org.jboss.cache.transaction.OptimisticTransactionContext;
import org.jboss.cache.transaction.TransactionTable;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.Test;
@@ -175,7 +175,7 @@
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
- ReplicationListener replListener2 = new ReplicationListener(cache2);
+ ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -189,11 +189,11 @@
SamplePojo pojo = new SamplePojo(21, "test");
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache.put("/one/two", "key1", pojo);
mgr.commit();
- replListener2.waitForReplicationToOccur(1000);
+ replListener2.waitForReplicationToOccur();
// cache asserts
assertNull(mgr.getTransaction());
@@ -237,7 +237,7 @@
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
- ReplicationListener replListener2 = new ReplicationListener(cache2);
+ ReplicationListener replListener2 = ReplicationListener.getReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -251,7 +251,7 @@
SamplePojo pojo = new SamplePojo(21, "test");
- replListener2.expect(PutKeyValueCommand.class, CommitCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache.put("/one/two", "key1", pojo);
mgr.commit();
@@ -291,9 +291,9 @@
assertNotNull(cache2.get(Fqn.fromString("/one/two"), "key1"));
replListener2.expect(RemoveNodeCommand.class);
- replListener2.expect(CommitCommand.class);
+ replListener2.expect();
cache.removeNode("/one/two");
- replListener2.waitForReplicationToOccur(1000);
+ replListener2.waitForReplicationToOccur();
assertEquals(false, cache.exists("/one/two"));
assertEquals(null, cache.get("/one/two", "key1"));
@@ -307,7 +307,7 @@
{
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
- ReplicationListener replListener = new ReplicationListener(cache);
+ ReplicationListener replListener = ReplicationListener.getReplicationListener(cache);
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
LockManager lockManager = TestingUtil.extractLockManager(cache);
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationOptLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -21,14 +21,4 @@
nodeLockingScheme = "OPTIMISTIC";
isInvalidation = true;
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- TestingUtil.sleepThread(500);
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncInvalidationPessLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -21,19 +21,4 @@
nodeLockingScheme = "PESSIMISTIC";
isInvalidation = true;
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- TestingUtil.sleepThread(500);
- }
-
- public void testPutKeyValueViaNodeAPI() throws Exception
- {
- super.testPutDataViaNodeAPI();
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplOptLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -22,75 +22,4 @@
cacheMode = Configuration.CacheMode.REPL_ASYNC;
nodeLockingScheme = "OPTIMISTIC";
}
-
- protected void verifyReplication()
- {
- TestingUtil.sleepThread(500);
- }
-
- public void testRemoveKey() throws Exception
- {
- super.testRemoveKey();
- }
-
- public void testPutKeyValue() throws Exception
- {
- super.testPutKeyValue();
- }
-
- public void testPutKeyValueViaNodeAPI() throws Exception
- {
- super.testPutKeyValueViaNodeAPI();
- }
-
- public void testPutData() throws Exception
- {
- super.testPutData();
- }
-
- public void testPutDataViaNodeAPI() throws Exception
- {
- super.testPutDataViaNodeAPI();
- }
-
- public void testRemoveNode() throws Exception
- {
- super.testRemoveNode();
- }
-
- public void testRemoveNodeViaNodeAPI() throws Exception
- {
- super.testRemoveNodeViaNodeAPI();
- }
-
- public void testRemoveKeyViaNodeAPI() throws Exception
- {
- super.testRemoveKeyViaNodeAPI();
- }
-
- public void testTransactionalBehaviourCommit() throws Exception
- {
- super.testTransactionalBehaviourCommit();
- }
-
- public void testTransactionalBehaviourRollback() throws Exception
- {
- super.testTransactionalBehaviourRollback();
- }
-
- public void testTransactionalBehaviourViaNodeAPI() throws Exception
- {
- super.testTransactionalBehaviourViaNodeAPI();
- }
-
- public void testAddChild() throws Exception
- {
- super.testAddChild();
- }
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/AsyncReplPessLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -7,7 +7,7 @@
package org.jboss.cache.options.cachemodelocal;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.legacy.write.PessPutDataMapCommand;
import org.jboss.cache.commands.legacy.write.PessPutKeyValueCommand;
@@ -27,7 +27,6 @@
@Test(groups = {"functional", "jgroups"}, testName = "options.cachemodelocal.AsyncReplPessLocksTest")
public class AsyncReplPessLocksTest extends CacheModeLocalTestBase
{
-
ReplicationListener current;
Map<Cache, ReplicationListener> cache2Listener = new HashMap<Cache, ReplicationListener>(2);
@@ -37,29 +36,4 @@
cacheMode = Configuration.CacheMode.REPL_ASYNC;
nodeLockingScheme = "PESSIMISTIC";
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache, Class<? extends ReplicableCommand> what)
- {
- if (what == PutDataMapCommand.class) what = PessPutDataMapCommand.class;
- if (what == PutKeyValueCommand.class) what = PessPutKeyValueCommand.class;
- if (what == RemoveKeyCommand.class) what = PessRemoveKeyCommand.class;
- if (what == RemoveNodeCommand.class) what = PessRemoveNodeCommand.class;
- getCacheListener(cache).expect(what);
- }
-
- protected void verifyReplication()
- {
- current.waitForReplicationToOccur(2000);
- }
-
- public ReplicationListener getCacheListener(Cache cache)
- {
- current = cache2Listener.get(cache);
- if (current == null)
- {
- current = new ReplicationListener(cache);
- cache2Listener.put(cache, current);
- }
- return current;
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -19,6 +19,7 @@
import org.jboss.cache.commands.write.RemoveNodeCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -45,12 +46,15 @@
* set this to true if the implementing class plans to use an invalidating cache mode *
*/
protected boolean isInvalidation;
+ CacheSPI<String, String> cache1;
+ CacheSPI<String, String> cache2;
- private ThreadLocal<CacheSPI<String, String>> cache1TL = new ThreadLocal<CacheSPI<String, String>>();
- private ThreadLocal<CacheSPI<String, String>> cache2TL = new ThreadLocal<CacheSPI<String, String>>();
- private ThreadLocal<NodeSPI<String, String>> root1TL = new ThreadLocal<NodeSPI<String, String>>();
- private ThreadLocal<NodeSPI<String, String>> root2TL = new ThreadLocal<NodeSPI<String, String>>();
+ NodeSPI<String, String> root1;
+ NodeSPI<String, String> root2;
+ ReplicationListener replListener1;
+ ReplicationListener replListener2;
+
private final Fqn fqn = Fqn.fromString("/a");
private final String key = "key";
@@ -58,13 +62,6 @@
public void setUp() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
- // force a tear down if the test runner didn't run one before (happens in IDEA)
- if (cache1 != null || cache2 != null)
- tearDown();
-
CacheFactory<String, String> instance = new UnitTestCacheFactory<String, String>();
Configuration c = new Configuration();
@@ -89,29 +86,22 @@
cache2 = (CacheSPI<String, String>) instance.createCache(c, false);
cache2.start();
- cache1TL.set(cache1);
- cache2TL.set(cache2);
- root1TL.set(cache1.getRoot());
- root2TL.set(cache2.getRoot());
+ root1 = cache1.getRoot();
+ root2 = cache2.getRoot();
+
+ replListener1 = ReplicationListener.getReplicationListener(cache1);
+ replListener2 = ReplicationListener.getReplicationListener(cache2);
}
@AfterMethod(alwaysRun = true)
public void tearDown()
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
TestingUtil.killCaches(cache1, cache2);
- cache1TL.set(null);
- cache2TL.set(null);
}
public void testPutKeyValue() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache1.put(fqn, key, "value");
Thread.sleep(500);
@@ -123,10 +113,10 @@
assertNull("Should be null", cache2.get(fqn, key));
// now try again with passing the default options
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.getInvocationContext().getOptionOverrides().reset();
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
@@ -147,10 +137,10 @@
assertEquals("value2", cache2.get(fqn, key));
assertEquals("value", cache1.get(fqn, key));
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.getInvocationContext().getOptionOverrides().reset();
cache2.put(fqn, key, "value2");
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -164,11 +154,6 @@
public void testPutKeyValueViaNodeAPI() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
-
Node node1 = root1.addChild(fqn);
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node1.put(key, "value");
@@ -180,10 +165,10 @@
assertNull("Should be null", cache2.get(fqn, key));
// now try again with passing the default options
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.getInvocationContext().getOptionOverrides().reset();
node1.put(key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
@@ -207,10 +192,10 @@
assertEquals("value2", cache2.get(fqn, key));
assertEquals("value", cache1.get(fqn, key));
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.getInvocationContext().getOptionOverrides().reset();
node2.put(key, "value2");
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -224,9 +209,6 @@
public void testPutData() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
Map<String, String> map = new HashMap<String, String>();
map.put(key, "value");
@@ -239,10 +221,10 @@
assertNull("Should be null", cache2.get(fqn, key));
// now try again with passing the default options
- registerReplicationCommand(cache2, PutDataMapCommand.class);
+ replListener2.expect(PutDataMapCommand.class);
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
cache1.put(fqn, map);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
// cache 2 should as well
@@ -264,10 +246,10 @@
assertEquals("value2", cache2.get(fqn, key));
assertEquals("value", cache1.get(fqn, key));
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.getInvocationContext().getOptionOverrides().reset();
cache2.put(fqn, key, "value2");
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -281,11 +263,6 @@
public void testPutDataViaNodeAPI() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
-
Map<String, String> map = new HashMap<String, String>();
map.put(key, "value");
@@ -299,10 +276,10 @@
assertNull("Should be null", cache2.get(fqn, key));
// now try again with passing the default options
- registerReplicationCommand(cache2, PutDataMapCommand.class);
+ replListener2.expect(PutDataMapCommand.class);
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
node1.putAll(map);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
// cache 2 should as well
@@ -326,10 +303,10 @@
assertEquals("value2", cache2.get(fqn, key));
assertEquals("value", cache1.get(fqn, key));
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.getInvocationContext().getOptionOverrides().reset();
node2.put(key, "value2");
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -343,15 +320,12 @@
public void testRemoveNode() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
// put some stuff in the cache first
// make sure we cleanup thread local vars.
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.getInvocationContext().setOptionOverrides(null);
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -379,9 +353,9 @@
}
// replace cache entries
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -393,10 +367,10 @@
}
// now try again with passing the default options
- registerReplicationCommand(cache2, RemoveNodeCommand.class);
+ replListener2.expect(RemoveNodeCommand.class);
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
cache1.removeNode(fqn);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// both should be null
assertNull("should be null", cache1.get(fqn, key));
@@ -405,18 +379,14 @@
public void testRemoveNodeViaNodeAPI() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
// put some stuff in the cache first
// make sure we cleanup thread local vars.
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.getInvocationContext().setOptionOverrides(null);
cache1.put(fqn, key, "value");
assertEquals("value", cache1.get(fqn, key));
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
if (isInvalidation)
{
assertNull("Should be null", cache2.get(fqn, key));
@@ -443,9 +413,9 @@
}
// replace cache entries
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -456,11 +426,11 @@
assertEquals("value", cache2.get(fqn, key));
}
- registerReplicationCommand(cache2, RemoveNodeCommand.class);
+ replListener2.expect(RemoveNodeCommand.class);
// now try again with passing the default options
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
root1.removeChild(fqn);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// both should be null
assertNull("should be null", cache1.get(fqn, key));
@@ -469,14 +439,11 @@
public void testRemoveKey() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect((Class<? extends ReplicableCommand>) PutKeyValueCommand.class);
// put some stuff in the cache first
cache1.getInvocationContext().setOptionOverrides(null);
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -503,10 +470,10 @@
assertEquals("value", cache2.get(fqn, key));
}
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
// replace cache entries
cache1.put(fqn, key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -517,11 +484,11 @@
assertEquals("value", cache2.get(fqn, key));
}
- registerReplicationCommand(cache2, RemoveKeyCommand.class);
+ replListener2.expect(RemoveKeyCommand.class);
// now try again with passing the default options
cache1.getInvocationContext().getOptionOverrides().reset();
cache1.remove(fqn, key);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// both should be null
assertNull("should be null", cache1.get(fqn, key));
@@ -530,17 +497,12 @@
public void testRemoveKeyViaNodeAPI() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
-
// put some stuff in the cache first
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
Node node1 = root1.addChild(fqn);
cache1.getInvocationContext().setOptionOverrides(null);
node1.put(key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -568,9 +530,9 @@
}
// replace cache entries
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expect(PutKeyValueCommand.class);
node1.put(key, "value");
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
assertEquals("value", cache1.get(fqn, key));
if (isInvalidation)
{
@@ -582,10 +544,10 @@
}
// now try again with passing the default options
- registerReplicationCommand(cache2, RemoveKeyCommand.class);
+ replListener2.expect(RemoveKeyCommand.class);
cache1.getInvocationContext().getOptionOverrides().reset();
node1.remove(key);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// both should be null
assertNull("should be null", cache1.get(fqn, key));
@@ -594,18 +556,15 @@
public void testTransactionalBehaviourCommit() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
TransactionManager mgr = cache1.getTransactionManager();
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expectWithTx(PutKeyValueCommand.class);
mgr.begin();
cache1.getInvocationContext().getOptionOverrides().reset();
cache1.put(fqn, key, "value1");
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache1.put(fqn, key, "value2");
mgr.commit();
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value2", cache1.get(fqn, key));
@@ -618,7 +577,7 @@
assertNull(cache2.get(fqn, key));
}
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expectWithTx(PutKeyValueCommand.class);
// now try again with passing the default options
mgr.begin();
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
@@ -626,7 +585,7 @@
cache1.getInvocationContext().getOptionOverrides().reset();
cache1.put(fqn, key, "value");
mgr.commit();
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
@@ -641,7 +600,7 @@
}
// now cache2
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expect(PutKeyValueCommand.class);
mgr = cache2.getTransactionManager();
mgr.begin();
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
@@ -649,7 +608,7 @@
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache2.put(fqn, key, "value2");
mgr.commit();
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
@@ -662,14 +621,14 @@
assertNull(cache1.get(fqn, key));
}
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expectWithTx(PutKeyValueCommand.class);
mgr.begin();
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache2.put(fqn, key, "value2");
cache2.getInvocationContext().getOptionOverrides().reset();
cache2.put(fqn, key, "value4");
mgr.commit();
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value4", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -684,15 +643,12 @@
public void testTransactionalBehaviourRollback() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
-
TransactionManager mgr = cache1.getTransactionManager();
- // create these first ...
+ replListener2.expect(PutKeyValueCommand.class, PutKeyValueCommand.class);
cache1.put("/a", key, "old");
cache1.put("/b", key, "old");
- Thread.sleep(500);
+ replListener2.waitForReplicationToOccur();
mgr.begin();
@@ -720,13 +676,8 @@
public void testTransactionalBehaviourViaNodeAPI() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
-
Node node1 = root1.addChild(fqn);
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expectWithTx(PutKeyValueCommand.class);
TransactionManager mgr = cache1.getTransactionManager();
mgr.begin();
cache1.getInvocationContext().getOptionOverrides().reset();
@@ -734,7 +685,8 @@
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node1.put(key, "value2");
mgr.commit();
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
+
// cache1 should still have this
assertEquals("value2", cache1.get(fqn, key));
@@ -748,14 +700,14 @@
}
// now try again with passing the default options
- registerReplicationCommand(cache2, PutKeyValueCommand.class);
+ replListener2.expectWithTx(PutKeyValueCommand.class);
mgr.begin();
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node1.put(key, "value3");
cache1.getInvocationContext().getOptionOverrides().reset();
node1.put(key, "value");
mgr.commit();
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertEquals("value", cache1.get(fqn, key));
@@ -772,14 +724,14 @@
// now cache2
Node node2 = root2.addChild(fqn);
mgr = cache2.getTransactionManager();
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expectWithTx(PutKeyValueCommand.class);
mgr.begin();
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
node2.put(key, "value3");
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node2.put(key, "value2");
mgr.commit();
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value2", cache2.get(fqn, key));
@@ -792,14 +744,14 @@
assertNull(cache1.get(fqn, key));
}
- registerReplicationCommand(cache1, PutKeyValueCommand.class);
+ replListener1.expectWithTx(PutKeyValueCommand.class);
mgr.begin();
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node2.put(key, "value2");
cache2.getInvocationContext().getOptionOverrides().reset();
node2.put(key, "value4");
mgr.commit();
- verifyReplication();
+ replListener1.waitForReplicationToOccur();
assertEquals("value4", cache2.get(fqn, key));
if (!isInvalidation)
{
@@ -814,14 +766,9 @@
public void testAddChild() throws Exception
{
- CacheSPI<String, String> cache1 = cache1TL.get();
- CacheSPI<String, String> cache2 = cache2TL.get();
- NodeSPI<String, String> root1 = root1TL.get();
- NodeSPI<String, String> root2 = root2TL.get();
-
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
root1.addChild(fqn);
- Thread.sleep(1000);
+
// cache1 should still have this
assertTrue(root1.hasChild(fqn));
// cache 2 should not
@@ -829,12 +776,15 @@
assertTrue("Should be null", node2 == null || (isInvalidation && !node2.isValid()));
// now try again with passing the default options
+ replListener2.expect(RemoveNodeCommand.class);
root1.removeChild(fqn);
+ replListener2.waitForReplicationToOccur();
+
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
- registerReplicationCommand(cache2, PutDataMapCommand.class);
+ replListener2.expect(PutDataMapCommand.class);
root1.addChild(fqn);
- verifyReplication();
+ replListener2.waitForReplicationToOccur();
// cache1 should still have this
assertTrue(root1.hasChild(fqn));
@@ -848,19 +798,4 @@
assertTrue("Should be null", node2 == null || !node2.isValid());
}
}
-
- /**
- * Register a command that should be received by the specified cache as the result of a replication taking place.
- * @param cache the cache on which the given command should be replicated.
- * @param what the command that should be replicated
- * @see org.jboss.cache.options.cachemodelocal.AsyncReplPessLocksTest#registerReplicationCommand(org.jboss.cache.CacheSPI, Class)
- */
- protected abstract void registerReplicationCommand(CacheSPI<String, String> cache, Class<? extends ReplicableCommand> what);
-
-
- /**
- * Wait a while until the give command gets replicated.
- * @see AsyncReplPessLocksTest#verifyReplication()
- */
- protected abstract void verifyReplication();
}
\ No newline at end of file
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationOptLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,13 +20,4 @@
nodeLockingScheme = "OPTIMISTIC";
isInvalidation = true;
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncInvalidationPessLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,13 +20,4 @@
nodeLockingScheme = "PESSIMISTIC";
isInvalidation = true;
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplOptLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -20,15 +20,4 @@
cacheMode = Configuration.CacheMode.REPL_SYNC;
nodeLockingScheme = "OPTIMISTIC";
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- TestingUtil.sleepThread(250);
- }
-
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/SyncReplPessLocksTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -19,13 +19,4 @@
cacheMode = Configuration.CacheMode.REPL_SYNC;
nodeLockingScheme = "PESSIMISTIC";
}
-
- protected void registerReplicationCommand(CacheSPI<String, String> cache2, Class<? extends ReplicableCommand> what)
- {
- //do nothing
- }
-
- protected void verifyReplication()
- {
- }
}
Modified: core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/replicated/AsyncReplTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -14,7 +14,7 @@
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.util.TestingUtil;
-import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -22,6 +22,7 @@
import javax.transaction.TransactionManager;
import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration;
/**
@@ -47,11 +48,11 @@
threadLocal.set(tl);
log("creating cache1");
tl.cache1 = createCache("CacheGroup");
- tl.replListener1 = new ReplicationListener(tl.cache1);
+ tl.replListener1 = ReplicationListener.getReplicationListener(tl.cache1);
log("creating cache2");
tl.cache2 = createCache("CacheGroup");
- tl.replListener2 = new ReplicationListener(tl.cache2);
+ tl.replListener2 = ReplicationListener.getReplicationListener(tl.cache2);
}
@@ -116,25 +117,24 @@
Fqn fqn = Fqn.fromString("/a");
String key = "key";
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, key, "value1");
// allow for replication
- replListener2.waitForReplicationToOccur(500);
+ replListener2.waitForReplicationToOccur();
assertEquals("value1", cache1.get(fqn, key));
assertEquals("value1", cache2.get(fqn, key));
TransactionManager mgr = cache1.getTransactionManager();
mgr.begin();
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(fqn, key, "value2");
assertEquals("value2", cache1.get(fqn, key));
assertEquals("value1", cache2.get(fqn, key));
mgr.commit();
+ replListener2.waitForReplicationToOccur();
- replListener2.waitForReplicationToOccur(500);
-
assertEquals("value2", cache1.get(fqn, key));
assertEquals("value2", cache2.get(fqn, key));
@@ -162,7 +162,7 @@
{
cache3 = createCache("DifferentGroup");
cache4 = createCache("DifferentGroup");
- replListener2.expectAny();
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put("/a/b/c", "age", 38);
// because we use async repl, modfication may not yet have been propagated to cache2, so
// we have to wait a little
Modified: core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/replicated/PessimisticSyncReplTxTest.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -61,7 +61,7 @@
public void setUp() throws Exception
{
t1_ex = t2_ex = null;
- lock = new Semaphore(1);
+ lock = new Semaphore(1, true);
}
@AfterMethod(alwaysRun = true)
@@ -938,6 +938,11 @@
{
initCaches(Configuration.CacheMode.REPL_SYNC);
final CacheSPI<Object, Object> c1 = this.cache1;
+
+ final Semaphore threadOneFirstPart = new Semaphore(0);
+ final Semaphore threadTwoFirstPart = new Semaphore(0);
+ final Semaphore threadOneSecondPart = new Semaphore(0);
+
Thread t1 = new Thread()
{
public void run()
@@ -946,14 +951,12 @@
try
{
- lock.acquire();
tm = beginTransaction();
c1.put("/a/b/c", "age", 38);
c1.put("/a/b/c", "age", 39);
- lock.release();
+ threadOneFirstPart.release();
- TestingUtil.sleepThread(300);
- lock.acquire();
+ threadTwoFirstPart.acquire();
try
{
tm.commit();
@@ -961,10 +964,9 @@
catch (RollbackException ex)
{
System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
- }
- finally
+ } finally
{
- lock.release();
+ threadOneSecondPart.release();
}
}
catch (Throwable ex)
@@ -972,10 +974,6 @@
ex.printStackTrace();
t1_ex = ex;
}
- finally
- {
- lock.release();
- }
}
};
@@ -987,21 +985,17 @@
try
{
- sleep(200);
- Thread.yield();
- lock.acquire();
+ threadOneFirstPart.acquire();
tm = beginTransaction();
assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
cache2.put("/a/b/c", "age", 40);
- lock.release();
- TestingUtil.sleepThread(300);
- lock.acquire();
+ threadTwoFirstPart.release();
+
+ threadOneSecondPart.acquire();
assertEquals(40, cache2.get("/a/b/c", "age"));// must not be null
tm.commit();
- lock.release();
- TestingUtil.sleepThread(1000);
tm = beginTransaction();
assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
tm.commit();
Deleted: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-11-19 11:46:13 UTC (rev 7162)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -1,336 +0,0 @@
-package org.jboss.cache.util.internals;
-
-import org.jboss.cache.Cache;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.RPCManager;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.legacy.write.*;
-import org.jboss.cache.commands.write.*;
-import org.jboss.cache.commands.remote.ReplicateCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.factories.ComponentRegistry;
-import org.jboss.cache.io.ByteBuffer;
-import org.jboss.cache.marshall.AbstractMarshaller;
-import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.RegionalizedMethodCall;
-import org.jboss.cache.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
- * Especially useful for avaoiding Thread.sleep() statements.
- * <p/>
- * Usage:
- * <pre>
- * Cache c1, c2; //these being two async caches
- * AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
- * listener2.expect(PutKeyValueCommand.class);
- * c1.put(fqn, key, value);
- * listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
- * </pre>
- * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
- * non-initialized state and *can* be reused through expect-wait cycle.
- * <b>Note</b>: this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
- * async replication
- *
- * @author Mircea.Markus at jboss.com
- * @since 2.2
- */
-public class ReplicationListener
-{
- private CountDownLatch latch = new CountDownLatch(1);
- private Set<Class<? extends ReplicableCommand>> expectedCommands;
- private Configuration config;
- private static Map <Class<? extends WriteCommand>, Class<? extends WriteCommand>> mvcc2PessMap =
- new HashMap<Class<? extends WriteCommand>, Class<? extends WriteCommand>>();
- static
- {
- mvcc2PessMap.put(ClearDataCommand.class, PessClearDataCommand.class);
- mvcc2PessMap.put(MoveCommand.class, PessMoveCommand.class);
- mvcc2PessMap.put(PutDataMapCommand.class, PessPutDataMapCommand.class);
- mvcc2PessMap.put(PutForExternalReadCommand.class, PessPutForExternalReadCommand.class);
- mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
- mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
- mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
- }
-
- /**
- * Builds a listener that will observe the given cache for recieving replication commands.
- */
- public ReplicationListener(Cache cache)
- {
- ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
- RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
- CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
- RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
- RpcDispatcher.Marshaller2 delegate = null;
- if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
- {
- throw new RuntimeException("Illegal state");
- }
- if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
- delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
- else
- delegate = new MarshallerDelegate(realMarshaller);
- realDispatcher.setMarshaller(delegate);
- realDispatcher.setRequestMarshaller(delegate);
- realDispatcher.setResponseMarshaller(delegate);
- this.config = cache.getConfiguration();
- }
-
- /**
- * Based on cache's configuration, will know for what specific commands to expect to be replicated.
- * E.g. async replication with a tx, would expect only a PrepareCommand (async is 1PC). sync repl with tx would expect
- * a prepare and a commit (sync is 2pc).
- * @param inTx do you expect replication to occur as result of a tx.commit?
- */
- public void smartExpect(Class<? extends WriteCommand> writeCommand, boolean inTx)
- {
- if (config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_ASYNC) || config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_SYNC))
- {
- expect(InvalidateCommand.class);
- return;
- }
- if (inTx)
- {
- expect(PrepareCommand.class);
- if (config.getCacheMode().isSynchronous())
- {
- expect(CommitCommand.class);
- }
- return;
- }
- if (config.getNodeLockingScheme().equals(Configuration.NodeLockingScheme.PESSIMISTIC))
- {
- expect(getPessCommand(writeCommand));
- }
- }
-
- private Class<? extends ReplicableCommand> getPessCommand(Class<? extends WriteCommand> writeCommand)
- {
- Class<? extends ReplicableCommand> result = mvcc2PessMap.get(writeCommand);
- if (result == null) throw new IllegalStateException("Unknown command: " + writeCommand);
- return result;
- }
-
- private class MarshallerDelegate implements RpcDispatcher.Marshaller2
- {
- RpcDispatcher.Marshaller2 marshaller;
-
- private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
- {
- this.marshaller = marshaller;
- }
-
- public byte[] objectToByteBuffer(Object obj) throws Exception
- {
- return marshaller.objectToByteBuffer(obj);
- }
-
- public Object objectFromByteBuffer(byte bytes[]) throws Exception
- {
- Object result = marshaller.objectFromByteBuffer(bytes);
- if (result instanceof ReplicateCommand && expectedCommands != null)
- {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
-
- public Buffer objectToBuffer(Object o) throws Exception
- {
- return marshaller.objectToBuffer(o);
- }
-
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
- {
- Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
- if (result instanceof ReplicateCommand && expectedCommands != null)
- {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
- }
-
- /**
- * We want the notification to be performed only *after* the remote command is executed.
- */
- private class ReplicateCommandDelegate extends ReplicateCommand
- {
- ReplicateCommand realOne;
-
- private ReplicateCommandDelegate(ReplicateCommand realOne)
- {
- this.realOne = realOne;
- }
-
- @Override
- public Object perform(InvocationContext ctx) throws Throwable
- {
- try
- {
- return realOne.perform(ctx);
- }
- finally
- {
- System.out.println("Processed command: " + realOne);
- Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
- while (it.hasNext())
- {
- Class<? extends ReplicableCommand> replicableCommandClass = it.next();
- if (realOne.containsCommandType(replicableCommandClass))
- {
- it.remove();
- }
- else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
- {
- PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
- if (prepareCommand.containsModificationType(replicableCommandClass))
- {
- it.remove();
- }
- }
- }
- if (expectedCommands.isEmpty())
- {
- latch.countDown();
- }
- }
- }
- }
-
- /**
- * Needed for region based marshalling.
- */
- private class RegionMarshallerDelegate extends AbstractMarshaller
- {
- private Marshaller realOne;
-
- private RegionMarshallerDelegate(Marshaller realOne)
- {
- this.realOne = realOne;
- }
-
- public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
- {
- realOne.objectToObjectStream(obj, out);
- }
-
- public Object objectFromObjectStream(ObjectInputStream in) throws Exception
- {
- return realOne.objectFromObjectStream(in);
- }
-
- public Object objectFromStream(InputStream is) throws Exception
- {
- return realOne.objectFromStream(is);
- }
-
- public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
- {
- realOne.objectToObjectStream(obj, out, region);
- }
-
- public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
- {
- RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
- if (result.command instanceof ReplicateCommand && expectedCommands != null)
- {
- ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
- result.command = new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
-
- public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
- {
- return realOne.regionalizedMethodCallFromObjectStream(in);
- }
-
- public ByteBuffer objectToBuffer(Object o) throws Exception
- {
- return realOne.objectToBuffer(o);
- }
-
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
- {
- return realOne.objectFromByteBuffer(bytes, i, i1);
- }
- }
-
- /**
- * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
- * if replication does not occur in the give timeout then an exception is being thrown.
- */
- public void waitForReplicationToOccur(long timeoutMillis)
- {
- System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
- waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
- System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
- }
-
- /**
- * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time units.
- *
- * @param timeout the maximum time to wait
- * @param timeUnit the time unit of the <tt>timeout</tt> argument.
- */
- public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
- {
- assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expect(...) before calling this method";
- try
- {
- if (!latch.await(timeout, timeUnit))
- {
- assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
- }
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("unexpected", e);
- }
- finally
- {
- expectedCommands = null;
- latch = new CountDownLatch(1);
- }
- }
-
- /**
- * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated
- * to this cache. The method can be called several times with various arguments.
- */
- public void expect(Class<? extends ReplicableCommand>... expectedCommands)
- {
- if (this.expectedCommands == null)
- {
- this.expectedCommands = new HashSet<Class<? extends ReplicableCommand>>();
- }
- this.expectedCommands.addAll(Arrays.asList(expectedCommands));
- }
-
- /**
- * Waits untill first command is replicated.
- */
- public void expectAny()
- {
- expect();
- }
-}
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/InvalidationReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,56 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.legacy.write.VersionedInvalidateCommand;
+import org.jboss.cache.commands.write.InvalidateCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.Cache;
+import org.jboss.cache.config.Configuration;
+
+/**
+ * Specialization of ReplicationListener for caches that use invalidation.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class InvalidationReplicationListener extends ReplicationListener
+{
+
+ Class<? extends InvalidateCommand> base;
+
+ public InvalidationReplicationListener(Cache cache)
+ {
+ super(cache);
+ if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC))
+ {
+ base = VersionedInvalidateCommand.class;
+ } else
+ {
+ base = InvalidateCommand.class;
+ }
+ }
+
+ public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ expectInvalidations(expectedCommands);
+ }
+
+ public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+ {
+ expectInvalidations(writeCommands);
+ }
+
+ private void expectInvalidations(Class<? extends ReplicableCommand>... commands)
+ {
+ for (Class<? extends ReplicableCommand> command : commands)
+ {
+ if (command.equals(PutForExternalReadCommand.class))
+ {
+ internalExpect();//so that the map won't be empty
+ }
+ else
+ {
+ internalExpect(base);
+ }
+ }
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/MvccReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,34 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+
+/**
+ * Specialization of ReplicationListener for mvcc caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class MvccReplicationListener extends ReplicationListener
+{
+ public MvccReplicationListener(Cache cache)
+ {
+ super(cache);
+ }
+
+ /**
+ * all the commands should be received at the other end.
+ */
+ public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ internalExpect(expectedCommands);
+ }
+
+ public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+ {
+ internalExpect(PrepareCommand.class);
+ //this is because for async replication we have an 1pc transaction
+ if (config.getCacheMode().isSynchronous()) internalExpect(CommitCommand.class);
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,80 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Specialization of ReplicationListener for optimistic caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class OptimisticReplicationListener extends ReplicationListener
+{
+ public OptimisticReplicationListener(Cache cache)
+ {
+ super(cache);
+ }
+
+ /**
+ * For each command, expect an OptimisticPrepare and an commit.
+ */
+ public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ //in the case of optimistic replication, an prepare and an commit is expected foe each node
+ for (Class<? extends ReplicableCommand> command : expectedCommands)
+ {
+ internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+ }
+ }
+
+ /**
+ * For all given commands expect a single prepare, and a single commit.
+ */
+ public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+ {
+ internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+ }
+
+ protected void postCommandExecution(ReplicateCommand realOne)
+ {
+ int initialExpectations = expectedCommands.size();
+ List<ReplicableCommand> mods = getAllModifications(realOne);
+ for (Iterator<Class<? extends ReplicableCommand>> typeIt = expectedCommands.iterator(); typeIt.hasNext();)
+ {
+ Class<? extends ReplicableCommand> commadType = typeIt.next();
+ Iterator<ReplicableCommand> instanceIt = mods.iterator();
+ while (instanceIt.hasNext())
+ {
+ ReplicableCommand replicableCommand = instanceIt.next();
+ if (replicableCommand.getClass().equals(commadType))
+ {
+ instanceIt.remove();
+ typeIt.remove();
+ }
+ }
+ }
+ int remainingExpectations = expectedCommands.size();
+ System.out.println("Processed command: " + realOne + ". " + (initialExpectations - remainingExpectations) + " identified, remaining " + remainingExpectations);
+ }
+
+ private List<ReplicableCommand> getAllModifications(ReplicateCommand realOne)
+ {
+ List<ReplicableCommand> result = new ArrayList<ReplicableCommand>();
+ if (realOne.isSingleCommand())
+ {
+ result.add(realOne.getSingleModification());
+ }
+ else
+ {
+ result.addAll(realOne.getModifications());
+ }
+ return result;
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,71 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.legacy.write.*;
+import org.jboss.cache.commands.write.*;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Specialization of ReplicationListener for optimistic caches.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class PessReplicationListener extends ReplicationListener
+{
+ private static Map<Class<? extends ReplicableCommand>, Class<? extends ReplicableCommand>> mvcc2PessMap =
+ new HashMap<Class<? extends ReplicableCommand>, Class<? extends ReplicableCommand>>();
+
+ static
+ {
+ mvcc2PessMap.put(ClearDataCommand.class, PessClearDataCommand.class);
+ mvcc2PessMap.put(MoveCommand.class, PessMoveCommand.class);
+ mvcc2PessMap.put(PutDataMapCommand.class, PessPutDataMapCommand.class);
+ mvcc2PessMap.put(PutForExternalReadCommand.class, PessPutForExternalReadCommand.class);
+ mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
+ mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
+ mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
+ }
+
+
+ public PessReplicationListener(Cache cache)
+ {
+ super(cache);
+ }
+
+ /**
+ * In this scenario, all the commands shold be replaced wiht their pessimistic cunterparts.
+ */
+ public void expect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ for (Class<? extends ReplicableCommand> command: expectedCommands)
+ {
+ super.internalExpect(getPessCommand(command));
+ }
+ }
+
+ public void expectWithTx(Class<? extends ReplicableCommand>... commands)
+ {
+ for (Class<? extends ReplicableCommand> command : commands)
+ {
+ if (command.equals(PessPutForExternalReadCommand.class))
+ {
+ throw new IllegalArgumentException("PFER are not part of tx, use no-tx .expect()");
+ }
+ }
+ internalExpect(PrepareCommand.class);
+ if (config.getCacheMode().isSynchronous())
+ {
+ internalExpect(CommitCommand.class);
+ }
+ }
+
+ private Class<? extends ReplicableCommand> getPessCommand(Class<? extends ReplicableCommand> command)
+ {
+ return mvcc2PessMap.get((Class<? extends ReplicableCommand>) command);
+ }
+}
Copied: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java (from rev 7147, core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java)
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java 2008-11-19 11:57:48 UTC (rev 7163)
@@ -0,0 +1,336 @@
+package org.jboss.cache.util.internals.replicationlisteners;
+
+import org.jboss.cache.*;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.marshall.AbstractMarshaller;
+import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.util.TestingUtil;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
+ * Especially useful for avaoiding Thread.sleep() statements.
+ * <p/>
+ * Usage:
+ * <pre>
+ * no tx:
+ * Cache c1, c2; //these being two async caches
+ * ReplicationListener listener2 = ReplicationListener.getReplicationListener(c2);
+ * listener2.expect(PutKeyValueCommand.class);
+ * c1.put(fqn, key, value);
+ * listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
+ * with tx: (difference is that expectWithTx is used insted of expect
+ * Cache c1, c2; //these being two async caches
+ * ReplicationListener listener2 = ReplicationListener.getReplicationListener(c2);
+ * listener2.expectWithTx(PutKeyValueCommand.class);
+ * txManager.begin();
+ * c1.put(fqn, key, value);
+ * txManager.commit();
+ * listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
+ * </pre>
+ *
+ *
+ * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
+ * non-initialized state and *can* be reused through expect-wait cycle.
+ * <b>Note</b>: this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
+ * async replication
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+abstract public class ReplicationListener
+{
+
+ public static final long DEFAULT_TIMEOUT = 10000;
+ private CountDownLatch latch = new CountDownLatch(1);
+ protected List<Class<? extends ReplicableCommand>> expectedCommands;
+ protected Configuration config;
+
+ /**
+ * Builds a listener that will observe the given cache for recieving replication commands.
+ */
+ protected ReplicationListener(Cache cache)
+ {
+ ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
+ RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
+ CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
+ RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
+ RpcDispatcher.Marshaller2 delegate = null;
+ if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
+ {
+ throw new RuntimeException("Illegal state");
+ }
+ if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
+ delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
+ else
+ delegate = new MarshallerDelegate(realMarshaller);
+ realDispatcher.setMarshaller(delegate);
+ realDispatcher.setRequestMarshaller(delegate);
+ realDispatcher.setResponseMarshaller(delegate);
+ this.config = cache.getConfiguration();
+ }
+
+ protected ReplicationListener()
+ {
+ }
+
+ abstract public void expect(Class<? extends ReplicableCommand>... expectedCommands);
+
+ /**
+ * Based on cache's configuration, will know for what specific commands to expect to be replicated.
+ * E.g. async replication with a tx, would expect only a PrepareCommand (async is 1PC). sync repl with tx would expect
+ * a prepare and a commit (sync is 2pc).
+ */
+ abstract public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands);
+
+ /**
+ * Factory method, to be used in order to obtain a replication listener based on a cache config.
+ */
+ public static ReplicationListener getReplicationListener(Cache cache)
+ {
+ if (cache.getConfiguration().getCacheMode().isInvalidation())
+ {
+ return new InvalidationReplicationListener(cache);
+ }
+ if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC))
+ {
+ return new OptimisticReplicationListener(cache);
+ }
+ else if (cache.getConfiguration().getNodeLockingScheme().equals(Configuration.NodeLockingScheme.PESSIMISTIC))
+ {
+ return new PessReplicationListener(cache);
+ } else
+ {
+ return new MvccReplicationListener(cache);
+ }
+ }
+
+ private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+ {
+ RpcDispatcher.Marshaller2 marshaller;
+
+ private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+ {
+ this.marshaller = marshaller;
+ }
+
+ public byte[] objectToByteBuffer(Object obj) throws Exception
+ {
+ return marshaller.objectToByteBuffer(obj);
+ }
+
+ public Object objectFromByteBuffer(byte bytes[]) throws Exception
+ {
+ Object result = marshaller.objectFromByteBuffer(bytes);
+ if (result instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+
+ public Buffer objectToBuffer(Object o) throws Exception
+ {
+ return marshaller.objectToBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ {
+ Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+ if (result instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * We want the notification to be performed only *after* the remote command is executed.
+ */
+ private class ReplicateCommandDelegate extends ReplicateCommand
+ {
+ ReplicateCommand realOne;
+
+ private ReplicateCommandDelegate(ReplicateCommand realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ @Override
+ public Object perform(InvocationContext ctx) throws Throwable
+ {
+ try
+ {
+ return realOne.perform(ctx);
+ }
+ finally
+ {
+ postCommandExecution(realOne);
+ if (expectedCommands.isEmpty())
+ {
+ latch.countDown();
+ }
+ }
+ }
+ }
+
+
+ protected void postCommandExecution(ReplicateCommand realOne)
+ {
+ System.out.println("Processed command: " + realOne + "; expecting - " + expectedCommands.size() + " commands");
+ Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
+ while (it.hasNext())
+ {
+ Class<? extends ReplicableCommand> replicableCommandClass = it.next();
+ if (realOne.containsCommandType(replicableCommandClass))
+ {
+ it.remove();
+ } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+ {
+ PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
+ if (prepareCommand.containsModificationType(replicableCommandClass))
+ {
+ it.remove();
+ }
+ }
+ }
+ System.out.println("After process expecing : " + expectedCommands.size() + " commands");
+ }
+
+ /**
+ * Needed for region based marshalling.
+ */
+ private class RegionMarshallerDelegate extends AbstractMarshaller
+ {
+ private Marshaller realOne;
+
+ private RegionMarshallerDelegate(Marshaller realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
+ {
+ realOne.objectToObjectStream(obj, out);
+ }
+
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.objectFromObjectStream(in);
+ }
+
+ public Object objectFromStream(InputStream is) throws Exception
+ {
+ return realOne.objectFromStream(is);
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
+ {
+ realOne.objectToObjectStream(obj, out, region);
+ }
+
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
+ {
+ RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
+ if (result.command instanceof ReplicateCommand && expectedCommands != null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
+ result.command = new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+
+ public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.regionalizedMethodCallFromObjectStream(in);
+ }
+
+ public ByteBuffer objectToBuffer(Object o) throws Exception
+ {
+ return realOne.objectToBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ {
+ return realOne.objectFromByteBuffer(bytes, i, i1);
+ }
+ }
+
+ /**
+ * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
+ * if replication does not occur in the give timeout then an exception is being thrown.
+ */
+ public void waitForReplicationToOccur(long timeoutMillis)
+ {
+ System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
+ waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
+ System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
+ }
+
+ /**
+ * same as {@link #waitForReplicationToOccur(long)}, just that it uses the {@link #DEFAULT_TIMEOUT} for timeout.
+ */
+ public void waitForReplicationToOccur()
+ {
+ waitForReplicationToOccur(DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time units.
+ *
+ * @param timeout the maximum time to wait
+ * @param timeUnit the time unit of the <tt>timeout</tt> argument.
+ */
+ public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit)
+ {
+ assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expectWithTx(...) before calling this method";
+ try
+ {
+ if (!expectedCommands.isEmpty() && !latch.await(timeout, timeUnit))
+ {
+ assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("unexpected", e);
+ }
+ finally
+ {
+ expectedCommands = null;
+ latch = new CountDownLatch(1);
+ }
+ }
+
+ /**
+ * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated
+ * to this cache. The method can be called several times with various arguments.
+ */
+ protected void internalExpect(Class<? extends ReplicableCommand>... expectedCommands)
+ {
+ if (this.expectedCommands == null)
+ {
+ this.expectedCommands = new ArrayList<Class<? extends ReplicableCommand>>();
+ }
+ this.expectedCommands.addAll(Arrays.asList(expectedCommands));
+ }
+}
More information about the jbosscache-commits
mailing list