[jbosscache-commits] JBoss Cache SVN: r5919 - in core/trunk/src: main/java/org/jboss/cache/interceptors and 11 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri May 30 10:27:41 EDT 2008


Author: mircea.markus
Date: 2008-05-30 10:27:41 -0400 (Fri, 30 May 2008)
New Revision: 5919

Modified:
   core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
   core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
   core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
   core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
   core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
   core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
   core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
Log:
JBCACHE-1338 - removed some Thread.sleep() statements and finalized the ReplicationListener - now it also works with region-based marshalling

Modified: core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -287,10 +287,10 @@
     */
    protected void processAddedNodes(EvictedEventNode evictedEventNode) throws EvictionException
    {
-      processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference(), evictedEventNode.isResetElementCount());
+      processAddedNodes(evictedEventNode, evictedEventNode.getElementDifference());
    }
 
-   protected void processAddedNodes(EvictedEventNode evictedEventNode, int numAddedElements, boolean resetElementCount) throws EvictionException
+   protected void processAddedNodes(EvictedEventNode evictedEventNode, int numAddedElements) throws EvictionException
    {
       Fqn fqn = evictedEventNode.getFqn();
 
@@ -304,14 +304,7 @@
       {
          ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
          ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
-         if (resetElementCount)
-         {
-            ne.setNumberOfElements(numAddedElements);
-         }
-         else
-         {
-            ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
-         }
+         ne.setNumberOfElements(ne.getNumberOfElements() + numAddedElements);
          if (trace)
          {
             log.trace("Queue already contains " + ne.getFqn() + " processing it as visited");
@@ -410,7 +403,7 @@
          {
             log.debug("Visiting node that was not added to eviction queues. Assuming that it has 1 element.");
          }
-         this.processAddedNodes(evictedEventNode, 1, false);
+         this.processAddedNodes(evictedEventNode, 1);
          return;
       }
       // note this method will visit and modify the node statistics by reference!
@@ -451,7 +444,7 @@
          {
             log.trace("Adding element " + fqn + " for a node that doesn't exist yet. Process as an add.");
          }
-         this.processAddedNodes(evictedEventNode, 1, false);
+         this.processAddedNodes(evictedEventNode, 1);
          return;
       }
 

Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictedEventNode.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -18,9 +18,8 @@
 public class EvictedEventNode implements Cloneable
 {
    private Fqn fqn_;
-   private NodeEventType type_;
-   private int elementDifference_;
-   private boolean resetElementCount_;
+   private NodeEventType type;
+   private int elementDifference;
 
    private long inUseTimeout;
    private long creationTimestamp;
@@ -53,24 +52,14 @@
       this.inUseTimeout = inUseTimeout;
    }
 
-   public boolean isResetElementCount()
-   {
-      return this.resetElementCount_;
-   }
-
-   public void setResetElementCount(boolean resetElementCount)
-   {
-      this.resetElementCount_ = resetElementCount;
-   }
-
    public int getElementDifference()
    {
-      return elementDifference_;
+      return elementDifference;
    }
 
    public void setElementDifference(int elementDifference_)
    {
-      this.elementDifference_ = elementDifference_;
+      this.elementDifference = elementDifference_;
    }
 
    public Fqn getFqn()
@@ -85,18 +74,18 @@
 
    public void setEventType(NodeEventType event)
    {
-      type_ = event;
+      type = event;
    }
 
    public NodeEventType getEventType()
    {
-      return type_;
+      return type;
    }
 
    @Override
    public String toString()
    {
-      return "EvictedEN[fqn=" + fqn_ + " event=" + type_ + " diff=" + elementDifference_ + "]";
+      return "EvictedEN[fqn=" + fqn_ + " event=" + type + " diff=" + elementDifference + "]";
    }
 
    public EvictedEventNode clone(Fqn cloneFqn)

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -116,7 +116,6 @@
                size = command.getData().size();
             }
             EvictedEventNode event = new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT, size);
-            event.setResetElementCount(false);
             registerEvictionEventToRegionManager(event, r);
          }
       }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RegionalizedMethodCall.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -14,8 +14,8 @@
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 2.1.1
  */
-class RegionalizedMethodCall
+public class RegionalizedMethodCall
 {
-   ReplicableCommand command;
-   Fqn region;
+   public ReplicableCommand command;
+   public Fqn region;
 }

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyAssignmentStateTransferTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -41,7 +41,7 @@
       caches.add(createCache(1, "TEST", false, true));
 
       TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
       Fqn<String> test = BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), main);
 
@@ -50,17 +50,18 @@
       caches.add(createCache(1, "TEST", false, true));
 
       TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
       assertNull("State not transferred", caches.get(2).get(test, "name"));
 
       // Make 2 the buddy of 0
       caches.get(1).stop();
-      caches.set(1, null);
+      caches.remove(1);
+      TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
 
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
-      assertEquals("State transferred", "Joe", caches.get(2).get(test, "name"));
+      assertEquals("State transferred", "Joe", caches.get(1).get(test, "name"));
    }
 
    public void testRegionBasedStateTransfer() throws Exception
@@ -88,7 +89,7 @@
       caches.get(3).start();
 
       TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
       Fqn fqnA = Fqn.fromString("/a");
       Fqn fqnD = Fqn.fromString("/d");
@@ -160,7 +161,7 @@
       caches.get(1).start();
 
       TestingUtil.blockUntilViewsReceived(caches.toArray(new CacheSPI[0]), VIEW_BLOCK_TIMEOUT);
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
       Fqn test = BuddyFqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), main);
 

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyBackupActivationInactivationTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -58,7 +58,7 @@
       c1.activate();
       cache1.put(A_B, "name", JOE);
 
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
 
       System.out.println("Cache dump BEFORE activation");
       System.out.println("cache1 " + CachePrinter.printCacheDetails(cache1));
@@ -95,7 +95,7 @@
 
       cache1.put(A_B, "name", JOE);
 
-      TestingUtil.sleepThread(getSleepTimeout());
+//      TestingUtil.sleepThread(getSleepTimeout());
       assertNull("Should be no replication to inactive region", cache2.get(A_B, "name"));
 
       assertNull("Should be no replication to inactive backup region", cache2.get(backupFqn, "name"));

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -47,7 +47,7 @@
       caches = createCaches(2, 3, false);
 
       TestingUtil.blockUntilViewsReceived(5000, caches.toArray(new Cache[0]));
-      TestingUtil.sleepThread(2000);
+//      TestingUtil.sleepThread(2000);
 
       System.out.println("*** Testing cache 0");
       assertIsBuddy(caches.get(0), caches.get(1), false);

Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -6,6 +6,7 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.ComponentRegistry;
@@ -102,7 +103,6 @@
 
       assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must multiply to be a multiple of COUNT";
 
-      int expectedReplications = totalInvocations / COUNT;
       final CountDownLatch latch = new CountDownLatch(1);
 
       // mock the RPCManager used in the cache
@@ -148,7 +148,9 @@
       // now test results
       verify(mockRpcManager);
 
-      TestingUtil.sleepThread(250); // make sure the queue flushes
+      ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+      notifier.waitUntillAllReplicated(250);
+//      TestingUtil.sleepThread(250); // make sure the queue flushes
 
       assert replQ.elements.size() == 0;
    }
@@ -161,7 +163,9 @@
          cache.put("/a/b/c" + i, "key", "value");
          assertNotNull(cache.get("/a/b/c" + i, "key"));
       }
-      TestingUtil.sleepThread(500);
+      ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
+      notifier.waitUntillAllReplicated(500);
+//      TestingUtil.sleepThread(500);
       for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i, cache2.get("/a/b/c" + i, "key"));
    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -3,6 +3,7 @@
 import org.jboss.cache.Cache;
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.testng.annotations.AfterMethod;
@@ -42,7 +43,8 @@
       // outside of tx scope
       cache.put("/a", "k", "v");
 
-      TestingUtil.sleepThread(200);
+      ReplicationQueueNotifier replicationQueueNotifier = new ReplicationQueueNotifier(cache);
+      replicationQueueNotifier.waitUntillAllReplicated(200);
 
       assert cache2.get("/a", "k").equals("v");
 
@@ -51,8 +53,10 @@
       cache2.put("/a", "k", "v2");
       txManager.commit();
 
-      TestingUtil.sleepThread(200);
+      ReplicationQueueNotifier replicationQueueNotifier2 = new ReplicationQueueNotifier(cache2);
+      replicationQueueNotifier2.waitUntillAllReplicated(200);
 
+
       assert cache.get("/a", "k").equals("v2");
    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/PutDataMapCommandTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -54,7 +54,9 @@
       dataMap.put("k2", "v2");
       Map expected = new HashMap(dataMap);
       expected.putAll(node.getDataDirect());
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(testFqn, true, NodeModifiedEvent.ModificationType.PUT_MAP, node.getData(), null);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(testFqn, false, NodeModifiedEvent.ModificationType.PUT_MAP, expected, null);
       
       control.replay();

Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/PutKeyValueCommandTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -44,9 +44,11 @@
    {
       nodes.adfNode.put("existingKey", "existingValue");
       expect(container.peekStrict(globalTransaction, fqn, false)).andReturn(nodes.adfNode);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA, nodes.adfNode.getDataDirect(), ctx);
       Map expected = new HashMap();
       expected.put("k", "v");
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
       control.replay();
       assert null == command.perform(ctx) : "no pre existing value";
@@ -68,9 +70,11 @@
    {
       nodes.adfNode.put("k", "oldValue");
       expect(container.peekStrict(globalTransaction, fqn, false)).andReturn(nodes.adfNode);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA, nodes.adfNode.getDataDirect(), ctx);
       Map expected = new HashMap();
       expected.put("k", "v");
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.PUT_DATA, expected, ctx);
       control.replay();
       assert "oldValue".equals(command.perform(ctx)) : "no pre existing value";

Modified: core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/commands/write/RemoveKeyCommandTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -41,9 +41,11 @@
       expected.put("newKey","newValue");
       nodes.adfgNode.putAll(expected);
       expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
       expected = new HashMap();
       expected.put(key,null);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
       control.replay();
       assert null == command.perform(ctx);
@@ -66,7 +68,9 @@
       expected.put(key,"newValue");
       nodes.adfgNode.putAll(expected);
       expect(container.peek(fqn, false, false)).andReturn(nodes.adfgNode);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
+      expect(notifier.shouldNotifyOnNodeModified()).andReturn(true);
       notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.REMOVE_DATA, expected, ctx);
       control.replay();
       assert "newValue" == command.perform(ctx);

Modified: core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/FIFOPolicyTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -10,6 +10,7 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.EvictionController;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.EvictionConfig;
 import org.jboss.cache.config.EvictionRegionConfig;
@@ -105,8 +106,9 @@
             e.printStackTrace();
          }
       }
+      EvictionController evictionController = new EvictionController(cache);
+      evictionController.startEviction();
 
-      TestingUtil.sleepThread(wakeupIntervalMillis + 500);
       try
       {
          String val = (String) cache.get(rootStr + "3", rootStr + "3");
@@ -136,7 +138,8 @@
          cache.put(fqn, str, str);
       }
 
-      TestingUtil.sleepThread(wakeupIntervalMillis + 10000);
+      EvictionController evictionController = new EvictionController(cache);
+      evictionController.startEviction();
       assertEquals("Number of nodes", maxNodes + 2, cache.getNumberOfNodes());
       for (int i = 0; i < maxNodes; i++)
       {
@@ -181,7 +184,8 @@
       int period = wakeupIntervalMillis + 500;
 
       log("sleeping for " + period + "ms");
-      TestingUtil.sleepThread(period);// it really depends the eviction thread time.
+      EvictionController evictionController = new EvictionController(cache);
+      evictionController.startEviction();
       try
       {
          for (int i = 0; i < 5; i++)
@@ -197,7 +201,7 @@
             assertNotNull(cache.get(fqn, str));
          }
 
-         TestingUtil.sleepThread(period);
+         evictionController.startEviction();
          // since it is FIFO if we leave it alone and revisit, cache should remain the same.
          for (int i = 5; i < 10; i++)
          {
@@ -235,9 +239,9 @@
          }
       }
 
-      int period = (wakeupIntervalMillis + 500);
-      log("period is " + period);
-      TestingUtil.sleepThread(period);
+      EvictionController evictionController = new EvictionController(cache);
+      evictionController.startEviction();
+
       String str1 = rootStr + "7";
       Fqn fqn1 = Fqn.fromString(str1);
       String str2 = rootStr + "7/7";
@@ -247,11 +251,11 @@
          assertNotNull(cache.get(fqn2, str2));
          assertNotNull(cache.get(fqn1, str1));
          cache.removeNode(fqn2);
-         TestingUtil.sleepThread(period);
+         evictionController.startEviction();
          assertNull(cache.get(fqn2, str2));
          assertNotNull(cache.get(fqn1, str1));
          cache.removeNode(fqn1);
-         TestingUtil.sleepThread(period);
+         evictionController.startEviction();
          assertNull(cache.get(fqn1, str1));
          assertNull(cache.get(fqn2, str2));
 
@@ -262,12 +266,12 @@
          assertNotNull(cache.get(fqn3, str3));
          assertNotNull(cache.get(fqn4, str4));
 
-         TestingUtil.sleepThread(period);
+         evictionController.startEviction();
 
          // remove the node above fqn4 /org/jboss/test/5 will cascade the delete into /org/jboss/test/5/5
          cache.removeNode(fqn4);
 
-         TestingUtil.sleepThread(period);
+         evictionController.startEviction();
          assertNull(cache.get(fqn3, str3));
          assertNull(cache.get(fqn4, str4));
       }

Modified: core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/interceptors/EvictionInterceptorTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -344,7 +344,6 @@
       PutDataMapCommand putCommand = commandsFactory.buildPutDataMapCommand(null, fqn, data);
       invoker.invoke(putCommand);
       event = regionManager.getRegion(fqn.toString(), false).takeLastEventNode();
-      assertFalse(event.isResetElementCount());
       assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
       assertEquals(fqn, event.getFqn());
       assertEquals(100, event.getElementDifference());
@@ -366,7 +365,6 @@
       assertEquals(NodeEventType.ADD_NODE_EVENT, event.getEventType());
       assertEquals(fqn, event.getFqn());
       assertEquals(100, event.getElementDifference());
-      assertTrue(event.isResetElementCount());
       assertNull(regionManager.getRegion(fqn.toString(), false).takeLastEventNode());
 
 

Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -14,6 +14,7 @@
 import org.jboss.cache.Node;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.config.CacheLoaderConfig;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.XmlConfigurationParser;
@@ -119,25 +120,34 @@
       cache2.start();
 
       Fqn fqn = Fqn.fromString("/a/b");
+      ReplicationListener replListener1 = new ReplicationListener(cache1);
+      ReplicationListener replListener2 = new ReplicationListener(cache2);
+
+      replListener2.expectAny();
       cache1.put(fqn, "key", "value");
-      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+      replListener2.waitForReplicationToOccur(500);
+//      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
       // test that this has NOT replicated, but rather has been invalidated:
       assertEquals("value", cache1.get(fqn, "key"));
       assertNull("Should NOT have replicated!", cache2.getNode(fqn));
 
+      replListener1.expectAny();
       // now make sure cache2 is in sync with cache1:
       cache2.put(fqn, "key", "value");
-      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+//      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+      replListener1.waitForReplicationToOccur(500);
 
       // since the node already exists even PL will not remove it - but will invalidate it's data
       Node n = cache1.getNode(fqn);
       assertHasBeenInvalidated(n, "Should have been invalidated");
       assertEquals("value", cache2.get(fqn, "key"));
 
+      replListener2.expectAny();
       // now test the invalidation:
       cache1.put(fqn, "key2", "value2");
       assertEquals("value2", cache1.get(fqn, "key2"));
-      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+//      TestingUtil.sleepThread(500);// give it time to broadcast the evict call
+      replListener2.waitForReplicationToOccur(500);
 
       // since the node already exists even PL will not remove it - but will invalidate it's data
       n = cache2.getNode(fqn);

Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderWithReplicationTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -10,7 +10,10 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.config.Configuration;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNull;
@@ -194,6 +197,7 @@
    public void testPessAsyncRepl() throws Exception
    {
       createCaches(false, false);
+      ReplicationListener replListener = new ReplicationListener(cache2);
 
       mgr1.begin();
       cache1.put(fqn, key, "value");
@@ -202,10 +206,11 @@
       assertNull(cache2.get(fqn, key));
       assertNull(loader1.get(fqn));
       assertNull(loader2.get(fqn));
+
+      replListener.expect(PrepareCommand.class);
       mgr1.commit();
+      replListener.waitForReplicationToOccur(500);
 
-      TestingUtil.sleepThread(500);
-
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));
       assertEquals("value", loader1.get(fqn).get(key));
@@ -221,8 +226,6 @@
 
       mgr1.rollback();
 
-      TestingUtil.sleepThread(500);
-
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));
       assertEquals("value", loader1.get(fqn).get(key));
@@ -266,16 +269,20 @@
    {
       createCaches(false, true);
 
+      ReplicationListener replListener = new ReplicationListener(cache2);
+      
       mgr1.begin();
+      replListener.expect(CommitCommand.class);
       cache1.put(fqn, key, "value");
 
       assertEquals("value", cache1.get(fqn, key));
       assertNull(cache2.get(fqn, key));
       assertNull(loader1.get(fqn));
       assertNull(loader2.get(fqn));
+
       mgr1.commit();
 
-      TestingUtil.sleepThread(500);
+      replListener.waitForReplicationToOccur(500);
 
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));
@@ -291,7 +298,6 @@
       assertEquals("value", loader2.get(fqn).get(key));
 
       mgr1.rollback();
-      TestingUtil.sleepThread(500);
 
       assertEquals("value", cache1.get(fqn, key));
       assertEquals("value", cache2.get(fqn, key));

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -13,23 +13,19 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.marshall.data.Address;
 import org.jboss.cache.marshall.data.Person;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.transaction.*;
 import java.lang.reflect.Method;
 
 /**
@@ -48,6 +44,8 @@
    Throwable ex;
    private Fqn<String> aop = Fqn.fromString("/aop");
    protected boolean useMarshalledValues = false;
+   ReplicationListener replListener1;
+   ReplicationListener replListener2;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception
@@ -60,6 +58,8 @@
 
       cache2 = createCache("TestCache");
 
+      replListener1 = new ReplicationListener(cache1);
+      replListener2 = new ReplicationListener(cache2);
       addr = new Address();
       addr.setCity("San Jose");
       ben = new Person();
@@ -109,11 +109,16 @@
       }
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(aop, "person", ben);
+      replListener2.waitForReplicationToOccur(500);
+
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/alias"), "person", ben);
+      replListener2.waitForReplicationToOccur(500);
+
       if (useMarshalledValues) resetContextClassLoader();
 
-      TestingUtil.sleepThread(1000);
       Object ben2 = null;
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
@@ -133,9 +138,10 @@
       // Set it back to the cache
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+      replListener1.expect(PutKeyValueCommand.class);
       cache2.put(aop, "person", ben2);
+      replListener1.waitForReplicationToOccur(1000);
       if (useMarshalledValues) resetContextClassLoader();
-      TestingUtil.sleepThread(1000);
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
       Object ben3 = cache1.get(aop, "person");
       if (useMarshalledValues) resetContextClassLoader();
@@ -161,10 +167,16 @@
       Object scopedBen2 = getPersonFromClassloader(clb);
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/aop/1"), "person", ben);
+      replListener2.waitForReplicationToOccur(1000);
+
+      replListener2.expect(PutKeyValueCommand.class);
       cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
+      replListener2.waitForReplicationToOccur(1000);
+      
       if (useMarshalledValues) resetContextClassLoader();
-      TestingUtil.sleepThread(1000);
 
       Object ben2 = null;
       // Can't cast it to Person. CCE will resutl.
@@ -180,11 +192,11 @@
 
    public void testTxPut() throws Exception
    {
+      replListener2.expectAny();
       beginTransaction();
       cache1.put(aop, "person", ben);
-//      cache1.put(aop, "person1", ben);
       commit();
-      TestingUtil.sleepThread(1000);
+      replListener2.waitForReplicationToOccur(1000);
       Person ben2 = (Person) cache2.get(aop, "person");
       assertNotNull("Person from 2nd cache should not be null ", ben2);
       assertEquals(ben.toString(), ben2.toString());
@@ -204,12 +216,13 @@
       }
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
+      replListener2.expectAny();
       beginTransaction();
       cache1.put(aop, "person", ben);
       commit();
+      replListener2.waitForReplicationToOccur(1000);
       if (useMarshalledValues) resetContextClassLoader();
 
-      TestingUtil.sleepThread(1000);
 
       Object ben2 = null;
       // Can't cast it to Person. CCE will resutl.
@@ -229,9 +242,10 @@
       // Set it back to the cache
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
+      replListener1.expectAny();
       cache2.put(aop, "person", ben2);
       if (useMarshalledValues) resetContextClassLoader();
-      TestingUtil.sleepThread(1000);
+      replListener1.waitForReplicationToOccur(100);
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
       Object ben3 = cache1.get(aop, "person");
       if (useMarshalledValues) resetContextClassLoader();
@@ -256,8 +270,9 @@
 
       Fqn base = Fqn.fromString("/aop");
       Fqn fqn = Fqn.fromRelativeElements(base, custom1);
+      replListener2.expectAny();
       cache1.put(fqn, "key", "value");
-      TestingUtil.sleepThread(1000);
+      replListener2.waitForReplicationToOccur(1000);
 
       Fqn fqn2 = Fqn.fromRelativeElements(base, custom2);
       Object val = cache2.get(fqn2, "key");

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/InvalidRegionForStateTransferTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -5,6 +5,7 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.testng.annotations.AfterMethod;
@@ -27,6 +28,7 @@
 public class InvalidRegionForStateTransferTest
 {
    Cache<Object, Object> c1, c2;
+   ReplicationListener replListener2;
 
    @BeforeMethod
    public void setUp() throws CloneNotSupportedException
@@ -47,6 +49,7 @@
       c1.start();
 
       c2 = new DefaultCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone());
+      replListener2 = new ReplicationListener(c2);
 
       TestingUtil.blockUntilViewsReceived(60000, c1, c2);
    }
@@ -63,11 +66,12 @@
       c1.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
       c2.getRegion(fqn.getParent(), true).registerContextClassLoader(getClass().getClassLoader());
 
+      replListener2.expectAny();
       // write something; will cause a stale region to be stored in C2's cache marshaller
       c1.put(fqn, "k", "v");
       assert c1.get(fqn, "k").equals("v");
 
-      TestingUtil.sleepThread(250); // async repl
+      replListener2.waitForReplicationToOccur(250);
 
       // assert that this made it to c2
       assert c2.get(fqn, "k").equals("v");

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -294,7 +294,7 @@
       cache1.put(Fqn.fromString("/aop/2"), map);
       cache1.removeNode(Fqn.fromString("/aop/2"));
       if (useMarshalledValues) resetContextClassLoader();
-      TestingUtil.sleepThread(1000);
+//      TestingUtil.sleepThread(1000);
    }
 
    public void testTxMethodCall() throws Exception
@@ -322,7 +322,7 @@
       cache1.removeNode(Fqn.fromString("/aop/2"));
       tm.commit();
 
-      TestingUtil.sleepThread(1000);
+//      TestingUtil.sleepThread(1000);
       if (useMarshalledValues) resetContextClassLoader();
    }
 

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncCacheTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -9,6 +9,7 @@
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.loader.SamplePojo;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.transaction.TransactionSetup;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
@@ -22,12 +23,14 @@
 public class AsyncCacheTest extends AbstractOptimisticTestCase
 {
    private CacheSPI<Object, Object> cache, cache2;
+   ReplicationListener replListener2;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception
    {
       cache = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
       cache2 = createReplicatedCache(Configuration.CacheMode.REPL_ASYNC);
+      replListener2 = new ReplicationListener(cache2);
    }
 
    @AfterMethod(alwaysRun = true)
@@ -63,11 +66,13 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
+      replListener2.expectAny();
       cache.put("/one/two", "key1", pojo);
 
       //GlobalTransaction globalTransaction = cache.getCurrentTransaction(tx);
       assertNotNull(mgr.getTransaction());
       mgr.commit();
+      replListener2.waitForReplicationToOccur(1000);
 
       assertNull(mgr.getTransaction());
 
@@ -79,8 +84,6 @@
       assertTrue(cache.exists(Fqn.fromString("/one")));
       assertEquals(pojo, cache.get(Fqn.fromString("/one/two"), "key1"));
 
-      // allow changes to replicate since this is async
-      TestingUtil.sleepThread((long) 1000);
 
       assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -106,10 +109,12 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
+      replListener2.expectAny();
       cache.put("/one/two", "key1", pojo);
 
       assertNotNull(mgr.getTransaction());
       mgr.commit();
+      replListener2.waitForReplicationToOccur(1000);
 
       assertNull(mgr.getTransaction());
 
@@ -121,9 +126,7 @@
       assertTrue(cache.exists(Fqn.fromString("/one")));
       assertEquals(pojo, cache.get(Fqn.fromString("/one/two"), "key1"));
 
-      // let the async calls complete
-      TestingUtil.sleepThread((long) 1000);
-
+      
       assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
 

Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -2,10 +2,13 @@
 
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.loader.SamplePojo;
 import org.jboss.cache.lock.LockManager;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.OptimisticTransactionEntry;
 import org.jboss.cache.transaction.TransactionTable;
@@ -174,6 +177,7 @@
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+      ReplicationListener replListener2 = new ReplicationListener(cache2);
       LockManager lockManager = TestingUtil.extractLockManager(cache);
       LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
 
@@ -187,9 +191,11 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
+      replListener2.expectAny();
       cache.put("/one/two", "key1", pojo);
 
       mgr.commit();
+      replListener2.waitForReplicationToOccur(1000);
 
       // cache asserts
       assertNull(mgr.getTransaction());
@@ -208,9 +214,6 @@
       assertNotNull(cache.getNode("/one").getChild("two"));
       assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
 
-      // let async calls propagate
-      TestingUtil.sleepThread((long) 1000);
-
       // cache2 asserts
       assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -235,7 +238,9 @@
    {
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+      ReplicationListener replListener = new ReplicationListener(cache);
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
+      ReplicationListener replListener2 = new ReplicationListener(cache2);
       LockManager lockManager = TestingUtil.extractLockManager(cache);
       LockManager lockManager2 = TestingUtil.extractLockManager(cache2);
 
@@ -249,9 +254,11 @@
 
       SamplePojo pojo = new SamplePojo(21, "test");
 
+      replListener2.expectAny();
       cache.put("/one/two", "key1", pojo);
 
       mgr.commit();
+      replListener2.waitForReplicationToOccur(1000);
 
       // cache asserts
       assertNull(mgr.getTransaction());
@@ -270,9 +277,6 @@
       assertNotNull(cache.getNode("/one").getChild("two"));
       assertNotNull(cache.get(Fqn.fromString("/one/two"), "key1"));
 
-      // let async calls propagate
-      TestingUtil.sleepThread((long) 1000);
-
       // cache2 asserts
       assertEquals(0, cache2.getTransactionTable().getNumGlobalTransactions());
       assertEquals(0, cache2.getTransactionTable().getNumLocalTransactions());
@@ -289,16 +293,14 @@
       assertNotNull(cache2.getNode("/one").getChild("two"));
       assertNotNull(cache2.get(Fqn.fromString("/one/two"), "key1"));
 
+      replListener2.expect(RemoveNodeCommand.class);
       cache.removeNode("/one/two");
+      replListener2.waitForReplicationToOccur(1000);
+
       assertEquals(false, cache.exists("/one/two"));
       assertEquals(null, cache.get("/one/two", "key1"));
-
-      // let async calls propagate
-      TestingUtil.sleepThread((long) 1000);
-
       assertEquals(false, cache2.exists("/one/two"));
       assertEquals(null, cache2.get("/one/two", "key1"));
-
       destroyCache(cache);
       destroyCache(cache2);
    }
@@ -307,6 +309,7 @@
    {
       groupIncreaser++;
       CacheSPI<Object, Object> cache = createAsyncReplicatedCache();
+      ReplicationListener replListener = new ReplicationListener(cache);
       CacheSPI<Object, Object> cache2 = createAsyncReplicatedCache();
       LockManager lockManager = TestingUtil.extractLockManager(cache);
 

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -48,7 +48,7 @@
    {
       try
       {
-         Method method = EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{null});
+         Method method = EvictionTimerTask.class.getDeclaredMethod("processRegions", new Class[]{});
          method.setAccessible(true);
          method.invoke(timerTask);
       }

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -2,11 +2,16 @@
 
 import org.jboss.cache.Cache;
 import org.jboss.cache.RPCManager;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Fqn;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.remote.ReplicateCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
 import org.jboss.cache.util.TestingUtil;
 import org.jgroups.blocks.RpcDispatcher;
 
@@ -16,6 +21,9 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.InputStream;
 
 /**
  * Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
@@ -50,9 +58,18 @@
       ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
       RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
       CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
-      RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller) TestingUtil.extractField(RpcDispatcher.class, realDispatcher, "req_marshaller");
-      MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
-      TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher, RpcDispatcher.class);
+      if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
+      {
+         Marshaller realMarshaller = (Marshaller) TestingUtil.extractField(InactiveRegionAwareRpcDispatcher.class, realDispatcher, "requestMarshaller");
+         RegionMarshallerDelegate delegate = new RegionMarshallerDelegate(realMarshaller);
+         TestingUtil.replaceField(delegate, "requestMarshaller", realDispatcher, InactiveRegionAwareRpcDispatcher.class);
+      }
+      else
+      {
+         RpcDispatcher.Marshaller realMarshaller = (RpcDispatcher.Marshaller) TestingUtil.extractField(RpcDispatcher.class, realDispatcher, "req_marshaller");
+         MarshallerDelegate delegate = new MarshallerDelegate(realMarshaller);
+         TestingUtil.replaceField(delegate, "req_marshaller", realDispatcher, RpcDispatcher.class);
+      }
    }
 
    private class MarshallerDelegate implements RpcDispatcher.Marshaller
@@ -72,21 +89,47 @@
       public Object objectFromByteBuffer(byte bytes[]) throws Exception
       {
          Object result = marshaller.objectFromByteBuffer(bytes);
-         System.out.println("Received result = " + result);
          if (result instanceof ReplicateCommand && expectedCommands != null)
          {
             ReplicateCommand replicateCommand = (ReplicateCommand) result;
+            return new ReplicateCommandDelegate(replicateCommand);
+         }
+         return result;
+      }
+   }
+
+   /**
+    * We want the notification to be performed only *after* the remote command is executed.
+    */
+   private class ReplicateCommandDelegate extends ReplicateCommand
+   {
+      ReplicateCommand realOne;
+
+      private ReplicateCommandDelegate(ReplicateCommand realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      @Override
+      public Object perform(InvocationContext ctx) throws Throwable
+      {
+         try
+         {
+            return realOne.perform(ctx);
+         }
+         finally
+         {
             Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
             while (it.hasNext())
             {
                Class<? extends ReplicableCommand> replicableCommandClass = it.next();
-               if (replicateCommand.containsCommandType(replicableCommandClass))
+               if (realOne.containsCommandType(replicableCommandClass))
                {
                   it.remove();
                }
-               else if (replicateCommand.getSingleModification() instanceof PrepareCommand) //explicit transaction
+               else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
                {
-                  PrepareCommand prepareCommand = (PrepareCommand) replicateCommand.getSingleModification();
+                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
                   if (prepareCommand.containsModificationType(replicableCommandClass))
                   {
                      it.remove();
@@ -98,8 +141,66 @@
                latch.countDown();
             }
          }
+      }
+   }
+
+   /**
+    * Needed for region based marshalling.
+    */
+   private class RegionMarshallerDelegate implements Marshaller
+   {
+      private Marshaller realOne;
+
+      private RegionMarshallerDelegate(Marshaller realOne)
+      {
+         this.realOne = realOne;
+      }
+
+      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
+      {
+         realOne.objectToObjectStream(obj, out);
+      }
+
+      public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+      {
+         return realOne.objectFromObjectStream(in);
+      }
+
+      public Object objectFromStream(InputStream is) throws Exception
+      {
+         return realOne.objectFromStream(is);
+      }
+
+      public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
+      {
+         realOne.objectToObjectStream(obj, out, region);
+      }
+
+      public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
+      {
+         RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
+         if (result.command instanceof ReplicateCommand && expectedCommands != null)
+         {
+            ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
+            result.command = new ReplicateCommandDelegate(replicateCommand);
+         }
          return result;
       }
+
+      public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
+      {
+         return realOne.regionalizedMethodCallFromObjectStream(in);
+      }
+
+      public byte[] objectToByteBuffer(Object o) throws Exception
+      {
+         return realOne.objectToByteBuffer(o);
+      }
+
+      public Object objectFromByteBuffer(byte[] bytes) throws Exception
+      {
+         return realOne.objectFromByteBuffer(bytes);
+      }
    }
 
    /**

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java	2008-05-29 15:08:09 UTC (rev 5918)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java	2008-05-30 14:27:41 UTC (rev 5919)
@@ -52,11 +52,17 @@
       }
    }
 
-   public void waitUntillAllReplicated(long timeout) throws Exception
+   public void waitUntillAllReplicated(long timeout)
    {
       synchronized (replicated)
       {
-         replicated.wait(timeout);
+         try
+         {
+            replicated.wait(timeout);
+         } catch (InterruptedException e)
+         {
+            throw new RuntimeException(e);
+         }
       }
       log("returning from waitUntillAllReplicated call");
    }




More information about the jbosscache-commits mailing list