[infinispan-commits] Infinispan SVN: r2334 - in trunk/core/src: main/java/org/infinispan/factories and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Sep 7 07:35:11 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-09-07 07:35:10 -0400 (Tue, 07 Sep 2010)
New Revision: 2334
Added:
trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
trunk/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java
Modified:
trunk/core/src/main/java/org/infinispan/config/Configuration.java
trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
trunk/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java
trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
trunk/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
Log:
[ISPN-608] (Make replication queue implementation pluggable)
Modified: trunk/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/Configuration.java 2010-09-07 11:31:01 UTC (rev 2333)
+++ trunk/core/src/main/java/org/infinispan/config/Configuration.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -28,8 +28,10 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.SurvivesRestarts;
import org.infinispan.factories.annotations.Start;
+import org.infinispan.remoting.ReplicationQueueImpl;
import org.infinispan.transaction.lookup.GenericTransactionManagerLookup;
import org.infinispan.transaction.lookup.TransactionManagerLookup;
+import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.CacheException;
import javax.xml.bind.annotation.XmlAccessType;
@@ -264,6 +266,10 @@
setReplQueueInterval(timeUnit.toMillis(replQueueInterval));
}
+ public void setReplQueueClass(String classname) {
+ this.clustering.async.setReplQueueClass(classname);
+ }
+
public void setExposeJmxStatistics(boolean useMbean) {
jmxStatistics.setEnabled(useMbean);
@@ -537,6 +543,10 @@
return clustering.async.replQueueInterval;
}
+ public String getReplQueueClass() {
+ return this.clustering.async.replQueueClass;
+ }
+
public boolean isExposeJmxStatistics() {
return jmxStatistics.enabled;
}
@@ -1179,6 +1189,12 @@
* <a href="http://community.jboss.org/docs/DOC-15725">here</a>" */
protected Boolean asyncMarshalling=false;
+ /**
+ * @configRef desc="This overrides the replication queue implementation class. Overriding the default allows
+ * you to add behavior to the queue, typically by subclassing the default implementation."
+ */
+ protected String replQueueClass = ReplicationQueueImpl.class.getName();
+
private AsyncType(boolean readFromXml) {
super();
this.readFromXml = readFromXml;
@@ -1204,6 +1220,8 @@
return false;
if (useReplQueue != null ? !useReplQueue.equals(asyncType.useReplQueue) : asyncType.useReplQueue != null)
return false;
+ if (!Util.safeEquals(replQueueClass, asyncType.replQueueClass))
+ return false;
return true;
}
@@ -1215,6 +1233,7 @@
result = 31 * result + (replQueueMaxElements != null ? replQueueMaxElements.hashCode() : 0);
result = 31 * result + (replQueueInterval != null ? replQueueInterval.hashCode() : 0);
result = 31 * result + (asyncMarshalling != null ? asyncMarshalling.hashCode() : 0);
+ result = 31 * result + (replQueueClass != null ? replQueueClass.hashCode() : 0);
return result;
}
@@ -1245,6 +1264,12 @@
testImmutability("asyncMarshalling");
this.asyncMarshalling = asyncMarshalling;
}
+
+ @XmlAttribute
+ public void setReplQueueClass(String replQueueClass) {
+ testImmutability("replQueueClass");
+ this.replQueueClass = replQueueClass;
+ }
}
/**
Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2010-09-07 11:31:01 UTC (rev 2333)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -37,6 +37,9 @@
import org.infinispan.util.Util;
import org.infinispan.container.EntryFactory;
+import static org.infinispan.util.Util.getInstance;
+import static org.infinispan.util.Util.loadClass;
+
/**
* Simple factory that just uses reflection and an empty constructor of the component type.
*
@@ -47,22 +50,22 @@
CacheLoaderManager.class, InvocationContextContainer.class, PassivationManager.class,
BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
+
@Override
+ @SuppressWarnings("unchecked")
public <T> T construct(Class<T> componentType) {
if (componentType.isInterface()) {
Class componentImpl;
if (componentType.equals(StreamingMarshaller.class)) {
- VersionAwareMarshaller versionAwareMarshaller = Util.getInstance(VersionAwareMarshaller.class);
+ VersionAwareMarshaller versionAwareMarshaller = getInstance(VersionAwareMarshaller.class);
return componentType.cast(versionAwareMarshaller);
- } else if (componentType.equals(InvocationContextContainer.class)) {
- componentImpl = InvocationContextContainerImpl.class;
} else {
// add an "Impl" to the end of the class name and try again
- componentImpl = Util.loadClass(componentType.getName() + "Impl");
+ componentImpl = loadClass(componentType.getName() + "Impl");
}
- return componentType.cast(Util.getInstance(componentImpl));
+ return componentType.cast(getInstance(componentImpl));
} else {
- return Util.getInstance(componentType);
+ return getInstance(componentType);
}
}
}
Modified: trunk/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java 2010-09-07 11:31:01 UTC (rev 2333)
+++ trunk/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -23,6 +23,7 @@
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.remoting.ReplicationQueue;
+import org.infinispan.util.Util;
/**
* Factory for ReplicationQueue.
@@ -33,9 +34,14 @@
@DefaultFactoryFor(classes = ReplicationQueue.class)
public class ReplicationQueueFactory extends EmptyConstructorNamedCacheFactory implements AutoInstantiableFactory {
@Override
+ @SuppressWarnings("unchecked")
public <T> T construct(Class<T> componentType) {
if ((!configuration.getCacheMode().isSynchronous()) && configuration.isUseReplQueue()) {
- return super.construct(componentType);
+ String type = configuration.getReplQueueClass();
+ if (type == null)
+ return super.construct(componentType);
+ else
+ return (T) super.construct(Util.loadClass(type));
} else {
return null;
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2010-09-07 11:31:01 UTC (rev 2333)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -30,6 +30,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
+import org.infinispan.lifecycle.Lifecycle;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.logging.Log;
@@ -49,113 +50,33 @@
* @author Mircea.Markus at jboss.com
* @since 4.0
*/
-public class ReplicationQueue {
- private static final Log log = LogFactory.getLog(ReplicationQueue.class);
+public interface ReplicationQueue extends Lifecycle {
- /**
- * Max elements before we flush
- */
- private long maxElements = 500;
/**
- * Holds the replication jobs.
+ * @return true if this replication queue is enabled, false otherwise.
*/
- private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
+ boolean isEnabled();
/**
- * For periodical replication
+ * Adds a new command to the replication queue.
+ *
+ * @param job command to add to the queue
*/
- private ScheduledExecutorService scheduledExecutor = null;
- private RpcManager rpcManager;
- private Configuration configuration;
- private boolean enabled;
- private CommandsFactory commandsFactory;
+ void add(ReplicableCommand job);
- public boolean isEnabled() {
- return enabled;
- }
-
- @Inject
- private void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICATION_QUEUE_EXECUTOR) ScheduledExecutorService executor,
- RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
- this.rpcManager = rpcManager;
- this.configuration = configuration;
- this.commandsFactory = commandsFactory;
- this.scheduledExecutor = executor;
- }
-
/**
- * Starts the asynchronous flush queue.
+ * Flushes existing jobs in the replication queue.
*/
- @Start
- public void start() {
- long interval = configuration.getReplQueueInterval();
- log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
- this.maxElements = configuration.getReplQueueMaxElements();
- // check again
- enabled = configuration.isUseReplQueue();
- if (enabled && interval > 0) {
- scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
- public void run() {
- flush();
- }
- }, interval, interval, TimeUnit.MILLISECONDS);
- }
- }
+ void flush();
/**
- * Stops the asynchronous flush queue.
+ * @return the number of elements in the replication queue.
*/
- @Stop(priority = 9) // Stop before transport
- public void stop() {
- if (scheduledExecutor != null) {
- scheduledExecutor.shutdownNow();
- }
- scheduledExecutor = null;
- }
+ int getElementsCount();
-
/**
- * Adds a new method call.
+ * Resets the replication queue, typically used when a cache is restarted.
*/
- public void add(ReplicableCommand job) {
- if (job == null)
- throw new NullPointerException("job is null");
- try {
- elements.put(job);
- if (elements.size() >= maxElements) flush();
- } catch (InterruptedException ie) {
- Thread.interrupted();
- }
- }
-
- /**
- * Flushes existing method calls.
- */
- public void flush() {
- List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
- elements.drainTo(toReplicate);
- if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
-
- int toReplicateSize = toReplicate.size();
- if (toReplicateSize > 0) {
- try {
- log.trace("Flushing {0} elements", toReplicateSize);
- MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
- // send to all live caches in the cluster
- rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
- }
- catch (Throwable t) {
- log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
- }
- }
- }
-
- public int getElementsCount() {
- return elements.size();
- }
-
- public void reset() {
- elements.clear();
- }
+ void reset();
}
Copied: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java (from rev 2333, branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -0,0 +1,143 @@
+package org.infinispan.remoting;
+
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.factories.annotations.Stop;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A default implementation of the ReplicationQueue interface.
+ *
+ * @author Manik Surtani
+ * @version 4.2
+ */
+public class ReplicationQueueImpl implements ReplicationQueue {
+ private static final Log log = LogFactory.getLog(ReplicationQueue.class);
+
+ /**
+ * Max elements before we flush
+ */
+ private long maxElements = 500;
+
+ /**
+ * Holds the replication jobs.
+ */
+ private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
+
+ /**
+ * For periodical replication
+ */
+ private ScheduledExecutorService scheduledExecutor = null;
+ private RpcManager rpcManager;
+ private Configuration configuration;
+ private boolean enabled;
+ private CommandsFactory commandsFactory;
+
+ /**
+ *
+ * @return true if this replication queue is enabled, false otherwise.
+ */
+ @Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Inject
+ private void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICATION_QUEUE_EXECUTOR) ScheduledExecutorService executor,
+ RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
+ this.rpcManager = rpcManager;
+ this.configuration = configuration;
+ this.commandsFactory = commandsFactory;
+ this.scheduledExecutor = executor;
+ }
+
+ /**
+ * Starts the asynchronous flush queue.
+ */
+ @Start
+ public void start() {
+ long interval = configuration.getReplQueueInterval();
+ log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
+ this.maxElements = configuration.getReplQueueMaxElements();
+ // check again
+ enabled = configuration.isUseReplQueue();
+ if (enabled && interval > 0) {
+ scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ flush();
+ }
+ }, interval, interval, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Stops the asynchronous flush queue.
+ */
+ @Stop(priority = 9)
+ // Stop before transport
+ public void stop() {
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdownNow();
+ }
+ scheduledExecutor = null;
+ }
+
+
+ @Override
+ public void add(ReplicableCommand job) {
+ if (job == null)
+ throw new NullPointerException("job is null");
+ try {
+ elements.put(job);
+ if (elements.size() >= maxElements) flush();
+ } catch (InterruptedException ie) {
+ Thread.interrupted();
+ }
+ }
+
+ @Override
+ public void flush() {
+ List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
+ elements.drainTo(toReplicate);
+ if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
+
+ int toReplicateSize = toReplicate.size();
+ if (toReplicateSize > 0) {
+ try {
+ log.trace("Flushing {0} elements", toReplicateSize);
+ MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
+ // send to all live caches in the cluster
+ rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
+ }
+ catch (Throwable t) {
+ log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
+ }
+ }
+ }
+
+ @Override
+ public int getElementsCount() {
+ return elements.size();
+ }
+
+ @Override
+ public void reset() {
+ elements.clear();
+ }
+}
Copied: trunk/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java (from rev 2333, branches/4.2.x/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -0,0 +1,35 @@
+package org.infinispan.replication;
+
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.ReplicationQueue;
+import org.infinispan.remoting.ReplicationQueueImpl;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.awt.color.CMMException;
+
+@Test(groups = "functional", testName = "replication.CustomReplQueueTest")
+public class CustomReplQueueTest extends SingleCacheManagerTest {
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ Configuration cfg = new Configuration();
+ cfg.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+ cfg.setUseReplQueue(true);
+ cfg.setReplQueueClass(TestReplQueueClass.class.getName());
+ EmbeddedCacheManager ecm = TestCacheManagerFactory.createCacheManager(GlobalConfiguration.getClusteredDefault(), cfg);
+ return ecm;
+ }
+
+ public void testReplQueueImplType() {
+ ReplicationQueue rq = TestingUtil.extractComponent(cache, ReplicationQueue.class);
+ assert rq instanceof TestReplQueueClass;
+ }
+
+ public static class TestReplQueueClass extends ReplicationQueueImpl {
+ }
+}
Modified: trunk/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java 2010-09-07 11:31:01 UTC (rev 2333)
+++ trunk/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java 2010-09-07 11:35:10 UTC (rev 2334)
@@ -23,10 +23,21 @@
protected EmbeddedCacheManager cacheManager;
protected Cache<Object, Object> cache;
+ protected void setup() throws Exception {
+ cacheManager = createCacheManager();
+ if (cache == null) cache = cacheManager.getCache();
+ }
+
+ protected void teardown() {
+ TestingUtil.killCacheManagers(cacheManager);
+ cache = null;
+ cacheManager = null;
+ }
+
@BeforeClass()
protected void createBeforeClass() throws Exception {
try {
- if (cleanup == CleanupPhase.AFTER_TEST) cacheManager = createCacheManager();
+ if (cleanup == CleanupPhase.AFTER_TEST) setup();
} catch (Exception e) {
log.error("Unexpected!", e);
throw e;
@@ -36,7 +47,7 @@
@BeforeMethod
protected void createBeforeMethod() throws Exception {
try {
- if (cleanup == CleanupPhase.AFTER_METHOD) cacheManager = createCacheManager();
+ if (cleanup == CleanupPhase.AFTER_METHOD) setup();
} catch (Exception e) {
log.error("Unexpected!", e);
throw e;
@@ -46,7 +57,7 @@
@AfterClass(alwaysRun=true)
protected void destroyAfterClass() {
try {
- if (cleanup == CleanupPhase.AFTER_TEST) TestingUtil.killCacheManagers(cacheManager);
+ if (cleanup == CleanupPhase.AFTER_TEST) teardown();
} catch (Exception e) {
log.error("Unexpected!", e);
}
@@ -54,7 +65,7 @@
@AfterMethod(alwaysRun=true)
protected void destroyAfterMethod() {
- if (cleanup == CleanupPhase.AFTER_METHOD) TestingUtil.killCacheManagers(cacheManager);
+ if (cleanup == CleanupPhase.AFTER_METHOD) teardown();
}
@AfterMethod(alwaysRun=true)
More information about the infinispan-commits
mailing list