[infinispan-commits] Infinispan SVN: r159 - trunk/core/src/test/java/org/infinispan/test.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Apr 22 06:51:44 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-04-22 06:51:44 -0400 (Wed, 22 Apr 2009)
New Revision: 159

Modified:
   trunk/core/src/test/java/org/infinispan/test/ReplListener.java
Log:
Ability to record local events

Modified: trunk/core/src/test/java/org/infinispan/test/ReplListener.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/ReplListener.java	2009-04-22 09:51:08 UTC (rev 158)
+++ trunk/core/src/test/java/org/infinispan/test/ReplListener.java	2009-04-22 10:51:44 UTC (rev 159)
@@ -13,6 +13,8 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A listener that listens for replication events on a cache it is watching.  Typical usage: <code> ReplListener r =
@@ -21,8 +23,10 @@
 public class ReplListener {
    Cache<?, ?> c;
    volatile Set<Class<? extends VisitableCommand>> expectedCommands;
-   Set<Class<? extends VisitableCommand>> eagerCommands;
+   Set<Class<? extends VisitableCommand>> eagerCommands = new HashSet<Class<? extends VisitableCommand>>();
    boolean recordCommandsEagerly;
+   boolean watchLocal;
+   Lock eagerCommandsLock = new ReentrantLock();
    CountDownLatch latch = new CountDownLatch(1);
 
    /**
@@ -48,8 +52,21 @@
     * @param recordCommandsEagerly whether to record commands eagerly
     */
    public ReplListener(Cache<?, ?> c, boolean recordCommandsEagerly) {
+      this(c, recordCommandsEagerly, false);
+   }
+
+   /**
+    * Same as {@link #ReplListener(org.infinispan.Cache, boolean)} except that this constructor allows you to set the
+    * watchLocal parameter.  If true, even local events are recorded (not just ones that originate remotely).
+    *
+    * @param c                     cache on which to attach listener
+    * @param recordCommandsEagerly whether to record commands eagerly
+    * @param watchLocal            if true, local events are watched for as well
+    */
+   public ReplListener(Cache<?, ?> c, boolean recordCommandsEagerly, boolean watchLocal) {
       this.c = c;
       this.recordCommandsEagerly = recordCommandsEagerly;
+      this.watchLocal = watchLocal;
       this.c.getAdvancedCache().addInterceptor(new ReplListenerInterceptor(), 1);
    }
 
@@ -96,7 +113,15 @@
       }
       this.expectedCommands.addAll(Arrays.asList(expectedCommands));
 
-      if (recordCommandsEagerly) this.expectedCommands.removeAll(eagerCommands);
+      if (recordCommandsEagerly) {
+         eagerCommandsLock.lock();
+         try {
+            this.expectedCommands.removeAll(eagerCommands);
+            eagerCommands.clear();
+         } finally {
+            eagerCommandsLock.unlock();
+         }
+      }
    }
 
    /**
@@ -113,8 +138,8 @@
    public void waitForRpc(long time, TimeUnit unit) {
       assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
       try {
-         if (!latch.await(time, unit)) {
-            assert false : "Waiting for more than " + time + " " + unit + " and following commands did not replicate: " + expectedCommands;
+         if (!expectedCommands.isEmpty() && !latch.await(time, unit)) {
+            assert false : "Waiting for more than " + time + " " + unit + " and following commands did not replicate: " + expectedCommands + " on cache [" + c.getCacheManager().getAddress() + "]";
          }
       }
       catch (InterruptedException e) {
@@ -135,7 +160,7 @@
       protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
          // first pass up chain
          Object o = invokeNextInterceptor(ctx, cmd);
-         if (!ctx.isOriginLocal()) markAsVisited(cmd);
+         if (!ctx.isOriginLocal() || watchLocal) markAsVisited(cmd);
          return o;
       }
 
@@ -143,7 +168,7 @@
       public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand cmd) throws Throwable {
          // first pass up chain
          Object o = invokeNextInterceptor(ctx, cmd);
-         if (!ctx.isOriginLocal()) {
+         if (!ctx.isOriginLocal() || watchLocal) {
             markAsVisited(cmd);
             for (WriteCommand mod : cmd.getModifications()) markAsVisited(mod);
          }
@@ -156,7 +181,12 @@
             if (expectedCommands.isEmpty()) latch.countDown();
          } else {
             if (recordCommandsEagerly) {
-               eagerCommands.add(cmd.getClass());
+               eagerCommandsLock.lock();
+               try {
+                  eagerCommands.add(cmd.getClass());
+               } finally {
+                  eagerCommandsLock.unlock();
+               }
             } else {
                System.out.println("Received unexpected command: " + cmd);
             }




More information about the infinispan-commits mailing list