Author: mircea.markus
Date: 2008-11-21 11:35:19 -0500 (Fri, 21 Nov 2008)
New Revision: 7184
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Log:
test concurrency
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-11-21 16:28:39 UTC
(rev 7183)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-11-21 16:35:19 UTC
(rev 7184)
@@ -120,7 +120,7 @@
private InterceptorChain interceptorChain;
private boolean isUsingBuddyReplication;
- private boolean isInLocalMode;
+ private volatile boolean isInLocalMode;
private ComponentRegistry componentRegistry;
private LockManager lockManager;
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -322,7 +322,7 @@
if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
List<Address> mbrs = rpcManager.getMembers();
- Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ?
Boolean.TRUE : Boolean.FALSE;
+ Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees();
GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn,
searchSubtrees);
// doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could
return null results from nodes that do
// not have either the primary OR backup, and stop polling other valid nodes.
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -94,8 +94,8 @@
assertNoLocks(caches);
}
- public void testRemovalFromCluster2Buddies() throws Exception
- {
+ public void testRemovalFromCluster2Buddies() throws Throwable
+{
log.debug("Running testRemovalFromCluster2Buddies");
List<CacheSPI<Object, Object>> caches = createCaches(2, 4, false);
cachesTL.set(caches);
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -8,7 +8,9 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import org.jgroups.JChannel;
import org.jgroups.protocols.DISCARD;
import static org.testng.AssertJUnit.*;
@@ -88,11 +90,13 @@
System.out.println("***** Killed original data owner, about to call a get on a
different cache instance. *****");
+ ReplicationListener replListener =
ReplicationListener.getReplicationListener(caches.get(1));
+ replListener.expect(DataGravitationCleanupCommand.class);
+
// according to data gravitation, a call to *any* cache should retrieve the data,
and move the data to the new cache.
assertEquals("Value should have gravitated", value,
caches.get(2).get(fqn, key));
+ replListener.waitForReplicationToOccur();
- delay(); // cleanup commands are async
-
TestingUtil.dumpCacheContents(caches);
// now lets test the eviction part of gravitation
@@ -173,9 +177,12 @@
System.out.println("*** Calling get() on cache[1] with force option");
+ ReplicationListener replicationListener0 =
ReplicationListener.getReplicationListener(caches.get(0));
+ replicationListener0.expect(DataGravitationCleanupCommand.class);
+
caches.get(1).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
assertEquals("value", caches.get(1).get(fqn, key));
- delay(); // cleanup commands are async
+ replicationListener0.waitForReplicationToOccur(); // cleanup commands are async
TestingUtil.dumpCacheContents(caches);
@@ -218,8 +225,10 @@
assertNoLocks(caches);
// gravitate to 2:
+ ReplicationListener replListener1 =
ReplicationListener.getReplicationListener(caches.get(0));
+ replListener1.expect(DataGravitationCleanupCommand.class);
caches.get(2).getNode(fqn); // expectWithTx entire subtree to gravitate.
- delay(); // cleanup commands are async
+ replListener1.waitForReplicationToOccur(); // cleanup commands are async
Fqn newBackupFqn = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(),
fqn);
Fqn newBackupFqn2 = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(),
fqn2);
@@ -248,9 +257,4 @@
assertNoLocks(caches);
}
-
- protected void delay()
- {
- TestingUtil.sleepThread(250);
- }
}
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -95,9 +95,9 @@
}
cachesTL.set(null);
System.gc();
-
+
new UnitTestCacheFactory().cleanUp();
-
+
}
protected final static int VIEW_BLOCK_TIMEOUT = 5000;
@@ -131,7 +131,7 @@
{
CacheSPI<Object, Object> c = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object,
Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC,
false, false, true), false);
- String threadId = Thread.currentThread().getName();
+ String threadId = Thread.currentThread().getName();
//c.getConfiguration().setClusterName("BuddyReplicationTest-" +
threadId);
BuddyReplicationConfig brc = new BuddyReplicationConfig();
@@ -321,7 +321,7 @@
public void waitForBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy) throws
Exception
{
- waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);
+ waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);
}
/**
@@ -333,7 +333,7 @@
long timeout = 60000 + caches.length;
for (int i = 0; i < caches.length - 1; i++)
{
- waitForBuddy(caches[i], caches[i+1], true, timeout);
+ waitForBuddy(caches[i], caches[i + 1], true, timeout);
}
waitForBuddy(caches[caches.length - 1], caches[0], true, timeout);
}
@@ -367,7 +367,7 @@
BuddyGroup group =
buddyBuddyManager.buddyGroupsIParticipateIn.get(dataOwnerLocalAddress);
result = result &
buddyBuddyManager.buddyGroupsIParticipateIn.containsKey(dataOwnerLocalAddress);
if (onlyBuddy) result = result && group.getBuddies().size() == 1;
- result = result & group!= null && group.getBuddies() != null &&
group.getBuddies().contains(buddyLocalAddress);
+ result = result & group != null && group.getBuddies() != null
&& group.getBuddies().contains(buddyLocalAddress);
return result;
}
Modified: core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -53,9 +53,7 @@
Configuration c1 = new Configuration();
Configuration c2 = new Configuration();
- //c1.setClusterName("CCL-Test-" + Thread.currentThread().getName());
c1.setStateRetrievalTimeout(2000);
- //c2.setClusterName("CCL-Test-" + Thread.currentThread().getName());
c2.setStateRetrievalTimeout(2000);
c1.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c2.setCacheMode(Configuration.CacheMode.REPL_SYNC);
@@ -69,6 +67,8 @@
cache1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(c1, false);
cache2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(c2, false);
+ cache1.getConfiguration().setSerializationExecutorPoolSize(0);
+ cache2.getConfiguration().setSerializationExecutorPoolSize(0);
if (useRegionBasedMarshalling)
@@ -225,7 +225,7 @@
assertTrue("should exist", loader2.exists(fqn));
}
- public void testCacheLoaderThreadSafety() throws Exception
+ public void testCacheLoaderThreadSafety() throws Throwable
{
threadSafetyTest(true);
}
Modified:
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -724,10 +724,8 @@
assertNull("should be invalidated", cache2.get(fqn, key));
}
- // now cache2
- replListener1.expect(PutDataMapCommand.class);
+ //do not expect replication for this one as the node is already thre
Node node2 = root2.addChild(fqn);
- replListener1.waitForReplicationToOccur();
mgr = cache2.getTransactionManager();
replListener1.expectWithTx(PutKeyValueCommand.class);
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -27,10 +27,17 @@
*/
public void expect(Class<? extends ReplicableCommand>... expectedCommands)
{
- //in the case of optimistic replication, an prepare and an commit is expected foe
each node
+ //in the case of optimistic replication, an prepare and an commit is expected foe
each node
for (Class<? extends ReplicableCommand> command : expectedCommands)
{
- internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+ if (isRemoteCommand(command))
+ {
+ internalExpect(command);
+ }
+ else
+ {
+ internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+ }
}
}
@@ -42,7 +49,7 @@
internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
}
- protected void postCommandExecution(ReplicateCommand realOne)
+ protected void postReplicateExecution(ReplicateCommand realOne)
{
int initialExpectations = expectedCommands.size();
List<ReplicableCommand> mods = getAllModifications(realOne);
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -2,6 +2,7 @@
import org.jboss.cache.Cache;
import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.legacy.write.*;
@@ -29,6 +30,7 @@
mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
+ mvcc2PessMap.put(DataGravitationCleanupCommand.class,
DataGravitationCleanupCommand.class);
}
@@ -66,6 +68,8 @@
private Class<? extends ReplicableCommand> getPessCommand(Class<? extends
ReplicableCommand> command)
{
- return mvcc2PessMap.get((Class<? extends ReplicableCommand>) command);
+ Class<? extends ReplicableCommand> result = mvcc2PessMap.get((Class<?
extends ReplicableCommand>) command);
+ if (result == null) throw new IllegalStateException("Unknown command: " +
command);
+ return result;
}
}
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java 2008-11-21
16:28:39 UTC (rev 7183)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java 2008-11-21
16:35:19 UTC (rev 7184)
@@ -3,7 +3,8 @@
import org.jboss.cache.*;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.remote.*;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.io.ByteBuffer;
@@ -15,6 +16,7 @@
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Buffer;
+import org.jgroups.Address;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -61,6 +63,7 @@
private CountDownLatch latch = new CountDownLatch(1);
protected List<Class<? extends ReplicableCommand>> expectedCommands;
protected Configuration config;
+ protected final Address localAddress;
/**
* Builds a listener that will observe the given cache for recieving replication
commands.
@@ -84,10 +87,12 @@
realDispatcher.setRequestMarshaller(delegate);
realDispatcher.setResponseMarshaller(delegate);
this.config = cache.getConfiguration();
+ this.localAddress = cache.getLocalAddress();
}
protected ReplicationListener()
{
+ localAddress = null;
}
abstract public void expect(Class<? extends ReplicableCommand>...
expectedCommands);
@@ -138,44 +143,51 @@
public Object objectFromByteBuffer(byte bytes[]) throws Exception
{
Object result = marshaller.objectFromByteBuffer(bytes);
- if (result instanceof ReplicateCommand && expectedCommands != null)
- {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
+ return process(result);
}
- public Buffer objectToBuffer(Object o) throws Exception
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
{
- return marshaller.objectToBuffer(o);
+ Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+ return process(result);
}
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ private Object process(Object result)
{
- Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
- if (result instanceof ReplicateCommand && expectedCommands != null)
+ System.out.println(localAddress + " >>>> " + result );
+ boolean isReplicate = result instanceof ReplicateCommand;
+ boolean isRemote = result != null &&
isRemoteCommand(result.getClass());
+ if ((isReplicate || isRemote) && (expectedCommands != null))
{
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
+ ReplicableCommand replicateCommand = (ReplicableCommand) result;
+ result = new ReplicableCommandDelegate(replicateCommand);
}
return result;
}
+
+ public Buffer objectToBuffer(Object o) throws Exception
+ {
+ return marshaller.objectToBuffer(o);
+ }
}
/**
* We want the notification to be performed only *after* the remote command is
executed.
*/
- private class ReplicateCommandDelegate extends ReplicateCommand
+ private class ReplicableCommandDelegate implements ReplicableCommand
{
- ReplicateCommand realOne;
+ ReplicableCommand realOne;
- private ReplicateCommandDelegate(ReplicateCommand realOne)
+ private ReplicableCommandDelegate(ReplicableCommand realOne)
{
+ if (realOne instanceof VisitableCommand)
+ {
+ throw new IllegalArgumentException("Visitable commands not allowed!!!
;recieved="
+ + realOne.getClass().getName());
+ }
this.realOne = realOne;
}
- @Override
public Object perform(InvocationContext ctx) throws Throwable
{
try
@@ -184,19 +196,45 @@
}
finally
{
- postCommandExecution(realOne);
+ if (realOne instanceof ReplicateCommand)
+ {
+ postReplicateExecution((ReplicateCommand)realOne);
+ }
+ else
+ {
+ postNonVisitableExecution(realOne);
+ }
if (expectedCommands.isEmpty())
{
latch.countDown();
}
}
}
+
+ public int getCommandId()
+ {
+ return realOne.getCommandId();
+ }
+
+ public Object[] getParameters()
+ {
+ return realOne.getParameters();
+ }
+
+ public void setParameters(int commandId, Object[] parameters)
+ {
+ realOne.setParameters(commandId, parameters);
+ }
}
-
- protected void postCommandExecution(ReplicateCommand realOne)
+ protected void postNonVisitableExecution(ReplicableCommand realOne)
{
- System.out.println("Processed command: " + realOne + "; expecting -
" + expectedCommands.size() + " commands");
+ expectedCommands.remove(realOne.getClass());
+ }
+
+ protected void postReplicateExecution(ReplicateCommand realOne)
+ {
+ System.out.println("Processed command: " + realOne + "; expecting
" + expectedCommands.size() + " commands");
Iterator<Class<? extends ReplicableCommand>> it =
expectedCommands.iterator();
while (it.hasNext())
{
@@ -254,7 +292,7 @@
if (result.command instanceof ReplicateCommand && expectedCommands !=
null)
{
ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
- result.command = new ReplicateCommandDelegate(replicateCommand);
+ result.command = new ReplicableCommandDelegate(replicateCommand);
}
return result;
}
@@ -333,4 +371,11 @@
}
this.expectedCommands.addAll(Arrays.asList(expectedCommands));
}
+
+ protected boolean isRemoteCommand(Class clazz)
+ {
+ return clazz.equals(AnnounceBuddyPoolNameCommand.class) ||
clazz.equals(AssignToBuddyGroupCommand.class) ||
+ clazz.equals(ClusteredGetCommand.class) ||
clazz.equals(DataGravitationCleanupCommand.class) ||
+ clazz.equals(RemoveFromBuddyGroupCommand.class) ||
clazz.equals(ReplicateCommand.class);
+ }
}