Author: mircea.markus
Date: 2008-05-30 10:27:41 -0400 (Fri, 30 May 2008)
New Revision: 5919
Modified:
core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
Log:
JBCACHE-1338 - Removed some Thread.sleep() statements and finalized the
ReplicationListener; it now also works with region-based marshalling.
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -287,10 +287,10 @@
*/
protected void processAddedNodes(EvictedEventNode evictedEventNode) throws
EvictionException
{
- processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference(),
evictedEventNode.isResetElementCount());
+ processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference());
}
- protected void processAddedNodes(EvictedEventNode evictedEventNode, int
numAddedElements, boolean resetElementCount) throws EvictionException
+ protected void processAddedNodes(EvictedEventNode evictedEventNode, int
numAddedElements) throws EvictionException
{
Fqn fqn = evictedEventNode.getFqn();
@@ -304,14 +304,7 @@
{
ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
- if (resetElementCount)
- {
- ne.setNumberOfElements(numAddedElements);
- }
- else
- {
- ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
- }
+ ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
if (trace)
{
log.trace("Queue already contains " + ne.getFqn() + "
processing it as visited");
@@ -410,7 +403,7 @@
{
log.debug("Visiting node that was not added to eviction queues. Assuming
that it has 1 element.");
}
- this.processAddedNodes(evictedEventNode, 1, false);
+ this.processAddedNodes(evictedEventNode, 1);
return;
}
// note this method will visit and modify the node statistics by reference!
@@ -451,7 +444,7 @@
{
log.trace("Adding element " + fqn + " for a node that
doesn't exist yet. Process as an add.");
}
- this.processAddedNodes(evictedEventNode, 1, false);
+ this.processAddedNodes(evictedEventNode, 1);
return;
}
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -18,9 +18,8 @@
public class EvictedEventNode implements Cloneable
{
private Fqn fqn_;
- private NodeEventType type_;
- private int elementDifference_;
- private boolean resetElementCount_;
+ private NodeEventType type;
+ private int elementDifference;
private long inUseTimeout;
private long creationTimestamp;
@@ -53,24 +52,14 @@
this.inUseTimeout = inUseTimeout;
}
- public boolean isResetElementCount()
- {
- return this.resetElementCount_;
- }
-
- public void setResetElementCount(boolean resetElementCount)
- {
- this.resetElementCount_ = resetElementCount;
- }
-
public int getElementDifference()
{
- return elementDifference_;
+ return elementDifference;
}
public void setElementDifference(int elementDifference_)
{
- this.elementDifference_ = elementDifference_;
+ this.elementDifference = elementDifference_;
}
public Fqn getFqn()
@@ -85,18 +74,18 @@
public void setEventType(NodeEventType event)
{
- type_ = event;
+ type = event;
}
public NodeEventType getEventType()
{
- return type_;
+ return type;
}
@Override
public String toString()
{
- return "EvictedEN[fqn=" + fqn_ + " event=" + type_ + "
diff=" + elementDifference_ + "]";
+ return "EvictedEN[fqn=" + fqn_ + " event=" + type + "
diff=" + elementDifference + "]";
}
public EvictedEventNode clone(Fqn cloneFqn)
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -116,7 +116,6 @@
size = command.getData().size();
}
EvictedEventNode event = new EvictedEventNode(fqn,
NodeEventType.ADD_NODE_EVENT, size);
- event.setResetElementCount(false);
registerEvictionEventToRegionManager(event, r);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -14,8 +14,8 @@
* @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @since 2.1.1
*/
-class RegionalizedMethodCall
+public class RegionalizedMethodCall
{
- ReplicableCommand command;
- Fqn region;
+ public ReplicableCommand command;
+ public Fqn region;
}
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -41,7 +41,7 @@
caches.add(createCache(1, "TEST", false, true));
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]),
VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn<String> test =
BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), main);
@@ -50,17 +50,18 @@
caches.add(createCache(1, "TEST", false, true));
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]),
VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
assertNull("State not transferred", caches.get(2).get(test,
"name"));
// Make 2 the buddy of 0
caches.get(1).stop();
- caches.set(1, null);
+ caches.remove(1);
+ TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]),
VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
- assertEquals("State transferred", "Joe",
caches.get(2).get(test, "name"));
+ assertEquals("State transferred", "Joe",
caches.get(1).get(test, "name"));
}
public void testRegionBasedStateTransfer() throws Exception
@@ -88,7 +89,7 @@
caches.get(3).start();
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]),
VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn fqnA = Fqn.fromString("/a");
Fqn fqnD = Fqn.fromString("/d");
@@ -160,7 +161,7 @@
caches.get(1).start();
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]),
VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn test = BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(),
main);
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -58,7 +58,7 @@
c1.activate();
cache1.put(A_B, "name", JOE);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
System.out.println("Cache dump BEFORE activation");
System.out.println("cache1 " + CachePrinter.printCacheDetails(cache1));
@@ -95,7 +95,7 @@
cache1.put(A_B, "name", JOE);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
assertNull("Should be no replication to inactive region", cache2.get(A_B,
"name"));
assertNull("Should be no replication to inactive backup region",
cache2.get(backupFqn, "name"));
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -47,7 +47,7 @@
caches = createCaches(2, 3, false);
TestingUtil.blockUntilViewsReceived(5000, caches.toArray(new Cache[0]));
- TestingUtil.sleepThread(2000);
+// TestingUtil.sleepThread(2000);
System.out.println("*** Testing cache 0");
assertIsBuddy(caches.get(0), caches.get(1), false);
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -6,6 +6,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.RPCManager;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
@@ -102,7 +103,6 @@
assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must
multiply to be a multiple of COUNT";
- int expectedReplications = totalInvocations / COUNT;
final CountDownLatch latch = new CountDownLatch(1);
// mock the RPCManager used in the cache
@@ -148,7 +148,9 @@
// now test results
verify(mockRpcManager);
- TestingUtil.sleepThread(250); // make sure the queue flushes
+ ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+ notifier.waitUntillAllReplicated(250);
+// TestingUtil.sleepThread(250); // make sure the queue flushes
assert replQ.elements.size() == 0;
}
@@ -161,7 +163,9 @@
cache.put("/a/b/c" + i, "key", "value");
assertNotNull(cache.get("/a/b/c" + i, "key"));
}
- TestingUtil.sleepThread(500);
+ ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+ notifier.waitUntillAllReplicated(500);
+// TestingUtil.sleepThread(500);
for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i,
cache2.get("/a/b/c" + i, "key"));
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -3,6 +3,7 @@
import org.jboss.cache.Cache;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.testng.annotations.AfterMethod;
@@ -42,7 +43,8 @@
// outside of tx scope
cache.put("/a", "k", "v");
- TestingUtil.sleepThread(200);
+ ReplicationQueueNotifier replicationQueueNotifier = new
ReplicationQueueNotifier(cache);
+ replicationQueueNotifier.waitUntillAllReplicated(200);
assert cache2.get("/a", "k").equals("v");
@@ -51,8 +53,10 @@
cache2.put("/a", "k", "v2");
txManager.commit();
- TestingUtil.sleepThread(200);
+ ReplicationQueueNotifier replicationQueueNotifier2 = new
ReplicationQueueNotifier(cache2);
+ replicationQueueNotifier2.waitUntillAllReplicated(200);
+
assert cache.get("/a", "k").equals("v2");
}
}
Modified:
core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -54,7 +54,9 @@
dataMap.put("k2", "v2");
Map expected = new HashMap(dataMap);
expected.putAll(node.getDataDirect());
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(testFqn, true,
NodeModifiedEvent.ModificationType.PUT_MAP, node.getData(), null);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(testFqn, false,
NodeModifiedEvent.ModificationType.PUT_MAP, expected, null);
control.replay();
Modified:
core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -44,9 +44,11 @@
{
nodes.adfNode.put("existingKey", "existingValue");
expect(container.peekStrict(globalTransaction, fqn,
false)).andReturn(nodes.adfNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA,
nodes.adfNode.getDataDirect(), ctx);
Map expected = new HashMap();
expected.put("k", "v");
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false,
NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
control.replay();
assert null == command.perform(ctx) : "no pre existing value";
@@ -68,9 +70,11 @@
{
nodes.adfNode.put("k", "oldValue");
expect(container.peekStrict(globalTransaction, fqn,
false)).andReturn(nodes.adfNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA,
nodes.adfNode.getDataDirect(), ctx);
Map expected = new HashMap();
expected.put("k", "v");
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false,
NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
control.replay();
assert "oldValue".equals(command.perform(ctx)) : "no pre existing
value";
Modified:
core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -41,9 +41,11 @@
expected.put("newKey","newValue");
nodes.adfgNode.putAll(expected);
expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true,
NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
expected = new HashMap();
expected.put(key,null);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false,
NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
control.replay();
assert null == command.perform(ctx);
@@ -66,7 +68,9 @@
expected.put(key,"newValue");
nodes.adfgNode.putAll(expected);
expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true,
NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false,
NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
control.replay();
assert "newValue" == command.perform(ctx);
Modified: core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -10,6 +10,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.EvictionController;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.EvictionConfig;
import org.jboss.cache.config.EvictionRegionConfig;
@@ -105,8 +106,9 @@
e.printStackTrace();
}
}
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
- TestingUtil.sleepThread(wakeupIntervalMillis + 500);
try
{
String val = (String) cache.get(rootStr + "3", rootStr +
"3");
@@ -136,7 +138,8 @@
cache.put(fqn, str, str);
}
- TestingUtil.sleepThread(wakeupIntervalMillis + 10000);
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
assertEquals("Number of nodes", maxNodes + 2, cache.getNumberOfNodes());
for (int i = 0; i < maxNodes; i++)
{
@@ -181,7 +184,8 @@
int period = wakeupIntervalMillis + 500;
log("sleeping for " + period + "ms");
- TestingUtil.sleepThread(period);// it really depends the eviction thread time.
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
try
{
for (int i = 0; i < 5; i++)
@@ -197,7 +201,7 @@
assertNotNull(cache.get(fqn, str));
}
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
// since it is FIFO if we leave it alone and revisit, cache should remain the
same.
for (int i = 5; i < 10; i++)
{
@@ -235,9 +239,9 @@
}
}
- int period = (wakeupIntervalMillis + 500);
- log("period is " + period);
- TestingUtil.sleepThread(period);
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
+
String str1 = rootStr + "7";
Fqn fqn1 = Fqn.fromString(str1);
String str2 = rootStr + "7/7";
@@ -247,11 +251,11 @@
assertNotNull(cache.get(fqn2, str2));
assertNotNull(cache.get(fqn1, str1));
cache.removeNode(fqn2);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn2, str2));
assertNotNull(cache.get(fqn1, str1));
cache.removeNode(fqn1);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn1, str1));
assertNull(cache.get(fqn2, str2));
@@ -262,12 +266,12 @@
assertNotNull(cache.get(fqn3, str3));
assertNotNull(cache.get(fqn4, str4));
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
// remove the node above fqn4 /org/jboss/test/5 will cascade the delete into
/org/jboss/test/5/5
cache.removeNode(fqn4);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn3, str3));
assertNull(cache.get(fqn4, str4));
}
Modified:
core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -344,7 +344,6 @@
PutDataMapCommand putCommand = commandsFactory.buildPutDataMapCommand(null, fqn,
data);
invoker.invoke(putCommand);
event = regionManager.getRegion(fqn.toString(), false).takeLastEventNode();
- assertFalse(event.isResetElementCount());
assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
assertEquals(fqn, event.getFqn());
assertEquals(100, event.getElementDifference());
@@ -366,7 +365,6 @@
assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
assertEquals(fqn, event.getFqn());
assertEquals(100, event.getElementDifference());
- assertTrue(event.isResetElementCount());
assertNull(regionManager.getRegion(fqn.toString(), false).takeLastEventNode());
Modified:
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -14,6 +14,7 @@
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.XmlConfigurationParser;
@@ -119,25 +120,34 @@
cache2.start();
Fqn fqn = Fqn.fromString("/a/b");
+ ReplicationListener replListener1 = new ReplicationListener(cache1);
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
+
+ replListener2.expectAny();
cache1.put(fqn, "key", "value");
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener2.waitForReplicationToOccur(500);
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
// test that this has NOT replicated, but rather has been invalidated:
assertEquals("value", cache1.get(fqn, "key"));
assertNull("Should NOT have replicated!", cache2.getNode(fqn));
+ replListener1.expectAny();
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener1.waitForReplicationToOccur(500);
// since the node already exists even PL will not remove it - but will invalidate
it's data
Node n = cache1.getNode(fqn);
assertHasBeenInvalidated(n, "Should have been invalidated");
assertEquals("value", cache2.get(fqn, "key"));
+ replListener2.expectAny();
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
assertEquals("value2", cache1.get(fqn, "key2"));
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener2.waitForReplicationToOccur(500);
// since the node already exists even PL will not remove it - but will invalidate
it's data
n = cache2.getNode(fqn);
Modified:
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -10,7 +10,10 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.Configuration;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;
@@ -194,6 +197,7 @@
public void testPessAsyncRepl() throws Exception
{
createCaches(false, false);
+ ReplicationListener replListener = new ReplicationListener(cache2);
mgr1.begin();
cache1.put(fqn, key, "value");
@@ -202,10 +206,11 @@
assertNull(cache2.get(fqn, key));
assertNull(loader1.get(fqn));
assertNull(loader2.get(fqn));
+
+ replListener.expect(PrepareCommand.class);
mgr1.commit();
+ replListener.waitForReplicationToOccur(500);
- TestingUtil.sleepThread(500);
-
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
assertEquals("value", loader1.get(fqn).get(key));
@@ -221,8 +226,6 @@
mgr1.rollback();
- TestingUtil.sleepThread(500);
-
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
assertEquals("value", loader1.get(fqn).get(key));
@@ -266,16 +269,20 @@
{
createCaches(false, true);
+ ReplicationListener replListener = new ReplicationListener(cache2);
+
mgr1.begin();
+ replListener.expect(CommitCommand.class);
cache1.put(fqn, key, "value");
assertEquals("value", cache1.get(fqn, key));
assertNull(cache2.get(fqn, key));
assertNull(loader1.get(fqn));
assertNull(loader2.get(fqn));
+
mgr1.commit();
- TestingUtil.sleepThread(500);
+ replListener.waitForReplicationToOccur(500);
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
@@ -291,7 +298,6 @@
assertEquals("value", loader2.get(fqn).get(key));
mgr1.rollback();
- TestingUtil.sleepThread(500);
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -13,23 +13,19 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.marshall.data.Address;
import org.jboss.cache.marshall.data.Person;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.transaction.*;
import java.lang.reflect.Method;
/**
@@ -48,6 +44,8 @@
Throwable ex;
private Fqn<String> aop = Fqn.fromString("/aop");
protected boolean useMarshalledValues = false;
+ ReplicationListener replListener1;
+ ReplicationListener replListener2;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception
@@ -60,6 +58,8 @@
cache2 = createCache("TestCache");
+ replListener1 = new ReplicationListener(cache1);
+ replListener2 = new ReplicationListener(cache2);
addr = new Address();
addr.setCity("San Jose");
ben = new Person();
@@ -109,11 +109,16 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(aop, "person", ben);
+ replListener2.waitForReplicationToOccur(500);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/alias"), "person", ben);
+ replListener2.waitForReplicationToOccur(500);
+
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
@@ -133,9 +138,10 @@
// Set it back to the cache
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.put(aop, "person", ben2);
+ replListener1.waitForReplicationToOccur(1000);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
Object ben3 = cache1.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
@@ -161,10 +167,16 @@
Object scopedBen2 = getPersonFromClassloader(clb);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/1"), "person", ben);
+ replListener2.waitForReplicationToOccur(1000);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
+ replListener2.waitForReplicationToOccur(1000);
+
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
@@ -180,11 +192,11 @@
public void testTxPut() throws Exception
{
+ replListener2.expectAny();
beginTransaction();
cache1.put(aop, "person", ben);
-// cache1.put(aop, "person1", ben);
commit();
- TestingUtil.sleepThread(1000);
+ replListener2.waitForReplicationToOccur(1000);
Person ben2 = (Person) cache2.get(aop, "person");
assertNotNull("Person from 2nd cache should not be null ", ben2);
assertEquals(ben.toString(), ben2.toString());
@@ -204,12 +216,13 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+ replListener2.expectAny();
beginTransaction();
cache1.put(aop, "person", ben);
commit();
+ replListener2.waitForReplicationToOccur(1000);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
@@ -229,9 +242,10 @@
// Set it back to the cache
// Can't cast it to Person. CCE will result.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+ replListener1.expectAny();
cache2.put(aop, "person", ben2);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
+ replListener1.waitForReplicationToOccur(100);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
Object ben3 = cache1.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
@@ -256,8 +270,9 @@
Fqn base = Fqn.fromString("/aop");
Fqn fqn = Fqn.fromRelativeElements(base, custom1);
+ replListener2.expectAny();
cache1.put(fqn, "key", "value");
- TestingUtil.sleepThread(1000);
+ replListener2.waitForReplicationToOccur(1000);
Fqn fqn2 = Fqn.fromRelativeElements(base, custom2);
Object val = cache2.get(fqn2, "key");
Modified:
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -5,6 +5,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.testng.annotations.AfterMethod;
@@ -27,6 +28,7 @@
public class InvalidRegionForStateTransferTest
{
Cache<Object, Object> c1, c2;
+ ReplicationListener replListener2;
@BeforeMethod
public void setUp() throws CloneNotSupportedException
@@ -47,6 +49,7 @@
c1.start();
c2 = new DefaultCacheFactory<Object,
Object>().createCache(c1.getConfiguration().clone());
+ replListener2 = new ReplicationListener(c2);
TestingUtil.blockUntilViewsReceived(60000, c1, c2);
}
@@ -63,11 +66,12 @@
c1.getRegion(fqn.getParent(),
true).registerContextClassLoader(getClass().getClassLoader());
c2.getRegion(fqn.getParent(),
true).registerContextClassLoader(getClass().getClassLoader());
+ replListener2.expectAny();
// write something; will cause a stale region to be stored in C2's cache marshaller
c1.put(fqn, "k", "v");
assert c1.get(fqn, "k").equals("v");
- TestingUtil.sleepThread(250); // async repl
+ replListener2.waitForReplicationToOccur(250);
// assert that this made it to c2
assert c2.get(fqn, "k").equals("v");
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -294,7 +294,7 @@
cache1.put(Fqn.fromString("/aop/2"), map);
cache1.removeNode(Fqn.fromString("/aop/2"));
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
+// TestingUtil.sleepThread(1000);
}
public void testTxMethodCall() throws Exception
@@ -322,7 +322,7 @@
cache1.removeNode(Fqn.fromString("/aop/2"));
tm.commit();
- TestingUtil.sleepThread(1000);
+// TestingUtil.sleepThread(1000);
if (useMarshalledValues) resetContextClassLoader();
}
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -9,6 +9,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.SamplePojo;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.transaction.TransactionSetup;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
@@ -22,12 +23,14 @@
public class AsyncCacheTest extends AbstractOptimisticTestCase
{
private CacheSPI<Object, Object> cache, cache2;
+ ReplicationListener replListener2;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception
{
cache = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
cache2 = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
+ replListener2 = new ReplicationListener(cache2);
}
@AfterMethod(alwaysRun = true)
@@ -63,11 +66,13 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
//GlobalTransaction globalTransaction = cache.getCurrentTransaction(tx);
assertNotNull(mgr.getTransaction());
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
assertNull(mgr.getTransaction());
@@ -79,8 +84,6 @@
assertTrue(cache.exists(Fqn.fromString("/one")));
assertEquals(pojo, cache.get(Fqn.fromString("/one/two"),
"key1"));
- // allow changes to replicate since this is async
- TestingUtil.sleepThread((long) 1000);
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -106,10 +109,12 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
assertNotNull(mgr.getTransaction());
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
assertNull(mgr.getTransaction());
@@ -121,9 +126,7 @@
assertTrue(cache.exists(Fqn.fromString("/one")));
assertEquals(pojo, cache.get(Fqn.fromString("/one/two"),
"key1"));
- // let the async calls complete
- TestingUtil.sleepThread((long) 1000);
-
+
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -2,10 +2,13 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.SamplePojo;
import org.jboss.cache.lock.LockManager;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
@@ -174,6 +177,7 @@
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -187,9 +191,11 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
// cache asserts
assertNull(mgr.getTransaction());
@@ -208,9 +214,6 @@
assertNotNull(cache.getNode("/one").getChild("two"));
assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
// cache2 asserts
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -235,7 +238,9 @@
{
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+ ReplicationListener replListener = new ReplicationListener(cache);
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -249,9 +254,11 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
// cache asserts
assertNull(mgr.getTransaction());
@@ -270,9 +277,6 @@
assertNotNull(cache.getNode("/one").getChild("two"));
assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
// cache2 asserts
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -289,16 +293,14 @@
assertNotNull(cache2.getNode("/one").getChild("two"));
assertNotNull(cache2.get(Fqn.fromString("/one/two"), "key1"));
+ replListener2.expect(RemoveNodeCommand.class);
cache.removeNode("/one/two");
+ replListener2.waitForReplicationToOccur(1000);
+
assertEquals(false, cache.exists("/one/two"));
assertEquals(null, cache.get("/one/two", "key1"));
-
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
assertEquals(false, cache2.exists("/one/two"));
assertEquals(null, cache2.get("/one/two", "key1"));
-
destroyCache(cache);
destroyCache(cache2);
}
@@ -307,6 +309,7 @@
{
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+ ReplicationListener replListener = new ReplicationListener(cache);
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
LockManager lockManager = TestingUtil.extractLockManager(cache);
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -48,7 +48,7 @@
{
try
{
- Method method =
EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{null});
+ Method method =
EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{});
method.setAccessible(true);
method.invoke(timerTask);
}
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -2,11 +2,16 @@
import org.jboss.cache.Cache;
import org.jboss.cache.RPCManager;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Fqn;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
@@ -16,6 +21,9 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.InputStream;
/**
* Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
@@ -50,9 +58,18 @@
ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher)
TestingUtil.extractField(rpcManager, "rpcDispatcher");
- RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller)
TestingUtil.extractField(RpcDispatcher.class, realDispatcher,
"req_marshaller");
- MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
- TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher,
RpcDispatcher.class);
+ if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
+ {
+ Marshaller realMarshaller = (Marshaller)
TestingUtil.extractField(InactiveRegionAwareRpcDispatcher.class, realDispatcher,
"requestMarshaller");
+ RegionMarshallerDelegate delegate = new
RegionMarshallerDelegate(realMarshaller);
+ TestingUtil.replaceField(delegate, "requestMarshaller",
realDispatcher, InactiveRegionAwareRpcDispatcher.class);
+ }
+ else
+ {
+ RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller)
TestingUtil.extractField(RpcDispatcher.class, realDispatcher,
"req_marshaller");
+ MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
+ TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher,
RpcDispatcher.class);
+ }
}
private class MarshallerDelegate implements RpcDispatcher.Marshaller
@@ -72,21 +89,47 @@
public Object objectFromByteBuffer(byte bytes[]) throws Exception
{
Object result = marshaller.objectFromByteBuffer(bytes);
- System.out.println("Received result = " + result);
if (result instanceof ReplicateCommand && expectedCommands != null)
{
ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * We want the notification to be performed only *after* the remote command is executed.
+ */
+ private class ReplicateCommandDelegate extends ReplicateCommand
+ {
+ ReplicateCommand realOne;
+
+ private ReplicateCommandDelegate(ReplicateCommand realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ @Override
+ public Object perform(InvocationContext ctx) throws Throwable
+ {
+ try
+ {
+ return realOne.perform(ctx);
+ }
+ finally
+ {
Iterator<Class<? extends ReplicableCommand>> it =
expectedCommands.iterator();
while (it.hasNext())
{
Class<? extends ReplicableCommand> replicableCommandClass =
it.next();
- if (replicateCommand.containsCommandType(replicableCommandClass))
+ if (realOne.containsCommandType(replicableCommandClass))
{
it.remove();
}
- else if (replicateCommand.getSingleModification() instanceof
PrepareCommand) //explicit transaction
+ else if (realOne.getSingleModification() instanceof PrepareCommand)
//explicit transaction
{
- PrepareCommand prepareCommand = (PrepareCommand)
replicateCommand.getSingleModification();
+ PrepareCommand prepareCommand = (PrepareCommand)
realOne.getSingleModification();
if (prepareCommand.containsModificationType(replicableCommandClass))
{
it.remove();
@@ -98,8 +141,66 @@
latch.countDown();
}
}
+ }
+ }
+
+ /**
+ * Needed for region based marshalling.
+ */
+ private class RegionMarshallerDelegate implements Marshaller
+ {
+ private Marshaller realOne;
+
+ private RegionMarshallerDelegate(Marshaller realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out) throws
Exception
+ {
+ realOne.objectToObjectStream(obj, out);
+ }
+
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.objectFromObjectStream(in);
+ }
+
+ public Object objectFromStream(InputStream is) throws Exception
+ {
+ return realOne.objectFromStream(is);
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region)
throws Exception
+ {
+ realOne.objectToObjectStream(obj, out, region);
+ }
+
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer)
throws Exception
+ {
+ RegionalizedMethodCall result =
realOne.regionalizedMethodCallFromByteBuffer(buffer);
+ if (result.command instanceof ReplicateCommand && expectedCommands !=
null)
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
+ result.command = new ReplicateCommandDelegate(replicateCommand);
+ }
return result;
}
+
+ public RegionalizedMethodCall
regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.regionalizedMethodCallFromObjectStream(in);
+ }
+
+ public byte[] objectToByteBuffer(Object o) throws Exception
+ {
+ return realOne.objectToByteBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes) throws Exception
+ {
+ return realOne.objectFromByteBuffer(bytes);
+ }
}
/**
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java 2008-05-29
15:08:09 UTC (rev 5918)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java 2008-05-30
14:27:41 UTC (rev 5919)
@@ -52,11 +52,17 @@
}
}
- public void waitUntillAllReplicated(long timeout) throws Exception
+ public void waitUntillAllReplicated(long timeout)
{
synchronized (replicated)
{
- replicated.wait(timeout);
+ try
+ {
+ replicated.wait(timeout);
+ } catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
log("returning from waitUntillAllReplicated call");
}