[infinispan-commits] Infinispan SVN: r2572 - in branches/4.2.x/core/src: test/java/org/infinispan/replication and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Oct 22 07:51:26 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-10-22 07:51:25 -0400 (Fri, 22 Oct 2010)
New Revision: 2572

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/replication/ConcurrentFlushReplQueueTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
Log:
ISPN-691 - ReplicationQueue has an out-of-order issue - Synchronized flush.

Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java	2010-10-22 10:58:09 UTC (rev 2571)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java	2010-10-22 11:51:25 UTC (rev 2572)
@@ -112,9 +112,8 @@
    }
 
    @Override
-   public int flush() {
-      List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
-      elements.drainTo(toReplicate);
+   public synchronized int flush() {
+      List<ReplicableCommand> toReplicate = drainReplQueue();
       if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
 
       int toReplicateSize = toReplicate.size();
@@ -133,6 +132,12 @@
       return toReplicateSize;
    }
 
+   protected List<ReplicableCommand> drainReplQueue() {
+      List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
+      elements.drainTo(toReplicate);
+      return toReplicate;
+   }
+
    @Override
    public int getElementsCount() {
       return elements.size();

Added: branches/4.2.x/core/src/test/java/org/infinispan/replication/ConcurrentFlushReplQueueTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/replication/ConcurrentFlushReplQueueTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/replication/ConcurrentFlushReplQueueTest.java	2010-10-22 11:51:25 UTC (rev 2572)
@@ -0,0 +1,93 @@
+package org.infinispan.replication;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.remoting.ReplicationQueueImpl;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that concurrent flushes are handled properly. These can occur when both flushes due to queue max size
+ * being exceeded and interval based queue flushes occur at exactly the same time. The test verifies that order of
+ * operations is guaranteed under these circumstances.
+ *
+ * @author Galder Zamarreño
+ * @since 4.2
+ */
+ at Test(groups = "functional", testName = "replication.ConcurrentFlushReplQueueTest")
+public class ConcurrentFlushReplQueueTest extends MultipleCacheManagersTest {
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration cfg = new Configuration();
+      cfg.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      cfg.setUseReplQueue(true);
+      cfg.setReplQueueInterval(100);
+      cfg.setReplQueueMaxElements(2);
+      cfg.setReplQueueClass(MockReplQueue.class.getName());
+      CacheContainer first = TestCacheManagerFactory.createCacheManager(GlobalConfiguration.getClusteredDefault(), cfg);
+      CacheContainer second = TestCacheManagerFactory.createCacheManager(GlobalConfiguration.getClusteredDefault(), cfg);
+      registerCacheManager(first, second);
+   }
+
+   public void testConcurrentFlush(Method m) throws Exception {
+      Cache cache1 = cache(0);
+      Cache cache2 = cache(1);
+      CountDownLatch intervalFlushLatch = new CountDownLatch(1);
+      CountDownLatch secondPutLatch = new CountDownLatch(1);
+      CountDownLatch removeCompletedLatch = new CountDownLatch(1);
+      MockReplQueue.intervalFlushLatch = intervalFlushLatch;
+      MockReplQueue.secondPutLatch = secondPutLatch;
+      MockReplQueue.removeCompletedLatch = removeCompletedLatch; 
+      final String k = "k-" + m.getName();
+      final String v = "v-" + m.getName();
+      cache1.put(k, v);
+      // Wait for periodic repl queue task to try repl the single modification
+      secondPutLatch.await();
+      // Put something random so that after remove call, the element number exceeds
+      cache1.put("k-blah","v-blah");
+      cache1.remove(k);
+      // Wait for remove to go over draining the queue
+      removeCompletedLatch.await();
+      // Once remove executed, now let the interval flush continue
+      intervalFlushLatch.countDown();
+      // Wait for periodic flush to send modifications over the wire
+      TestingUtil.sleepThread(500);
+      assert !cache2.containsKey(k);
+   }
+
+   public static class MockReplQueue extends ReplicationQueueImpl {
+      static CountDownLatch intervalFlushLatch;
+      static CountDownLatch secondPutLatch;
+      static CountDownLatch removeCompletedLatch;
+
+      @Override
+      protected List<ReplicableCommand> drainReplQueue() {
+         List<ReplicableCommand> drained = super.drainReplQueue();
+         try {
+            if (drained.size() > 0 && Thread.currentThread().getName().startsWith("Scheduled-")) {
+               secondPutLatch.countDown();
+                // Wait a max of 5 seconds, because if a remove could have gone through,
+               // it would have done it in that time. If it hasn't and the test passes,
+               // it means that correct synchronization is in place.
+               intervalFlushLatch.await(5, TimeUnit.SECONDS);
+            } else if (drained.size() > 0) {
+               removeCompletedLatch.countDown();
+            }
+         } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+         }
+         return drained;
+      }
+   }
+}



More information about the infinispan-commits mailing list