JBoss Cache SVN: r5920 - core/trunk/src/main/java/org/jboss/cache/lock.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-30 11:10:19 -0400 (Fri, 30 May 2008)
New Revision: 5920
Modified:
core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
Log:
fixed TcpCacheLoader test
Modified: core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-30 14:27:41 UTC (rev 5919)
+++ core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-30 15:10:19 UTC (rev 5920)
@@ -261,7 +261,7 @@
public boolean ownsLock(Fqn fqn, Object owner)
{
- return ownsLock(dataContainer.peek(fqn), owner);
+ return ownsLock(dataContainer.peek(fqn, true, true), owner);
}
public boolean ownsLock(NodeSPI node, Object owner)
16 years, 7 months
JBoss Cache SVN: r5919 - in core/trunk/src: main/java/org/jboss/cache/interceptors and 11 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-30 10:27:41 -0400 (Fri, 30 May 2008)
New Revision: 5919
Modified:
core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
Log:
JBCACHE-1338 - removed some Thread.sleep() statements and finalized the ReplicationListener - now it also works with region based marshalling
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -287,10 +287,10 @@
*/
protected void processAddedNodes(EvictedEventNode evictedEventNode) throws EvictionException
{
- processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference(), evictedEventNode.isResetElementCount());
+ processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference());
}
- protected void processAddedNodes(EvictedEventNode evictedEventNode, int numAddedElements, boolean resetElementCount) throws EvictionException
+ protected void processAddedNodes(EvictedEventNode evictedEventNode, int numAddedElements) throws EvictionException
{
Fqn fqn = evictedEventNode.getFqn();
@@ -304,14 +304,7 @@
{
ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
- if (resetElementCount)
- {
- ne.setNumberOfElements(numAddedElements);
- }
- else
- {
- ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
- }
+ ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
if (trace)
{
log.trace("Queue already contains " + ne.getFqn() + " processing it as visited");
@@ -410,7 +403,7 @@
{
log.debug("Visiting node that was not added to eviction queues. Assuming that it has 1 element.");
}
- this.processAddedNodes(evictedEventNode, 1, false);
+ this.processAddedNodes(evictedEventNode, 1);
return;
}
// note this method will visit and modify the node statistics by reference!
@@ -451,7 +444,7 @@
{
log.trace("Adding element " + fqn + " for a node that doesn't exist yet. Process as an add.");
}
- this.processAddedNodes(evictedEventNode, 1, false);
+ this.processAddedNodes(evictedEventNode, 1);
return;
}
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -18,9 +18,8 @@
public class EvictedEventNode implements Cloneable
{
private Fqn fqn_;
- private NodeEventType type_;
- private int elementDifference_;
- private boolean resetElementCount_;
+ private NodeEventType type;
+ private int elementDifference;
private long inUseTimeout;
private long creationTimestamp;
@@ -53,24 +52,14 @@
this.inUseTimeout = inUseTimeout;
}
- public boolean isResetElementCount()
- {
- return this.resetElementCount_;
- }
-
- public void setResetElementCount(boolean resetElementCount)
- {
- this.resetElementCount_ = resetElementCount;
- }
-
public int getElementDifference()
{
- return elementDifference_;
+ return elementDifference;
}
public void setElementDifference(int elementDifference_)
{
- this.elementDifference_ = elementDifference_;
+ this.elementDifference = elementDifference_;
}
public Fqn getFqn()
@@ -85,18 +74,18 @@
public void setEventType(NodeEventType event)
{
- type_ = event;
+ type = event;
}
public NodeEventType getEventType()
{
- return type_;
+ return type;
}
@Override
public String toString()
{
- return "EvictedEN[fqn=" + fqn_ + " event=" + type_ + " diff=" + elementDifference_ + "]";
+ return "EvictedEN[fqn=" + fqn_ + " event=" + type + " diff=" + elementDifference + "]";
}
public EvictedEventNode clone(Fqn cloneFqn)
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -116,7 +116,6 @@
size = command.getData().size();
}
EvictedEventNode event = new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT, size);
- event.setResetElementCount(false);
registerEvictionEventToRegionManager(event, r);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -14,8 +14,8 @@
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
* @since 2.1.1
*/
-class RegionalizedMethodCall
+public class RegionalizedMethodCall
{
- ReplicableCommand command;
- Fqn region;
+ public ReplicableCommand command;
+ public Fqn region;
}
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -41,7 +41,7 @@
caches.add(createCache(1, "TEST", false, true));
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn<String> test = BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), main);
@@ -50,17 +50,18 @@
caches.add(createCache(1, "TEST", false, true));
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
assertNull("State not transferred", caches.get(2).get(test, "name"));
// Make 2 the buddy of 0
caches.get(1).stop();
- caches.set(1, null);
+ caches.remove(1);
+ TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
- assertEquals("State transferred", "Joe", caches.get(2).get(test, "name"));
+ assertEquals("State transferred", "Joe", caches.get(1).get(test, "name"));
}
public void testRegionBasedStateTransfer() throws Exception
@@ -88,7 +89,7 @@
caches.get(3).start();
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn fqnA = Fqn.fromString("/a");
Fqn fqnD = Fqn.fromString("/d");
@@ -160,7 +161,7 @@
caches.get(1).start();
TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
Fqn test = BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), main);
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -58,7 +58,7 @@
c1.activate();
cache1.put(A_B, "name", JOE);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
System.out.println("Cache dump BEFORE activation");
System.out.println("cache1 " + CachePrinter.printCacheDetails(cache1));
@@ -95,7 +95,7 @@
cache1.put(A_B, "name", JOE);
- TestingUtil.sleepThread(getSleepTimeout());
+// TestingUtil.sleepThread(getSleepTimeout());
assertNull("Should be no replication to inactive region", cache2.get(A_B, "name"));
assertNull("Should be no replication to inactive backup region", cache2.get(backupFqn, "name"));
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -47,7 +47,7 @@
caches = createCaches(2, 3, false);
TestingUtil.blockUntilViewsReceived(5000, caches.toArray(new Cache[0]));
- TestingUtil.sleepThread(2000);
+// TestingUtil.sleepThread(2000);
System.out.println("*** Testing cache 0");
assertIsBuddy(caches.get(0), caches.get(1), false);
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -6,6 +6,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.RPCManager;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
@@ -102,7 +103,6 @@
assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must multiply to be a multiple of COUNT";
- int expectedReplications = totalInvocations / COUNT;
final CountDownLatch latch = new CountDownLatch(1);
// mock the RPCManager used in the cache
@@ -148,7 +148,9 @@
// now test results
verify(mockRpcManager);
- TestingUtil.sleepThread(250); // make sure the queue flushes
+ ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+ notifier.waitUntillAllReplicated(250);
+// TestingUtil.sleepThread(250); // make sure the queue flushes
assert replQ.elements.size() == 0;
}
@@ -161,7 +163,9 @@
cache.put("/a/b/c" + i, "key", "value");
assertNotNull(cache.get("/a/b/c" + i, "key"));
}
- TestingUtil.sleepThread(500);
+ ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+ notifier.waitUntillAllReplicated(500);
+// TestingUtil.sleepThread(500);
for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i, cache2.get("/a/b/c" + i, "key"));
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -3,6 +3,7 @@
import org.jboss.cache.Cache;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.testng.annotations.AfterMethod;
@@ -42,7 +43,8 @@
// outside of tx scope
cache.put("/a", "k", "v");
- TestingUtil.sleepThread(200);
+ ReplicationQueueNotifier replicationQueueNotifier = new ReplicationQueueNotifier(cache);
+ replicationQueueNotifier.waitUntillAllReplicated(200);
assert cache2.get("/a", "k").equals("v");
@@ -51,8 +53,10 @@
cache2.put("/a", "k", "v2");
txManager.commit();
- TestingUtil.sleepThread(200);
+ ReplicationQueueNotifier replicationQueueNotifier2 = new ReplicationQueueNotifier(cache2);
+ replicationQueueNotifier2.waitUntillAllReplicated(200);
+
assert cache.get("/a", "k").equals("v2");
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -54,7 +54,9 @@
dataMap.put("k2", "v2");
Map expected = new HashMap(dataMap);
expected.putAll(node.getDataDirect());
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(testFqn, true, NodeModifiedEvent.ModificationType.PUT_MAP, node.getData(), null);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(testFqn, false, NodeModifiedEvent.ModificationType.PUT_MAP, expected, null);
control.replay();
Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -44,9 +44,11 @@
{
nodes.adfNode.put("existingKey", "existingValue");
expect(container.peekStrict(globalTransaction, fqn, false)).andReturn(nodes.adfNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA, nodes.adfNode.getDataDirect(), ctx);
Map expected = new HashMap();
expected.put("k", "v");
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
control.replay();
assert null == command.perform(ctx) : "no pre existing value";
@@ -68,9 +70,11 @@
{
nodes.adfNode.put("k", "oldValue");
expect(container.peekStrict(globalTransaction, fqn, false)).andReturn(nodes.adfNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA, nodes.adfNode.getDataDirect(), ctx);
Map expected = new HashMap();
expected.put("k", "v");
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
control.replay();
assert "oldValue".equals(command.perform(ctx)) : "no pre existing value";
Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -41,9 +41,11 @@
expected.put("newKey","newValue");
nodes.adfgNode.putAll(expected);
expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
expected = new HashMap();
expected.put(key,null);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
control.replay();
assert null == command.perform(ctx);
@@ -66,7 +68,9 @@
expected.put(key,"newValue");
nodes.adfgNode.putAll(expected);
expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
+ expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
control.replay();
assert "newValue" == command.perform(ctx);
Modified: core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -10,6 +10,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.EvictionController;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.EvictionConfig;
import org.jboss.cache.config.EvictionRegionConfig;
@@ -105,8 +106,9 @@
e.printStackTrace();
}
}
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
- TestingUtil.sleepThread(wakeupIntervalMillis + 500);
try
{
String val = (String) cache.get(rootStr + "3", rootStr + "3");
@@ -136,7 +138,8 @@
cache.put(fqn, str, str);
}
- TestingUtil.sleepThread(wakeupIntervalMillis + 10000);
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
assertEquals("Number of nodes", maxNodes + 2, cache.getNumberOfNodes());
for (int i = 0; i < maxNodes; i++)
{
@@ -181,7 +184,8 @@
int period = wakeupIntervalMillis + 500;
log("sleeping for " + period + "ms");
- TestingUtil.sleepThread(period);// it really depends the eviction thread time.
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
try
{
for (int i = 0; i < 5; i++)
@@ -197,7 +201,7 @@
assertNotNull(cache.get(fqn, str));
}
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
// since it is FIFO if we leave it alone and revisit, cache should remain the same.
for (int i = 5; i < 10; i++)
{
@@ -235,9 +239,9 @@
}
}
- int period = (wakeupIntervalMillis + 500);
- log("period is " + period);
- TestingUtil.sleepThread(period);
+ EvictionController evictionController = new EvictionController(cache);
+ evictionController.startEviction();
+
String str1 = rootStr + "7";
Fqn fqn1 = Fqn.fromString(str1);
String str2 = rootStr + "7/7";
@@ -247,11 +251,11 @@
assertNotNull(cache.get(fqn2, str2));
assertNotNull(cache.get(fqn1, str1));
cache.removeNode(fqn2);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn2, str2));
assertNotNull(cache.get(fqn1, str1));
cache.removeNode(fqn1);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn1, str1));
assertNull(cache.get(fqn2, str2));
@@ -262,12 +266,12 @@
assertNotNull(cache.get(fqn3, str3));
assertNotNull(cache.get(fqn4, str4));
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
// remove the node above fqn4 /org/jboss/test/5 will cascade the delete into /org/jboss/test/5/5
cache.removeNode(fqn4);
- TestingUtil.sleepThread(period);
+ evictionController.startEviction();
assertNull(cache.get(fqn3, str3));
assertNull(cache.get(fqn4, str4));
}
Modified: core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -344,7 +344,6 @@
PutDataMapCommand putCommand = commandsFactory.buildPutDataMapCommand(null, fqn, data);
invoker.invoke(putCommand);
event = regionManager.getRegion(fqn.toString(), false).takeLastEventNode();
- assertFalse(event.isResetElementCount());
assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
assertEquals(fqn, event.getFqn());
assertEquals(100, event.getElementDifference());
@@ -366,7 +365,6 @@
assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
assertEquals(fqn, event.getFqn());
assertEquals(100, event.getElementDifference());
- assertTrue(event.isResetElementCount());
assertNull(regionManager.getRegion(fqn.toString(), false).takeLastEventNode());
Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -14,6 +14,7 @@
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.XmlConfigurationParser;
@@ -119,25 +120,34 @@
cache2.start();
Fqn fqn = Fqn.fromString("/a/b");
+ ReplicationListener replListener1 = new ReplicationListener(cache1);
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
+
+ replListener2.expectAny();
cache1.put(fqn, "key", "value");
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener2.waitForReplicationToOccur(500);
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
// test that this has NOT replicated, but rather has been invalidated:
assertEquals("value", cache1.get(fqn, "key"));
assertNull("Should NOT have replicated!", cache2.getNode(fqn));
+ replListener1.expectAny();
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener1.waitForReplicationToOccur(500);
// since the node already exists even PL will not remove it - but will invalidate it's data
Node n = cache1.getNode(fqn);
assertHasBeenInvalidated(n, "Should have been invalidated");
assertEquals("value", cache2.get(fqn, "key"));
+ replListener2.expectAny();
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
assertEquals("value2", cache1.get(fqn, "key2"));
- TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+// TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+ replListener2.waitForReplicationToOccur(500);
// since the node already exists even PL will not remove it - but will invalidate it's data
n = cache2.getNode(fqn);
Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -10,7 +10,10 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.Configuration;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;
@@ -194,6 +197,7 @@
public void testPessAsyncRepl() throws Exception
{
createCaches(false, false);
+ ReplicationListener replListener = new ReplicationListener(cache2);
mgr1.begin();
cache1.put(fqn, key, "value");
@@ -202,10 +206,11 @@
assertNull(cache2.get(fqn, key));
assertNull(loader1.get(fqn));
assertNull(loader2.get(fqn));
+
+ replListener.expect(PrepareCommand.class);
mgr1.commit();
+ replListener.waitForReplicationToOccur(500);
- TestingUtil.sleepThread(500);
-
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
assertEquals("value", loader1.get(fqn).get(key));
@@ -221,8 +226,6 @@
mgr1.rollback();
- TestingUtil.sleepThread(500);
-
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
assertEquals("value", loader1.get(fqn).get(key));
@@ -266,16 +269,20 @@
{
createCaches(false, true);
+ ReplicationListener replListener = new ReplicationListener(cache2);
+
mgr1.begin();
+ replListener.expect(CommitCommand.class);
cache1.put(fqn, key, "value");
assertEquals("value", cache1.get(fqn, key));
assertNull(cache2.get(fqn, key));
assertNull(loader1.get(fqn));
assertNull(loader2.get(fqn));
+
mgr1.commit();
- TestingUtil.sleepThread(500);
+ replListener.waitForReplicationToOccur(500);
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
@@ -291,7 +298,6 @@
assertEquals("value", loader2.get(fqn).get(key));
mgr1.rollback();
- TestingUtil.sleepThread(500);
assertEquals("value", cache1.get(fqn, key));
assertEquals("value", cache2.get(fqn, key));
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -13,23 +13,19 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.marshall.data.Address;
import org.jboss.cache.marshall.data.Person;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.transaction.*;
import java.lang.reflect.Method;
/**
@@ -48,6 +44,8 @@
Throwable ex;
private Fqn<String> aop = Fqn.fromString("/aop");
protected boolean useMarshalledValues = false;
+ ReplicationListener replListener1;
+ ReplicationListener replListener2;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception
@@ -60,6 +58,8 @@
cache2 = createCache("TestCache");
+ replListener1 = new ReplicationListener(cache1);
+ replListener2 = new ReplicationListener(cache2);
addr = new Address();
addr.setCity("San Jose");
ben = new Person();
@@ -109,11 +109,16 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(aop, "person", ben);
+ replListener2.waitForReplicationToOccur(500);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/alias"), "person", ben);
+ replListener2.waitForReplicationToOccur(500);
+
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
@@ -133,9 +138,10 @@
// Set it back to the cache
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+ replListener1.expect(PutKeyValueCommand.class);
cache2.put(aop, "person", ben2);
+ replListener1.waitForReplicationToOccur(1000);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
Object ben3 = cache1.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
@@ -161,10 +167,16 @@
Object scopedBen2 = getPersonFromClassloader(clb);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/1"), "person", ben);
+ replListener2.waitForReplicationToOccur(1000);
+
+ replListener2.expect(PutKeyValueCommand.class);
cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
+ replListener2.waitForReplicationToOccur(1000);
+
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
@@ -180,11 +192,11 @@
public void testTxPut() throws Exception
{
+ replListener2.expectAny();
beginTransaction();
cache1.put(aop, "person", ben);
-// cache1.put(aop, "person1", ben);
commit();
- TestingUtil.sleepThread(1000);
+ replListener2.waitForReplicationToOccur(1000);
Person ben2 = (Person) cache2.get(aop, "person");
assertNotNull("Person from 2nd cache should not be null ", ben2);
assertEquals(ben.toString(), ben2.toString());
@@ -204,12 +216,13 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+ replListener2.expectAny();
beginTransaction();
cache1.put(aop, "person", ben);
commit();
+ replListener2.waitForReplicationToOccur(1000);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
Object ben2 = null;
// Can't cast it to Person. CCE will resutl.
@@ -229,9 +242,10 @@
// Set it back to the cache
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+ replListener1.expectAny();
cache2.put(aop, "person", ben2);
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
+ replListener1.waitForReplicationToOccur(100);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
Object ben3 = cache1.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
@@ -256,8 +270,9 @@
Fqn base = Fqn.fromString("/aop");
Fqn fqn = Fqn.fromRelativeElements(base, custom1);
+ replListener2.expectAny();
cache1.put(fqn, "key", "value");
- TestingUtil.sleepThread(1000);
+ replListener2.waitForReplicationToOccur(1000);
Fqn fqn2 = Fqn.fromRelativeElements(base, custom2);
Object val = cache2.get(fqn2, "key");
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -5,6 +5,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.testng.annotations.AfterMethod;
@@ -27,6 +28,7 @@
public class InvalidRegionForStateTransferTest
{
Cache<Object, Object> c1, c2;
+ ReplicationListener replListener2;
@BeforeMethod
public void setUp() throws CloneNotSupportedException
@@ -47,6 +49,7 @@
c1.start();
c2 = new DefaultCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone());
+ replListener2 = new ReplicationListener(c2);
TestingUtil.blockUntilViewsReceived(60000, c1, c2);
}
@@ -63,11 +66,12 @@
c1.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
c2.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
+ replListener2.expectAny();
// write something; will cause a stale region to be stored in C2's cache marshaller
c1.put(fqn, "k", "v");
assert c1.get(fqn, "k").equals("v");
- TestingUtil.sleepThread(250); // async repl
+ replListener2.waitForReplicationToOccur(250);
// assert that this made it to c2
assert c2.get(fqn, "k").equals("v");
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -294,7 +294,7 @@
cache1.put(Fqn.fromString("/aop/2"), map);
cache1.removeNode(Fqn.fromString("/aop/2"));
if (useMarshalledValues) resetContextClassLoader();
- TestingUtil.sleepThread(1000);
+// TestingUtil.sleepThread(1000);
}
public void testTxMethodCall() throws Exception
@@ -322,7 +322,7 @@
cache1.removeNode(Fqn.fromString("/aop/2"));
tm.commit();
- TestingUtil.sleepThread(1000);
+// TestingUtil.sleepThread(1000);
if (useMarshalledValues) resetContextClassLoader();
}
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -9,6 +9,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.SamplePojo;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.transaction.TransactionSetup;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
@@ -22,12 +23,14 @@
public class AsyncCacheTest extends AbstractOptimisticTestCase
{
private CacheSPI<Object, Object> cache, cache2;
+ ReplicationListener replListener2;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception
{
cache = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
cache2 = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
+ replListener2 = new ReplicationListener(cache2);
}
@AfterMethod(alwaysRun = true)
@@ -63,11 +66,13 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
//GlobalTransaction globalTransaction = cache.getCurrentTransaction(tx);
assertNotNull(mgr.getTransaction());
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
assertNull(mgr.getTransaction());
@@ -79,8 +84,6 @@
assertTrue(cache.exists(Fqn.fromString("/one")));
assertEquals(pojo, cache.get(Fqn.fromString("/one/two"), "key1"));
- // allow changes to replicate since this is async
- TestingUtil.sleepThread((long) 1000);
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -106,10 +109,12 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
assertNotNull(mgr.getTransaction());
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
assertNull(mgr.getTransaction());
@@ -121,9 +126,7 @@
assertTrue(cache.exists(Fqn.fromString("/one")));
assertEquals(pojo, cache.get(Fqn.fromString("/one/two"), "key1"));
- // let the async calls complete
- TestingUtil.sleepThread((long) 1000);
-
+
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -2,10 +2,13 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.SamplePojo;
import org.jboss.cache.lock.LockManager;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
@@ -174,6 +177,7 @@
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -187,9 +191,11 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
// cache asserts
assertNull(mgr.getTransaction());
@@ -208,9 +214,6 @@
assertNotNull(cache.getNode("/one").getChild("two"));
assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
// cache2 asserts
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -235,7 +238,9 @@
{
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+ ReplicationListener replListener = new ReplicationListener(cache);
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+ ReplicationListener replListener2 = new ReplicationListener(cache2);
LockManager lockManager = TestingUtil.extractLockManager(cache);
LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
@@ -249,9 +254,11 @@
SamplePojo pojo = new SamplePojo(21, "test");
+ replListener2.expectAny();
cache.put("/one/two", "key1", pojo);
mgr.commit();
+ replListener2.waitForReplicationToOccur(1000);
// cache asserts
assertNull(mgr.getTransaction());
@@ -270,9 +277,6 @@
assertNotNull(cache.getNode("/one").getChild("two"));
assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
// cache2 asserts
assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -289,16 +293,14 @@
assertNotNull(cache2.getNode("/one").getChild("two"));
assertNotNull(cache2.get(Fqn.fromString("/one/two"), "key1"));
+ replListener2.expect(RemoveNodeCommand.class);
cache.removeNode("/one/two");
+ replListener2.waitForReplicationToOccur(1000);
+
assertEquals(false, cache.exists("/one/two"));
assertEquals(null, cache.get("/one/two", "key1"));
-
- // let async calls propagate
- TestingUtil.sleepThread((long) 1000);
-
assertEquals(false, cache2.exists("/one/two"));
assertEquals(null, cache2.get("/one/two", "key1"));
-
destroyCache(cache);
destroyCache(cache2);
}
@@ -307,6 +309,7 @@
{
groupIncreaser++;
CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+ ReplicationListener replListener = new ReplicationListener(cache);
CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
LockManager lockManager = TestingUtil.extractLockManager(cache);
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -48,7 +48,7 @@
{
try
{
- Method method = EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{null});
+ Method method = EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{});
method.setAccessible(true);
method.invoke(timerTask);
}
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -2,11 +2,16 @@
import org.jboss.cache.Cache;
import org.jboss.cache.RPCManager;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Fqn;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
@@ -16,6 +21,9 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.InputStream;
/**
* Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
@@ -50,9 +58,18 @@
ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
- RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller) TestingUtil.extractField(RpcDispatcher.class, realDispatcher, "req_marshaller");
- MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
- TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher, RpcDispatcher.class);
+ if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
+ {
+ Marshaller realMarshaller = (Marshaller) TestingUtil.extractField(InactiveRegionAwareRpcDispatcher.class, realDispatcher, "requestMarshaller");
+ RegionMarshallerDelegate delegate = new RegionMarshallerDelegate(realMarshaller);
+ TestingUtil.replaceField(delegate, "requestMarshaller", realDispatcher, InactiveRegionAwareRpcDispatcher.class);
+ }
+ else
+ {
+ RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller) TestingUtil.extractField(RpcDispatcher.class, realDispatcher, "req_marshaller");
+ MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
+ TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher, RpcDispatcher.class);
+ }
}
private class MarshallerDelegate implements RpcDispatcher.Marshaller
@@ -72,21 +89,47 @@
public Object objectFromByteBuffer(byte bytes[]) throws Exception
{
Object result = marshaller.objectFromByteBuffer(bytes);
- System.out.println("Received result = " + result);
if (result instanceof ReplicateCommand && expectedCommands != null)
{
ReplicateCommand replicateCommand = (ReplicateCommand) result;
+ return new ReplicateCommandDelegate(replicateCommand);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * We want the notification to be performed only *after* the remote command is executed.
+ */
+ private class ReplicateCommandDelegate extends ReplicateCommand
+ {
+ ReplicateCommand realOne;
+
+ private ReplicateCommandDelegate(ReplicateCommand realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ @Override
+ public Object perform(InvocationContext ctx) throws Throwable
+ {
+ try
+ {
+ return realOne.perform(ctx);
+ }
+ finally
+ {
Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
while (it.hasNext())
{
Class<? extends ReplicableCommand> replicableCommandClass = it.next();
- if (replicateCommand.containsCommandType(replicableCommandClass))
+ if (realOne.containsCommandType(replicableCommandClass))
{
it.remove();
}
- else if (replicateCommand.getSingleModification() instanceof PrepareCommand) //explicit transaction
+ else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
{
- PrepareCommand prepareCommand = (PrepareCommand) replicateCommand.getSingleModification();
+ PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
if (prepareCommand.containsModificationType(replicableCommandClass))
{
it.remove();
@@ -98,8 +141,66 @@
latch.countDown();
}
}
+ }
+ }
+
+ /**
+ * Needed for region based marshalling.
+ * <p>
+ * Wraps the real {@code Marshaller} and forwards every call to it unchanged. The only method that
+ * alters a result is {@code regionalizedMethodCallFromByteBuffer}: when the unmarshalled call carries
+ * a {@code ReplicateCommand} and {@code expectedCommands} is non-null, the command is replaced by a
+ * {@code ReplicateCommandDelegate} wrapping it.
+ * <p>
+ * NOTE(review): {@code regionalizedMethodCallFromObjectStream} and {@code objectFromByteBuffer} are pure
+ * pass-throughs here, so a command unmarshalled through those methods is not wrapped - confirm that
+ * replicated calls only arrive through the byte-buffer variant.
+ */
+ private class RegionMarshallerDelegate implements Marshaller
+ {
+ private Marshaller realOne; // the marshaller all calls are forwarded to
+
+ private RegionMarshallerDelegate(Marshaller realOne)
+ {
+ this.realOne = realOne;
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
+ {
+ realOne.objectToObjectStream(obj, out);
+ }
+
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.objectFromObjectStream(in);
+ }
+
+ public Object objectFromStream(InputStream is) throws Exception
+ {
+ return realOne.objectFromStream(is);
+ }
+
+ public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
+ {
+ realOne.objectToObjectStream(obj, out, region);
+ }
+
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
+ {
+ RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
+ if (result.command instanceof ReplicateCommand && expectedCommands != null) // only intercept while a listener expectation is set
+ {
+ ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
+ result.command = new ReplicateCommandDelegate(replicateCommand); // swap in the wrapper; the rest of the call is untouched
+ }
return result;
}
+
+ public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+ {
+ return realOne.regionalizedMethodCallFromObjectStream(in); // no wrapping on this path
+ }
+
+ public byte[] objectToByteBuffer(Object o) throws Exception
+ {
+ return realOne.objectToByteBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes) throws Exception
+ {
+ return realOne.objectFromByteBuffer(bytes); // no wrapping on this path
+ }
}
/**
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java 2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java 2008-05-30 14:27:41 UTC (rev 5919)
@@ -52,11 +52,17 @@
}
}
- public void waitUntillAllReplicated(long timeout) throws Exception
+ public void waitUntillAllReplicated(long timeout) // the checked "throws Exception" is dropped by this change
{
synchronized (replicated)
{
- replicated.wait(timeout);
+ try
+ {
+ replicated.wait(timeout); // NOTE(review): single timed wait with no condition loop and a void return - the caller cannot tell a notification from a timeout
+ } catch (InterruptedException e)
+ {
+ throw new RuntimeException(e); // NOTE(review): interrupt status is not restored (Thread.currentThread().interrupt()) before wrapping
+ }
}
log("returning from waitUntillAllReplicated call");
}
16 years, 7 months
JBoss Cache SVN: r5918 - in core/trunk/src: main/java/org/jboss/cache/buddyreplication and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 11:08:09 -0400 (Thu, 29 May 2008)
New Revision: 5918
Added:
core/trunk/src/main/java/org/jboss/cache/RegionEmptyException.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java
Log:
JBCACHE-1349: Buddy replication state transfer fails if a marshalling region is empty
Added: core/trunk/src/main/java/org/jboss/cache/RegionEmptyException.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionEmptyException.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/RegionEmptyException.java 2008-05-29 15:08:09 UTC (rev 5918)
@@ -0,0 +1,29 @@
+package org.jboss.cache;
+
+/**
+ * Exception to represent a region being empty when state was expected in that region.
+ * <p>
+ * The class adds no state or behaviour of its own: each constructor forwards its arguments to the
+ * matching {@code CacheException} constructor.
+ * <p>
+ * In the commit that introduces it, {@code StateTransferManager.getState} creates one when
+ * {@code cache.peek(fqn, false, false)} returns null, and {@code BuddyManager} catches it around its
+ * {@code stateTransferManager.getState(...)} call and returns a null state instead of failing.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.2.0
+ */
+public class RegionEmptyException extends CacheException
+{
+ public RegionEmptyException() // no message, no cause
+ {
+ }
+
+ public RegionEmptyException(Throwable cause) // cause only
+ {
+ super(cause);
+ }
+
+ public RegionEmptyException(String msg) // message only
+ {
+ super(msg);
+ }
+
+ public RegionEmptyException(String msg, Throwable cause) // message and cause
+ {
+ super(msg, cause);
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-05-29 14:49:10 UTC (rev 5917)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-05-29 15:08:09 UTC (rev 5918)
@@ -15,6 +15,7 @@
import org.jboss.cache.Node;
import org.jboss.cache.RPCManager;
import org.jboss.cache.Region;
+import org.jboss.cache.RegionEmptyException;
import org.jboss.cache.RegionManager;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
@@ -788,10 +789,7 @@
{
Fqn f = r.getFqn();
state = acquireState(f);
- if (state != null)
- {
- stateMap.put(f, state);
- }
+ if (state != null) stateMap.put(f, state);
}
}
else if (!configuration.isInactiveOnStartup())
@@ -885,7 +883,10 @@
byte[] state = generateState(fqn, timeouts[i], force);
if (log.isDebugEnabled())
{
- log.debug("acquireState(): got state");
+ if (state == null)
+ log.debug("acquireState(): Got null state. Region is probably empty.");
+ else
+ log.debug("acquireState(): Got state");
}
return state;
}
@@ -950,7 +951,14 @@
{
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
out = new MarshalledValueOutputStream(baos);
- stateTransferManager.getState(out, fqn, timeout, force, false);
+ try
+ {
+ stateTransferManager.getState(out, fqn, timeout, force, false);
+ }
+ catch (RegionEmptyException ree)
+ {
+ return null;
+ }
result = baos.getRawBuffer();
}
finally
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java 2008-05-29 14:49:10 UTC (rev 5917)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java 2008-05-29 15:08:09 UTC (rev 5918)
@@ -12,6 +12,7 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.NodeSPI;
+import org.jboss.cache.RegionEmptyException;
import org.jboss.cache.RegionManager;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
@@ -20,6 +21,7 @@
import org.jboss.cache.lock.LockManager;
import static org.jboss.cache.lock.LockType.READ;
import org.jboss.cache.lock.TimeoutException;
+import org.jboss.cache.marshall.InactiveRegionException;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataMarker;
@@ -83,7 +85,7 @@
public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
// can't give state for regions currently being activated/inactivated
- boolean canProvideState = (!regionManager.isInactive(fqn) && cache.peek(fqn, false, false) != null);
+ boolean canProvideState = (!regionManager.isInactive(fqn) && cache.peek(fqn, false) != null);
boolean fetchTransientState = configuration.isFetchInMemoryState();
CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
@@ -126,19 +128,20 @@
if (regionManager.isInactive(fqn))
{
exceptionMessage += " Region for fqn " + fqn + " is inactive.";
+ e = new InactiveRegionException(exceptionMessage);
}
+ // this is not really an exception. Just provide empty state. The exception is just a signal. Yes, lousy. - JBCACHE-1349
if (cache.peek(fqn, false, false) == null)
{
- exceptionMessage += " There is no cache node at fqn " + fqn;
+ e = new RegionEmptyException();
}
- e = new CacheException(exceptionMessage);
}
if (!fetchPersistentState && !fetchTransientState)
{
e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
}
marshaller.objectToObjectStream(e, out);
- throw e;
+ if (e != null) throw e;
}
}
Added: core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java 2008-05-29 15:08:09 UTC (rev 5918)
@@ -0,0 +1,88 @@
+package org.jboss.cache.buddyreplication;
+
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
+import org.jboss.cache.notifications.annotation.BuddyGroupChanged;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.util.CachePrinter;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * To test http://jira.jboss.org/jira/browse/JBCACHE-1349
+ * <p>
+ * Scenario: cache c1 gets two regions, each with a context class loader registered. {@code /a/b/c} is
+ * created but never written to, while {@code /d/e/f} receives one key/value pair. c2 is built from a
+ * clone of c1's configuration and started inside the test method. The test then waits up to 60 seconds
+ * for a {@code @BuddyGroupChanged} callback on c2 and asserts that c2 holds a node at the backup Fqn
+ * computed from c1's local address and {@code /d/e/f}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ */
+@Test(groups = "functional")
+public class EmptyRegionTest extends BuddyReplicationTestsBase
+{
+ CacheSPI c1, c2;
+ Fqn regionFqn = Fqn.fromString("/a/b/c"); // region that is created but never given a node
+ Fqn region2Fqn = Fqn.fromString("/d/e/f"); // region that receives one key/value pair in setUp()
+ Region region, region2;
+ CountDownLatch buddyJoinLatch = new CountDownLatch(1); // counted down by BuddyJoinListener.buddyJoined()
+
+ @BeforeTest
+ public void setUp() throws Exception
+ {
+ c1 = createCache(1, null, false, false, false);
+ c1.getConfiguration().setUseRegionBasedMarshalling(true);
+ c1.getConfiguration().setFetchInMemoryState(true);
+ c2 = (CacheSPI) new DefaultCacheFactory().createCache(c1.getConfiguration().clone(), false); // presumably "false" = do not start yet; create()/start() are called explicitly below - TODO confirm
+ c1.start();
+ region = c1.getRegion(regionFqn, true);
+ region2 = c1.getRegion(region2Fqn, true);
+ region.registerContextClassLoader(getClass().getClassLoader());
+ region2.registerContextClassLoader(getClass().getClassLoader());
+ c1.put(region2Fqn, "key", "value"); // only region2 gets data; regionFqn stays without a node
+
+ c2.create();
+ c2.addCacheListener(new BuddyJoinListener()); // listener is attached before c2.start() runs in the test method
+ }
+
+ @AfterTest
+ public void tearDown()
+ {
+ TestingUtil.killCaches(c1, c2);
+ }
+
+ public void testEmptyRegion() throws InterruptedException
+ {
+ // region on c1 is empty - with no root node.
+ assert c1.getNode(regionFqn) == null : "Node should not exist";
+ assert c1.getRegion(regionFqn, false) != null : "Region should exist";
+ assert c1.getRegion(regionFqn, false).isActive() : "Region should be active";
+
+ // now start c2
+ c2.start();
+
+ // wait for buddy join notifications to complete.
+ buddyJoinLatch.await(60, TimeUnit.SECONDS); // NOTE(review): the boolean result is discarded, so a 60s timeout does not fail the test at this line
+
+ // should not throw any exceptions!!
+
+ System.out.println("Cache1 " + CachePrinter.printCacheDetails(c1));
+ System.out.println("Cache2 " + CachePrinter.printCacheDetails(c2));
+
+ // make sure region2 stuff did get transmitted!
+ assert c2.peek(BuddyFqnTransformer.getBackupFqn(c1.getLocalAddress(), region2Fqn), false) != null : "Region2 state should have transferred!";
+ }
+
+ @CacheListener
+ public class BuddyJoinListener
+ {
+ @BuddyGroupChanged
+ public void buddyJoined(Event e)
+ {
+ buddyJoinLatch.countDown(); // releases the await() in testEmptyRegion()
+ }
+ }
+}
16 years, 7 months
JBoss Cache SVN: r5917 - core/branches/2.1.X/src/test/java/org/jboss/cache/buddyreplication.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 10:49:10 -0400 (Thu, 29 May 2008)
New Revision: 5917
Added:
core/branches/2.1.X/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
Log:
Added test for JBCACHE-1349
Added: core/branches/2.1.X/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
===================================================================
--- core/branches/2.1.X/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java (rev 0)
+++ core/branches/2.1.X/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java 2008-05-29 14:49:10 UTC (rev 5917)
@@ -0,0 +1,88 @@
+package org.jboss.cache.buddyreplication;
+
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
+import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.notifications.annotation.BuddyGroupChanged;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.util.CachePrinter;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * To test http://jira.jboss.org/jira/browse/JBCACHE-1349
+ * <p>
+ * Scenario: cache c1 gets two regions. {@code /a/b/c} is created but never written to, while
+ * {@code /d/e/f} receives one key/value pair. c2 is built from a clone of c1's configuration and started
+ * inside the test method. The test then waits up to 60 seconds for a {@code @BuddyGroupChanged} callback
+ * on c2 and asserts that c2 holds a node at the backup Fqn computed from c1's local address and
+ * {@code /d/e/f}. The class is annotated {@code enabled = false}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ */
+@Test(groups = "functional", enabled = false)
+// disabled since there is no intention to fix this in this branch.
+public class EmptyRegionTest extends BuddyReplicationTestsBase
+{
+ CacheSPI c1, c2;
+ Fqn regionFqn = Fqn.fromString("/a/b/c"); // region that is created but never given a node
+ Fqn region2Fqn = Fqn.fromString("/d/e/f"); // region that receives one key/value pair in setUp()
+ Region region, region2;
+ CountDownLatch buddyJoinLatch = new CountDownLatch(1); // counted down by BuddyJoinListener.buddyJoined()
+
+ @BeforeTest
+ public void setUp() throws Exception
+ {
+ c1 = createCache(1, null, false, false, false);
+ c1.getConfiguration().setUseRegionBasedMarshalling(true);
+ c1.getConfiguration().setFetchInMemoryState(true);
+ c2 = (CacheSPI) new DefaultCacheFactory().createCache(c1.getConfiguration().clone(), false); // presumably "false" = do not start yet; create()/start() are called explicitly below - TODO confirm
+ c1.start();
+ region = c1.getRegion(regionFqn, true);
+ region2 = c1.getRegion(region2Fqn, true);
+ region.registerContextClassLoader(getClass().getClassLoader()); // only "region" registers a class loader in this version; region2 does not
+ c1.put(region2Fqn, "key", "value"); // only region2 gets data; regionFqn stays without a node
+
+ c2.create();
+ c2.addCacheListener(new BuddyJoinListener()); // listener is attached before c2.start() runs in the test method
+ }
+
+ @AfterTest
+ public void tearDown()
+ {
+ TestingUtil.killCaches(c1, c2);
+ }
+
+ public void testEmptyRegion() throws InterruptedException
+ {
+ // region on c1 is empty - with no root node.
+ assert c1.getNode(regionFqn) == null : "Node should not exist";
+ assert c1.getRegion(regionFqn, false) != null : "Region should exist";
+ assert c1.getRegion(regionFqn, false).isActive() : "Region should be active";
+
+ // now start c2
+ c2.start();
+
+ // wait for buddy join notifications to complete.
+ buddyJoinLatch.await(60, TimeUnit.SECONDS); // NOTE(review): the boolean result is discarded, so a 60s timeout does not fail the test at this line
+
+ // should not throw any exceptions!!
+
+ System.out.println("Cache1 " + CachePrinter.printCacheDetails(c1));
+ System.out.println("Cache2 " + CachePrinter.printCacheDetails(c2));
+
+ // make sure region2 stuff did get transmitted!
+ assert c2.peek(BuddyManager.getBackupFqn(c1.getLocalAddress(), region2Fqn), false) != null : "Region2 state should have transferred!";
+ }
+
+ @CacheListener
+ public class BuddyJoinListener
+ {
+ @BuddyGroupChanged
+ public void buddyJoined(Event e)
+ {
+ buddyJoinLatch.countDown(); // releases the await() in testEmptyRegion()
+ }
+ }
+}
16 years, 7 months
JBoss Cache SVN: r5916 - in core/trunk/src: test/java/org/jboss/cache/util/internals and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-29 09:56:53 -0400 (Thu, 29 May 2008)
New Revision: 5916
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
JBCACHE-1338 - changes after code review
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -104,7 +104,7 @@
throw new RuntimeException("Needs to be overridden!");
}
- public RegionalizedMethodCall regionalizedCommandFromByteBuffer(byte[] buffer) throws Exception
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -35,11 +35,11 @@
protected ComponentRegistry componentRegistry;
protected boolean trace;
- public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
+ public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj,
InvocationContextContainer container, InterceptorChain interceptorChain,
ComponentRegistry componentRegistry)
{
- super(channel, l, l2, server_obj);
+ super(channel, l, l2, serverObj);
this.invocationContextContainer = container;
this.componentRegistry = componentRegistry;
this.interceptorChain = interceptorChain;
@@ -179,19 +179,4 @@
{
return getClass().getSimpleName() + "[Outgoing marshaller: " + req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";
}
-
- public InvocationContextContainer getInvocationContextContainer()
- {
- return invocationContextContainer;
- }
-
- public InterceptorChain getInterceptorChain()
- {
- return interceptorChain;
- }
-
- public ComponentRegistry getComponentRegistry()
- {
- return componentRegistry;
- }
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -53,7 +53,7 @@
try
{
// we will ALWAYS be using the marshaller to unmarshall requests.
- rmc = requestMarshaller.regionalizedCommandFromByteBuffer(req.getBuffer());
+ rmc = requestMarshaller.regionalizedMethodCallFromByteBuffer(req.getBuffer());
command = rmc.command;
}
catch (Throwable e)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -78,7 +78,7 @@
* @throws Exception if there are issues
* @since 2.1.1
*/
- RegionalizedMethodCall regionalizedCommandFromByteBuffer(byte[] buffer) throws Exception;
+ RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception;
/**
* Returns a RegionalizedMethodCall from an object input stream. Only use if you <i>know</i> that the byte buffer contains a
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -4,7 +4,7 @@
import org.jboss.cache.commands.ReplicableCommand;
/**
- * A regionalized MethodCall object, created when {@link Marshaller#regionalizedCommandFromByteBuffer(byte[])} or
+ * A regionalized MethodCall object, created when {@link Marshaller#regionalizedMethodCallFromByteBuffer(byte[])} or
* {@link org.jboss.cache.marshall.Marshaller#regionalizedMethodCallFromObjectStream(java.io.ObjectInputStream)} is called.
* <p/>
* Specifically used by the {@link org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher} so that the region used to unmarshall
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -190,7 +190,7 @@
}
@Override
- public RegionalizedMethodCall regionalizedCommandFromByteBuffer(byte[] buf) throws Exception
+ public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buf) throws Exception
{
Marshaller marshaller;
int versionId;
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29 13:51:25 UTC (rev 5915)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29 13:56:53 UTC (rev 5916)
@@ -31,7 +31,8 @@
* </pre>
* Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
* non-initialized state and *can* be reused through expect-wait cycle.
- * Note: this class might be used aswell for sync caches, though it does not really make sence using it in these scenarios
+ * <b>Note</b>: this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
+ * async replication
*
* @author Mircea.Markus(a)jboss.com
* @since 2.2
@@ -104,8 +105,6 @@
/**
* Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
* if replication does not occur in the give timeout then an exception is being thrown.
- *
- * @param timeoutMillis timeout in milliseconds
*/
public void waitForReplicationToOccur(long timeoutMillis)
{
@@ -152,6 +151,9 @@
this.expectedCommands.addAll(Arrays.asList(expectedCommands));
}
+ /**
+ * Waits untill first command is replicated.
+ */
public void expectAny()
{
expect();
16 years, 7 months
JBoss Cache SVN: r5915 - in core/trunk/src: main/java/org/jboss/cache/lock and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 09:51:25 -0400 (Thu, 29 May 2008)
New Revision: 5915
Added:
core/trunk/src/test/java/org/jboss/cache/loader/ConcurrentPutRemoveEvictTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
Log:
JBCACHE-1355 - race conditions in the CacheLoaderInterceptor
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2008-05-29 13:02:52 UTC (rev 5914)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2008-05-29 13:51:25 UTC (rev 5915)
@@ -28,6 +28,7 @@
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.lock.LockManager;
import org.jboss.cache.lock.LockType;
+import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.notifications.NotifierImpl;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
@@ -243,8 +244,23 @@
private void loadIfNeeded(InvocationContext ctx, Fqn fqn, Object key, boolean allKeys, boolean initNode, boolean acquireLock, TransactionEntry entry, boolean recursive, boolean isMove, boolean bypassLoadingData) throws Throwable
{
NodeSPI n = dataContainer.peek(fqn, true, true);
+ Object lockOwner = lockManager.getLockOwner(ctx);
+ boolean needLock = n != null && !lockManager.ownsLock(fqn, lockOwner);
+ boolean mustLoad = false;
+ try
+ {
+ if (needLock)
+ {
+ if (!lockManager.lock(n, LockType.READ, lockOwner))
+ throw new TimeoutException("Unable to acquire lock on " + fqn + ". Lock info: " + lockManager.printLockInfo(n));
+ }
+ mustLoad = mustLoad(n, key, allKeys || isMove);
+ }
+ finally
+ {
+ if (needLock) lockManager.unlock(n, lockOwner);
+ }
- boolean mustLoad = mustLoad(n, key, allKeys || isMove);
if (trace)
{
log.trace("load element " + fqn + " mustLoad=" + mustLoad);
Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-29 13:02:52 UTC (rev 5914)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-29 13:51:25 UTC (rev 5915)
@@ -47,6 +47,16 @@
boolean lock(Fqn fqn, LockType lockType, Object owner, long timeout);
/**
+ * As {@link #lock(org.jboss.cache.Fqn, LockType, Object)} except that a NodeSPI is passed in instead of an Fqn.
+ *
+ * @param node node to lock
+ * @param lockType type of lock to acquire
+ * @param owner owner to acquire the lock for
+ * @return true if the lock was acquired, false otherwise.
+ */
+ boolean lock(NodeSPI node, LockType lockType, Object owner);
+
+ /**
* As {@link #lock(org.jboss.cache.Fqn, LockType, Object, long)} except that a NodeSPI is passed in instead of an Fqn.
*
* @param node node to lock
@@ -233,6 +243,15 @@
boolean ownsLock(Fqn fqn, Object owner);
/**
+ * Tests whether a given owner owns any sort of lock on a particular Fqn.
+ *
+ * @param node to test
+ * @param owner owner
+ * @return true if the owner does own the specified lock type on the specified node, false otherwise.
+ */
+ boolean ownsLock(NodeSPI node, Object owner);
+
+ /**
* Returns true if the node is locked (either for reading or writing) by anyone, and false otherwise.
*
* @param n node to inspect
Modified: core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-29 13:02:52 UTC (rev 5914)
+++ core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-29 13:51:25 UTC (rev 5915)
@@ -93,6 +93,11 @@
return acquireLock(fqn, lockType, owner, timeout) != null;
}
+ public boolean lock(NodeSPI node, LockType lockType, Object owner)
+ {
+ return acquireLock(node, lockType, owner, lockAcquisitionTimeout) != null;
+ }
+
public boolean lock(NodeSPI node, LockType lockType, Object owner, long timeout)
{
return acquireLock(node, lockType, owner, timeout) != null;
@@ -156,6 +161,7 @@
public void unlock(NodeSPI node, Object owner)
{
+ if (node == null) return;
unlock(node.getLock(), owner);
}
@@ -255,7 +261,11 @@
public boolean ownsLock(Fqn fqn, Object owner)
{
- NodeSPI node = dataContainer.peek(fqn);
+ return ownsLock(dataContainer.peek(fqn), owner);
+ }
+
+ public boolean ownsLock(NodeSPI node, Object owner)
+ {
return node != null && node.getLock().isOwner(owner);
}
Added: core/trunk/src/test/java/org/jboss/cache/loader/ConcurrentPutRemoveEvictTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/ConcurrentPutRemoveEvictTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/loader/ConcurrentPutRemoveEvictTest.java 2008-05-29 13:51:25 UTC (rev 5915)
@@ -0,0 +1,131 @@
+package org.jboss.cache.loader;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * To test JBCACHE-1355
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.2.0
+ */
+@Test(groups = "functional")
+public class ConcurrentPutRemoveEvictTest extends AbstractCacheLoaderTestBase
+{
+ Cache<String, String> cache;
+ Fqn fqn = Fqn.fromString("/a");
+ String key = "key";
+ boolean run = true;
+ Set<Exception> exceptions = new HashSet<Exception>();
+
+ @BeforeTest
+ public void setUp() throws Exception
+ {
+ CacheLoaderConfig cacheLoaderConfig = getSingleCacheLoaderConfig("", DummyInMemoryCacheLoader.class.getName(), "", false, false, false);
+ Configuration cfg = new Configuration();
+ cfg.setCacheLoaderConfig(cacheLoaderConfig);
+ cache = new DefaultCacheFactory<String, String>().createCache(cfg);
+ cache.put(fqn, key, "value");
+ }
+
+ @AfterTest
+ public void tearDown()
+ {
+ TestingUtil.killCaches(cache);
+ }
+
+ public void doTest() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>();
+
+ threads.add(new Getter());
+ threads.add(new RandomAdder());
+ threads.add(new Evicter());
+
+ for (Thread t : threads) t.start();
+
+ // let these run for a while.
+ TestingUtil.sleepThread(10000);
+
+ run = false;
+
+ for (Thread t : threads) t.join();
+
+ if (!exceptions.isEmpty())
+ {
+ for (Exception e : exceptions) throw e;
+ }
+ }
+
+ private class RandomAdder extends Thread
+ {
+ public void run()
+ {
+ int i = 0;
+ while (run)
+ {
+ try
+ {
+ cache.put(fqn, key + (i++), "");
+ }
+ catch (Exception e)
+ {
+ // ignore
+ }
+ }
+ }
+ }
+
+
+ private class Getter extends Thread
+ {
+ public void run()
+ {
+ while (run)
+ {
+ try
+ {
+ // note that we sometimes get a null back. This is incorrect and inconsistent, but has to do with locks being held
+ // on nodes. Very similar to http://jira.jboss.org/jira/browse/JBCACHE-1165
+ String value = cache.get(fqn, key);
+ System.out.println("Thread " + getName() + " got value " + value);
+ }
+ catch (Exception e)
+ {
+ exceptions.add(e);
+ }
+ }
+ }
+ }
+
+ private class Evicter extends Thread
+ {
+ public void run()
+ {
+ while (run)
+ {
+ try
+ {
+ cache.evict(fqn);
+ }
+ catch (Exception e)
+ {
+ // who cares
+ }
+ }
+ }
+ }
+
+}
16 years, 7 months
JBoss Cache SVN: r5914 - core/trunk/src/main/java/org/jboss/cache/config.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 09:02:52 -0400 (Thu, 29 May 2008)
New Revision: 5914
Modified:
core/trunk/src/main/java/org/jboss/cache/config/PluggableConfigurationComponent.java
Log:
Test for empty or null class names
Modified: core/trunk/src/main/java/org/jboss/cache/config/PluggableConfigurationComponent.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/PluggableConfigurationComponent.java 2008-05-29 12:44:10 UTC (rev 5913)
+++ core/trunk/src/main/java/org/jboss/cache/config/PluggableConfigurationComponent.java 2008-05-29 13:02:52 UTC (rev 5914)
@@ -24,6 +24,7 @@
public void setClassName(String className)
{
+ if (className == null || className.length() == 0) return;
testImmutability("className");
this.className = className;
}
16 years, 7 months
JBoss Cache SVN: r5913 - core/trunk/src/main/java/org/jboss/cache/lock.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 08:44:10 -0400 (Thu, 29 May 2008)
New Revision: 5913
Modified:
core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
Log:
Querying locks should happen on all nodes, even deleted and invalid ones.
Modified: core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-29 12:43:40 UTC (rev 5912)
+++ core/trunk/src/main/java/org/jboss/cache/lock/NodeBasedLockManager.java 2008-05-29 12:44:10 UTC (rev 5913)
@@ -238,7 +238,9 @@
public boolean ownsLock(Fqn fqn, LockType lockType, Object owner)
{
- NodeLock lock = dataContainer.peek(fqn).getLock();
+ NodeSPI n = dataContainer.peek(fqn, true, true);
+ if (n == null) return false;
+ NodeLock lock = n.getLock();
switch (lockType)
{
case READ:
16 years, 7 months
JBoss Cache SVN: r5912 - core/trunk/src/test/java/org/jboss/cache/transaction.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 08:43:40 -0400 (Thu, 29 May 2008)
New Revision: 5912
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/TransactionTest.java
Log:
Removed unnecessary try-catch blocks
Modified: core/trunk/src/test/java/org/jboss/cache/transaction/TransactionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/TransactionTest.java 2008-05-29 12:35:27 UTC (rev 5911)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/TransactionTest.java 2008-05-29 12:43:40 UTC (rev 5912)
@@ -19,8 +19,8 @@
import org.jboss.cache.lock.LockManager;
import static org.jboss.cache.lock.LockType.READ;
import static org.jboss.cache.lock.LockType.WRITE;
-import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.util.CachePrinter;
+import org.jboss.cache.util.TestingUtil;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -366,44 +366,30 @@
assertLocked(gtx, "/a/b/c", true);
}
- public void testNodeRemoval()
+ public void testNodeRemoval() throws SystemException, NotSupportedException
{
GlobalTransaction gtx;
- try
- {
- cache.put("/a/b/c", null);
- tx.begin();
- gtx = cache.getCurrentTransaction();
- cache.removeNode("/a/b/c");// need to remove the node, not just the data in the node.
- assertLocked(gtx, "/a", false);
- assertLocked(gtx, "/a/b", true);
- assertLocked(gtx, "/a/b/c", true);
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- fail(t.toString());
- }
+ cache.put("/a/b/c", null);
+ tx.begin();
+ gtx = cache.getCurrentTransaction();
+ cache.removeNode("/a/b/c");// need to remove the node, not just the data in the node.
+ assertLocked(gtx, "/a", false);
+ assertLocked(gtx, "/a/b", true);
+ assertLocked(gtx, "/a/b/c", true);
+ tx.rollback();
}
- public void testNodeRemoval2()
+ public void testNodeRemoval2() throws SystemException, NotSupportedException
{
GlobalTransaction gtx;
- try
- {
- cache.put("/a/b/c", null);
- tx.begin();
- gtx = cache.getCurrentTransaction();
- cache.removeNode("/a/b");// need to remove the node, not just the data in the node.
- assertLocked(gtx, "/a", true);
- assertLocked(gtx, "/a/b", true);
- assertLocked(gtx, "/a/b/c", true);
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- fail(t.toString());
- }
+ cache.put("/a/b/c", null);
+ tx.begin();
+ gtx = cache.getCurrentTransaction();
+ cache.removeNode("/a/b");// need to remove the node, not just the data in the node.
+ assertLocked(gtx, "/a", true);
+ assertLocked(gtx, "/a/b", true);
+ assertLocked(gtx, "/a/b/c", true);
+ tx.rollback();
}
public void testIntermediateNodeCreationOnWrite() throws Exception
@@ -417,7 +403,6 @@
assertLocked(gtx, "/a/b", true);
assertLocked(gtx, "/a/b/c", true);
tx.rollback();
-
}
public void testIntermediateNodeCreationOnRead() throws Exception
@@ -537,11 +522,11 @@
assertTrue("node " + fqn + " is not locked", lockManager.isLocked(n));
if (writeLocked)
{
- assertTrue("node " + fqn + " is not write-locked by owner " + owner, lockManager.ownsLock(Fqn.fromString(fqn), WRITE, owner));
+ assertTrue("node " + fqn + " is not write-locked by owner " + owner + ". Lock details: " + lockManager.printLockInfo(n), lockManager.ownsLock(Fqn.fromString(fqn), WRITE, owner));
}
else
{
- assertTrue("node " + fqn + " is not read-locked by owner " + owner, lockManager.ownsLock(Fqn.fromString(fqn), READ, owner));
+ assertTrue("node " + fqn + " is not read-locked by owner " + owner + ". Lock details: " + lockManager.printLockInfo(n), lockManager.ownsLock(Fqn.fromString(fqn), READ, owner));
}
}
16 years, 7 months
JBoss Cache SVN: r5911 - core/trunk/src/test/java/org/jboss/cache/util/internals.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-29 08:35:27 -0400 (Thu, 29 May 2008)
New Revision: 5911
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
Fixed typos
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29 12:20:13 UTC (rev 5910)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-05-29 12:35:27 UTC (rev 5911)
@@ -58,9 +58,9 @@
{
RpcDispatcher.Marshaller marshaller;
- private MarshallerDelegate(RpcDispatcher.Marshaller maeshaller)
+ private MarshallerDelegate(RpcDispatcher.Marshaller marshaller)
{
- this.marshaller = maeshaller;
+ this.marshaller = marshaller;
}
public byte[] objectToByteBuffer(Object obj) throws Exception
@@ -68,9 +68,9 @@
return marshaller.objectToByteBuffer(obj);
}
- public Object objectFromByteBuffer(byte abyte0[]) throws Exception
+ public Object objectFromByteBuffer(byte bytes[]) throws Exception
{
- Object result = marshaller.objectFromByteBuffer(abyte0);
+ Object result = marshaller.objectFromByteBuffer(bytes);
System.out.println("Received result = " + result);
if (result instanceof ReplicateCommand && expectedCommands != null)
{
16 years, 7 months