Author: manik.surtani(a)jboss.com
Date: 2008-03-18 20:31:19 -0400 (Tue, 18 Mar 2008)
New Revision: 5447
Added:
core/trunk/src/main/java/org/jboss/cache/cluster/
core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
core/trunk/src/test/java/org/jboss/cache/cluster/
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
Removed:
core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
Modified:
core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
Log:
JBCACHE-295 - Add more unit tests for ReplicationQueue
Deleted: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-03-19 00:21:05 UTC
(rev 5446)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-03-19 00:31:19 UTC
(rev 5447)
@@ -1,177 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at
gnu.org.
- */
-package org.jboss.cache;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.factories.annotations.Stop;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodCallFactory;
-import org.jboss.cache.marshall.MethodDeclarations;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-
-/**
- * Periodically (or when certain size is exceeded) takes elements and replicates them.
- *
- * @author <a href="mailto:bela@jboss.org">Bela Ban</a> May 24,
2003
- * @version $Revision$
- */
-public class ReplicationQueue
-{
-
- private static Log log = LogFactory.getLog(ReplicationQueue.class);
-
- /**
- * We flush every 5 seconds. Inactive if -1 or 0
- */
- private long interval = 5000;
-
- /**
- * Max elements before we flush
- */
- private long max_elements = 500;
-
- /**
- * Holds the replication jobs: LinkedList<MethodCall>
- */
- private final List<MethodCall> elements = new LinkedList<MethodCall>();
-
- /**
- * For periodical replication
- */
- private Timer timer = null;
-
- /**
- * The timer task, only calls flush() when executed by Timer
- */
- private MyTask task = null;
- private RPCManager rpcManager;
- private Configuration configuration;
- private boolean enabled;
-
-
- public boolean isEnabled()
- {
- return enabled;
- }
-
- @Inject
- private void injectDependencies(RPCManager rpcManager, Configuration configuration)
- {
- this.rpcManager = rpcManager;
- this.configuration = configuration;
- enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
- }
-
- /**
- * Starts the asynchronous flush queue.
- */
- @Start
- public synchronized void start()
- {
- this.interval = configuration.getReplQueueInterval();
- this.max_elements = configuration.getReplQueueMaxElements();
- // check again
- enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
- if (enabled)
- {
- if (interval > 0)
- {
- if (task == null)
- task = new MyTask();
- if (timer == null)
- {
- timer = new Timer(true);
- timer.schedule(task,
- 500, // delay before initial flush
- interval); // interval between flushes
- }
- }
- }
- }
-
- /**
- * Stops the asynchronous flush queue.
- */
- @Stop
- public synchronized void stop()
- {
- if (task != null)
- {
- task.cancel();
- task = null;
- }
- if (timer != null)
- {
- timer.cancel();
- timer = null;
- }
- }
-
-
- /**
- * Adds a new method call.
- */
- public void add(MethodCall job)
- {
- if (job == null)
- throw new NullPointerException("job is null");
- synchronized (elements)
- {
- elements.add(job);
- if (elements.size() >= max_elements)
- flush();
- }
- }
-
- /**
- * Flushes existing method calls.
- */
- public void flush()
- {
- List<MethodCall> l;
- synchronized (elements)
- {
- if (log.isTraceEnabled())
- log.trace("flush(): flushing repl queue (num elements=" +
elements.size() + ")");
- l = new ArrayList<MethodCall>(elements);
- elements.clear();
- }
-
- if (l.size() > 0)
- {
- try
- {
- // send to all live nodes in the cluster
- rpcManager.callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000,
false);
- }
- catch (Throwable t)
- {
- log.error("failed replicating " + l.size() + " elements in
replication queue", t);
- }
- }
- }
-
- class MyTask extends TimerTask
- {
- public void run()
- {
- flush();
- }
- }
-
-}
Added: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-03-19
00:31:19 UTC (rev 5447)
@@ -0,0 +1,169 @@
+package org.jboss.cache.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.factories.annotations.Stop;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodCallFactory;
+import org.jboss.cache.marshall.MethodDeclarations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Periodically (or when certain size is exceeded) takes elements and replicates them.
+ *
+ * @author <a href="mailto:bela@jboss.org">Bela Ban</a> May 24,
2003
+ * @version $Revision: 5197 $
+ */
+public class ReplicationQueue
+{
+
+ private static Log log = LogFactory.getLog(ReplicationQueue.class);
+
+ /**
+ * We flush every 5 seconds. Inactive if -1 or 0
+ */
+ private long interval = 5000;
+
+ /**
+ * Max elements before we flush
+ */
+ private long max_elements = 500;
+
+ /**
+ * Holds the replication jobs: LinkedList<MethodCall>
+ */
+ final List<MethodCall> elements = new LinkedList<MethodCall>();
+
+ /**
+ * For periodical replication
+ */
+ private Timer timer = null;
+
+ /**
+ * The timer task, only calls flush() when executed by Timer
+ */
+ private ReplicationQueue.MyTask task = null;
+ private RPCManager rpcManager;
+ private Configuration configuration;
+ private boolean enabled;
+
+
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ @Inject
+ private void injectDependencies(RPCManager rpcManager, Configuration configuration)
+ {
+ this.rpcManager = rpcManager;
+ this.configuration = configuration;
+ enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
+ }
+
+ /**
+ * Starts the asynchronous flush queue.
+ */
+ @Start
+ public synchronized void start()
+ {
+ this.interval = configuration.getReplQueueInterval();
+ this.max_elements = configuration.getReplQueueMaxElements();
+ // check again
+ enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
+ if (enabled)
+ {
+ if (interval > 0)
+ {
+ if (task == null)
+ task = new ReplicationQueue.MyTask();
+ if (timer == null)
+ {
+ timer = new Timer(true);
+ timer.schedule(task,
+ 500, // delay before initial flush
+ interval); // interval between flushes
+ }
+ }
+ }
+ }
+
+ /**
+ * Stops the asynchronous flush queue.
+ */
+ @Stop
+ public synchronized void stop()
+ {
+ if (task != null)
+ {
+ task.cancel();
+ task = null;
+ }
+ if (timer != null)
+ {
+ timer.cancel();
+ timer = null;
+ }
+ }
+
+
+ /**
+ * Adds a new method call.
+ */
+ public void add(MethodCall job)
+ {
+ if (job == null)
+ throw new NullPointerException("job is null");
+ synchronized (elements)
+ {
+ elements.add(job);
+ if (elements.size() >= max_elements)
+ flush();
+ }
+ }
+
+ /**
+ * Flushes existing method calls.
+ */
+ public void flush()
+ {
+ List<MethodCall> l;
+ synchronized (elements)
+ {
+ if (log.isTraceEnabled())
+ log.trace("flush(): flushing repl queue (num elements=" +
elements.size() + ")");
+ l = new ArrayList<MethodCall>(elements);
+ elements.clear();
+ }
+
+ if (l.size() > 0)
+ {
+ try
+ {
+ // send to all live nodes in the cluster
+ rpcManager.callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000,
false);
+ }
+ catch (Throwable t)
+ {
+ log.error("failed replicating " + l.size() + " elements in
replication queue", t);
+ }
+ }
+ }
+
+ class MyTask extends TimerTask
+ {
+ public void run()
+ {
+ flush();
+ }
+ }
+}
\ No newline at end of file
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java 2008-03-19
00:21:05 UTC (rev 5446)
+++
core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java 2008-03-19
00:31:19 UTC (rev 5447)
@@ -1,6 +1,6 @@
package org.jboss.cache.factories;
-import org.jboss.cache.ReplicationQueue;
+import org.jboss.cache.cluster.ReplicationQueue;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.DefaultFactoryFor;
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-03-19
00:21:05 UTC (rev 5446)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-03-19
00:31:19 UTC (rev 5447)
@@ -6,8 +6,8 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
-import org.jboss.cache.ReplicationQueue;
import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.cluster.ReplicationQueue;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.annotations.Inject;
Copied: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java (from
rev 5441, core/trunk/src/test/java/org/jboss/cache/replicated/ReplicationQueueTest.java)
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
(rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-03-19
00:31:19 UTC (rev 5447)
@@ -0,0 +1,165 @@
+package org.jboss.cache.cluster;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.misc.TestingUtil;
+import org.jgroups.Address;
+import static org.testng.AssertJUnit.assertNotNull;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@Test(groups = "functional")
+public class ReplicationQueueTest
+{
+ private static final int COUNT = 10;
+ Cache cache, cache2;
+ ReplicationQueue replQ;
+ ComponentRegistry registry;
+ RPCManager originalRpcManager;
+
+ @BeforeMethod
+ public void setUp() throws CloneNotSupportedException
+ {
+ cache = new DefaultCacheFactory().createCache(false);
+ cache.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+ cache.getConfiguration().setUseReplQueue(true);
+ cache.getConfiguration().setReplQueueMaxElements(COUNT);
+ cache.getConfiguration().setReplQueueInterval(-1);
+ cache.start();
+ registry = TestingUtil.extractComponentRegistry(cache);
+ replQ = registry.getComponent(ReplicationQueue.class);
+ originalRpcManager = cache.getConfiguration().getRuntimeConfig().getRPCManager();
+ cache2 = new DefaultCacheFactory().createCache(cache.getConfiguration().clone());
+
+ TestingUtil.blockUntilViewsReceived(60000, cache, cache2);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ // reset the original RPCManager
+ injectRpcManager(originalRpcManager);
+ TestingUtil.killCaches(cache, cache2);
+ }
+
+ private void injectRpcManager(RPCManager manager)
+ {
+ registry.registerComponent(RPCManager.class.getName(), manager, RPCManager.class);
+ }
+
+ public void testQueueHoldAndFlush() throws Exception
+ {
+ assert replQ != null;
+
+ // mock the RPCManager used in the cache
+ RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
+ injectRpcManager(mockRpcManager);
+
+ // expect basic cluster related calls
+
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+ replay(mockRpcManager);
+
+ // check that nothing on the RPCManager will be called until we hit the replication
queue threshold.
+ for (int i = 0; i < COUNT - 1; i++) cache.put("/a/b/c/" + i,
"k", "v");
+ assert replQ.elements.size() == COUNT - 1;
+
+ // verify that no calls have been made on the mockRpcManager
+ verify(mockRpcManager);
+
+ // reset the mock
+ reset(mockRpcManager);
+
+ // now try the last PUT which should result in the queue being flushed.
+
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+ expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(),
(MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt(),
anyBoolean())).andReturn(Collections.emptyList()).once();
+ replay(mockRpcManager);
+
+ cache.put("/a/b/c/LAST", "k", "v");
+ assert replQ.elements.size() == 0;
+
+ // verify that the rpc call was only made once.
+ verify(mockRpcManager);
+ }
+
+ public void testFlushConcurrency() throws Exception
+ {
+ // will create multiple threads to constantly perform a cache update, and measure
the number of expected invocations on the RPC manager.
+ final int numThreads = 25;
+ final int numLoopsPerThread = 1000;
+
+ int totalInvocations = numThreads * numLoopsPerThread;
+
+ assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must
multiply to be a multiple of COUNT";
+
+ int expectedReplications = totalInvocations / COUNT;
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // mock the RPCManager used in the cache
+ RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
+ injectRpcManager(mockRpcManager);
+
+ // expect basic cluster related calls
+
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
+ expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(),
(MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt(),
anyBoolean())).andReturn(Collections.emptyList()).times(expectedReplications);
+ replay(mockRpcManager);
+
+ Thread[] threads = new Thread[numThreads];
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+ for (int j = 0; j < numLoopsPerThread; j++)
+ {
+ cache.put("/a/b/c/" + getName() + "/" + j,
"k", "v");
+ }
+ }
+ };
+ threads[i].start();
+ }
+
+ // start the threads
+ latch.countDown();
+
+ // wait for threads to join
+ for (Thread t : threads) t.join();
+
+ // now test results
+ verify(mockRpcManager);
+
+ assert replQ.elements.size() == 0;
+ }
+
+ public void testFailure() throws InterruptedException
+ {
+ for (int i = 0; i < COUNT; i++)
+ {
+ System.out.println("on put i = " + i);
+ cache.put("/a/b/c" + i, "key", "value");
+ assertNotNull(cache.get("/a/b/c" + i, "key"));
+ }
+ TestingUtil.sleepThread(500);
+ for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i,
cache2.get("/a/b/c" + i, "key"));
+ }
+}