[jbosscache-commits] JBoss Cache SVN: r5447 - in core/trunk/src: main/java/org/jboss/cache/cluster and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Mar 18 20:31:19 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-03-18 20:31:19 -0400 (Tue, 18 Mar 2008)
New Revision: 5447

Added:
   core/trunk/src/main/java/org/jboss/cache/cluster/
   core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
   core/trunk/src/test/java/org/jboss/cache/cluster/
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
Removed:
   core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
Log:
JBCACHE-295 -  Add more unit tests for ReplicationQueue

Deleted: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-03-19 00:21:05 UTC (rev 5446)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-03-19 00:31:19 UTC (rev 5447)
@@ -1,177 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.cache;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.factories.annotations.Stop;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodCallFactory;
-import org.jboss.cache.marshall.MethodDeclarations;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-
-/**
- * Periodically (or when certain size is exceeded) takes elements and replicates them.
- *
- * @author <a href="mailto:bela at jboss.org">Bela Ban</a> May 24, 2003
- * @version $Revision$
- */
-public class ReplicationQueue
-{
-
-   private static Log log = LogFactory.getLog(ReplicationQueue.class);
-
-   /**
-    * We flush every 5 seconds. Inactive if -1 or 0
-    */
-   private long interval = 5000;
-
-   /**
-    * Max elements before we flush
-    */
-   private long max_elements = 500;
-
-   /**
-    * Holds the replication jobs: LinkedList<MethodCall>
-    */
-   private final List<MethodCall> elements = new LinkedList<MethodCall>();
-
-   /**
-    * For periodical replication
-    */
-   private Timer timer = null;
-
-   /**
-    * The timer task, only calls flush() when executed by Timer
-    */
-   private MyTask task = null;
-   private RPCManager rpcManager;
-   private Configuration configuration;
-   private boolean enabled;
-
-
-   public boolean isEnabled()
-   {
-      return enabled;
-   }
-
-   @Inject
-   private void injectDependencies(RPCManager rpcManager, Configuration configuration)
-   {
-      this.rpcManager = rpcManager;
-      this.configuration = configuration;
-      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
-   }
-
-   /**
-    * Starts the asynchronous flush queue.
-    */
-   @Start
-   public synchronized void start()
-   {
-      this.interval = configuration.getReplQueueInterval();
-      this.max_elements = configuration.getReplQueueMaxElements();
-      // check again
-      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
-      if (enabled)
-      {
-         if (interval > 0)
-         {
-            if (task == null)
-               task = new MyTask();
-            if (timer == null)
-            {
-               timer = new Timer(true);
-               timer.schedule(task,
-                     500, // delay before initial flush
-                     interval); // interval between flushes
-            }
-         }
-      }
-   }
-
-   /**
-    * Stops the asynchronous flush queue.
-    */
-   @Stop
-   public synchronized void stop()
-   {
-      if (task != null)
-      {
-         task.cancel();
-         task = null;
-      }
-      if (timer != null)
-      {
-         timer.cancel();
-         timer = null;
-      }
-   }
-
-
-   /**
-    * Adds a new method call.
-    */
-   public void add(MethodCall job)
-   {
-      if (job == null)
-         throw new NullPointerException("job is null");
-      synchronized (elements)
-      {
-         elements.add(job);
-         if (elements.size() >= max_elements)
-            flush();
-      }
-   }
-
-   /**
-    * Flushes existing method calls.
-    */
-   public void flush()
-   {
-      List<MethodCall> l;
-      synchronized (elements)
-      {
-         if (log.isTraceEnabled())
-            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
-         l = new ArrayList<MethodCall>(elements);
-         elements.clear();
-      }
-
-      if (l.size() > 0)
-      {
-         try
-         {
-            // send to all live nodes in the cluster
-            rpcManager.callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000, false);
-         }
-         catch (Throwable t)
-         {
-            log.error("failed replicating " + l.size() + " elements in replication queue", t);
-         }
-      }
-   }
-
-   class MyTask extends TimerTask
-   {
-      public void run()
-      {
-         flush();
-      }
-   }
-
-}

Added: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java	2008-03-19 00:31:19 UTC (rev 5447)
@@ -0,0 +1,169 @@
+package org.jboss.cache.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.factories.annotations.Stop;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodCallFactory;
+import org.jboss.cache.marshall.MethodDeclarations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Periodically (or when certain size is exceeded) takes elements and replicates them.
+ *
+ * @author <a href="mailto:bela at jboss.org">Bela Ban</a> May 24, 2003
+ * @version $Revision: 5197 $
+ */
+public class ReplicationQueue
+{
+
+   private static Log log = LogFactory.getLog(ReplicationQueue.class);
+
+   /**
+    * We flush every 5 seconds. Inactive if -1 or 0
+    */
+   private long interval = 5000;
+
+   /**
+    * Max elements before we flush
+    */
+   private long max_elements = 500;
+
+   /**
+    * Holds the replication jobs: LinkedList<MethodCall>
+    */
+   final List<MethodCall> elements = new LinkedList<MethodCall>();
+
+   /**
+    * For periodical replication
+    */
+   private Timer timer = null;
+
+   /**
+    * The timer task, only calls flush() when executed by Timer
+    */
+   private ReplicationQueue.MyTask task = null;
+   private RPCManager rpcManager;
+   private Configuration configuration;
+   private boolean enabled;
+
+
+   public boolean isEnabled()
+   {
+      return enabled;
+   }
+
+   @Inject
+   private void injectDependencies(RPCManager rpcManager, Configuration configuration)
+   {
+      this.rpcManager = rpcManager;
+      this.configuration = configuration;
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
+   }
+
+   /**
+    * Starts the asynchronous flush queue.
+    */
+   @Start
+   public synchronized void start()
+   {
+      this.interval = configuration.getReplQueueInterval();
+      this.max_elements = configuration.getReplQueueMaxElements();
+      // check again
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
+      if (enabled)
+      {
+         if (interval > 0)
+         {
+            if (task == null)
+               task = new ReplicationQueue.MyTask();
+            if (timer == null)
+            {
+               timer = new Timer(true);
+               timer.schedule(task,
+                     500, // delay before initial flush
+                     interval); // interval between flushes
+            }
+         }
+      }
+   }
+
+   /**
+    * Stops the asynchronous flush queue.
+    */
+   @Stop
+   public synchronized void stop()
+   {
+      if (task != null)
+      {
+         task.cancel();
+         task = null;
+      }
+      if (timer != null)
+      {
+         timer.cancel();
+         timer = null;
+      }
+   }
+
+
+   /**
+    * Adds a new method call.
+    */
+   public void add(MethodCall job)
+   {
+      if (job == null)
+         throw new NullPointerException("job is null");
+      synchronized (elements)
+      {
+         elements.add(job);
+         if (elements.size() >= max_elements)
+            flush();
+      }
+   }
+
+   /**
+    * Flushes existing method calls.
+    */
+   public void flush()
+   {
+      List<MethodCall> l;
+      synchronized (elements)
+      {
+         if (log.isTraceEnabled())
+            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
+         l = new ArrayList<MethodCall>(elements);
+         elements.clear();
+      }
+
+      if (l.size() > 0)
+      {
+         try
+         {
+            // send to all live nodes in the cluster
+            rpcManager.callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000, false);
+         }
+         catch (Throwable t)
+         {
+            log.error("failed replicating " + l.size() + " elements in replication queue", t);
+         }
+      }
+   }
+
+   class MyTask extends TimerTask
+   {
+      public void run()
+      {
+         flush();
+      }
+   }
+}
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java	2008-03-19 00:21:05 UTC (rev 5446)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java	2008-03-19 00:31:19 UTC (rev 5447)
@@ -1,6 +1,6 @@
 package org.jboss.cache.factories;
 
-import org.jboss.cache.ReplicationQueue;
+import org.jboss.cache.cluster.ReplicationQueue;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.annotations.DefaultFactoryFor;
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-03-19 00:21:05 UTC (rev 5446)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-03-19 00:31:19 UTC (rev 5447)
@@ -6,8 +6,8 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.RPCManager;
-import org.jboss.cache.ReplicationQueue;
 import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.cluster.ReplicationQueue;
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.annotations.Inject;

Copied: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java (from rev 5441, core/trunk/src/test/java/org/jboss/cache/replicated/ReplicationQueueTest.java)
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	2008-03-19 00:31:19 UTC (rev 5447)
@@ -0,0 +1,165 @@
+package org.jboss.cache.cluster;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.misc.TestingUtil;
+import org.jgroups.Address;
+import static org.testng.AssertJUnit.assertNotNull;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@Test(groups = "functional")
+public class ReplicationQueueTest
+{
+   private static final int COUNT = 10;
+   Cache cache, cache2;
+   ReplicationQueue replQ;
+   ComponentRegistry registry;
+   RPCManager originalRpcManager;
+
+   @BeforeMethod
+   public void setUp() throws CloneNotSupportedException
+   {
+      cache = new DefaultCacheFactory().createCache(false);
+      cache.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      cache.getConfiguration().setUseReplQueue(true);
+      cache.getConfiguration().setReplQueueMaxElements(COUNT);
+      cache.getConfiguration().setReplQueueInterval(-1);
+      cache.start();
+      registry = TestingUtil.extractComponentRegistry(cache);
+      replQ = registry.getComponent(ReplicationQueue.class);
+      originalRpcManager = cache.getConfiguration().getRuntimeConfig().getRPCManager();
+      cache2 = new DefaultCacheFactory().createCache(cache.getConfiguration().clone());
+
+      TestingUtil.blockUntilViewsReceived(60000, cache, cache2);
+   }
+
+   @AfterMethod
+   public void tearDown()
+   {
+      // reset the original RPCManager
+      injectRpcManager(originalRpcManager);
+      TestingUtil.killCaches(cache, cache2);
+   }
+
+   private void injectRpcManager(RPCManager manager)
+   {
+      registry.registerComponent(RPCManager.class.getName(), manager, RPCManager.class);
+   }
+
+   public void testQueueHoldAndFlush() throws Exception
+   {
+      assert replQ != null;
+
+      // mock the RPCManager used in the cache
+      RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
+      injectRpcManager(mockRpcManager);
+
+      // expect basic cluster related calls
+      expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+      replay(mockRpcManager);
+
+      // check that nothing on the RPCManager will be called until we hit the replication queue threshold.
+      for (int i = 0; i < COUNT - 1; i++) cache.put("/a/b/c/" + i, "k", "v");
+      assert replQ.elements.size() == COUNT - 1;
+
+      // verify that no calls have been made on the mockRpcManager
+      verify(mockRpcManager);
+
+      // reset the mock
+      reset(mockRpcManager);
+
+      // now try the last PUT which should result in the queue being flushed.
+      expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+      expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(), (MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andReturn(Collections.emptyList()).once();
+      replay(mockRpcManager);
+
+      cache.put("/a/b/c/LAST", "k", "v");
+      assert replQ.elements.size() == 0;
+
+      // verify that the rpc call was only made once.
+      verify(mockRpcManager);
+   }
+
+   public void testFlushConcurrency() throws Exception
+   {
+      // will create multiple threads to constantly perform a cache update, and measure the number of expected invocations on the RPC manager.
+      final int numThreads = 25;
+      final int numLoopsPerThread = 1000;
+
+      int totalInvocations = numThreads * numLoopsPerThread;
+
+      assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must multiply to be a multiple of COUNT";
+
+      int expectedReplications = totalInvocations / COUNT;
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      // mock the RPCManager used in the cache
+      RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
+      injectRpcManager(mockRpcManager);
+
+      // expect basic cluster related calls
+      expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+      expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(), (MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andReturn(Collections.emptyList()).times(expectedReplications);
+      replay(mockRpcManager);
+
+      Thread[] threads = new Thread[numThreads];
+
+      for (int i = 0; i < numThreads; i++)
+      {
+         threads[i] = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  latch.await();
+               }
+               catch (InterruptedException e)
+               {
+                  // do nothing
+               }
+               for (int j = 0; j < numLoopsPerThread; j++)
+               {
+                  cache.put("/a/b/c/" + getName() + "/" + j, "k", "v");
+               }
+            }
+         };
+         threads[i].start();
+      }
+
+      // start the threads
+      latch.countDown();
+
+      // wait for threads to join
+      for (Thread t : threads) t.join();
+
+      // now test results
+      verify(mockRpcManager);
+
+      assert replQ.elements.size() == 0;
+   }
+
+   public void testFailure() throws InterruptedException
+   {
+      for (int i = 0; i < COUNT; i++)
+      {
+         System.out.println("on put i = " + i);
+         cache.put("/a/b/c" + i, "key", "value");
+         assertNotNull(cache.get("/a/b/c" + i, "key"));
+      }
+      TestingUtil.sleepThread(500);
+      for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i, cache2.get("/a/b/c" + i, "key"));
+   }
+}




More information about the jbosscache-commits mailing list