[infinispan-commits] Infinispan SVN: r2333 - in branches/4.2.x/core/src: main/java/org/infinispan/factories and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Sep 7 07:31:02 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-09-07 07:31:01 -0400 (Tue, 07 Sep 2010)
New Revision: 2333

Added:
   branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
   branches/4.2.x/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
   branches/4.2.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
   branches/4.2.x/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java
   branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
   branches/4.2.x/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
Log:
[ISPN-608] (Make replication queue implementation pluggable)

Modified: branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java	2010-09-07 10:45:33 UTC (rev 2332)
+++ branches/4.2.x/core/src/main/java/org/infinispan/config/Configuration.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -28,8 +28,10 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.SurvivesRestarts;
 import org.infinispan.factories.annotations.Start;
+import org.infinispan.remoting.ReplicationQueueImpl;
 import org.infinispan.transaction.lookup.GenericTransactionManagerLookup;
 import org.infinispan.transaction.lookup.TransactionManagerLookup;
+import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.IsolationLevel;
 import org.infinispan.CacheException;
 import javax.xml.bind.annotation.XmlAccessType;
@@ -264,6 +266,10 @@
       setReplQueueInterval(timeUnit.toMillis(replQueueInterval));
    }
 
+   public void setReplQueueClass(String classname) {
+      this.clustering.async.setReplQueueClass(classname);
+   }
+
   
    public void setExposeJmxStatistics(boolean useMbean) {
       jmxStatistics.setEnabled(useMbean);
@@ -541,6 +547,10 @@
       return clustering.async.replQueueInterval;
    }
 
+   public String getReplQueueClass() {
+      return this.clustering.async.replQueueClass;
+   }
+
    public boolean isExposeJmxStatistics() {
       return jmxStatistics.enabled;
    }
@@ -1205,6 +1215,12 @@
        *             <a href=&quot;http://community.jboss.org/docs/DOC-15725&quot;>here</a>" */
       protected Boolean asyncMarshalling=false;
 
+      /**
+       * @configRef desc="This overrides the replication queue implementation class.  Overriding the default allows
+       *                  you to add behavior to the queue, typically by subclassing the default implementation."
+       */
+      protected String replQueueClass = ReplicationQueueImpl.class.getName();
+
       private AsyncType(boolean readFromXml) {
          super();
          this.readFromXml = readFromXml;
@@ -1230,6 +1246,8 @@
             return false;
          if (useReplQueue != null ? !useReplQueue.equals(asyncType.useReplQueue) : asyncType.useReplQueue != null)
             return false;
+         if (!Util.safeEquals(replQueueClass, asyncType.replQueueClass))
+            return false;
 
          return true;
       }
@@ -1241,6 +1259,7 @@
          result = 31 * result + (replQueueMaxElements != null ? replQueueMaxElements.hashCode() : 0);
          result = 31 * result + (replQueueInterval != null ? replQueueInterval.hashCode() : 0);
          result = 31 * result + (asyncMarshalling != null ? asyncMarshalling.hashCode() : 0);
+         result = 31 * result + (replQueueClass != null ? replQueueClass.hashCode() : 0);
          return result;
       }
 
@@ -1271,6 +1290,12 @@
          testImmutability("asyncMarshalling");
          this.asyncMarshalling = asyncMarshalling;
       }
+
+      @XmlAttribute
+      public void setReplQueueClass(String replQueueClass) {
+         testImmutability("replQueueClass");
+         this.replQueueClass = replQueueClass;
+      }
    }
    
    /**

Modified: branches/4.2.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2010-09-07 10:45:33 UTC (rev 2332)
+++ branches/4.2.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -37,6 +37,9 @@
 import org.infinispan.util.Util;
 import org.infinispan.container.EntryFactory;
 
+import static org.infinispan.util.Util.getInstance;
+import static org.infinispan.util.Util.loadClass;
+
 /**
  * Simple factory that just uses reflection and an empty constructor of the component type.
  *
@@ -47,22 +50,22 @@
         CacheLoaderManager.class, InvocationContextContainer.class, PassivationManager.class,
         BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
 public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
+
    @Override
+   @SuppressWarnings("unchecked")
    public <T> T construct(Class<T> componentType) {
       if (componentType.isInterface()) {
          Class componentImpl;
          if (componentType.equals(StreamingMarshaller.class)) {
-            VersionAwareMarshaller versionAwareMarshaller = Util.getInstance(VersionAwareMarshaller.class);
+            VersionAwareMarshaller versionAwareMarshaller = getInstance(VersionAwareMarshaller.class);
             return componentType.cast(versionAwareMarshaller);
-         } else if (componentType.equals(InvocationContextContainer.class)) {
-            componentImpl = InvocationContextContainerImpl.class;
          } else {
             // add an "Impl" to the end of the class name and try again
-            componentImpl = Util.loadClass(componentType.getName() + "Impl");
+            componentImpl = loadClass(componentType.getName() + "Impl");
          }
-         return componentType.cast(Util.getInstance(componentImpl));
+         return componentType.cast(getInstance(componentImpl));
       } else {
-         return Util.getInstance(componentType);
+         return getInstance(componentType);
       }
    }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java	2010-09-07 10:45:33 UTC (rev 2332)
+++ branches/4.2.x/core/src/main/java/org/infinispan/factories/ReplicationQueueFactory.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -23,6 +23,7 @@
 
 import org.infinispan.factories.annotations.DefaultFactoryFor;
 import org.infinispan.remoting.ReplicationQueue;
+import org.infinispan.util.Util;
 
 /**
  * Factory for ReplicationQueue.
@@ -33,9 +34,14 @@
 @DefaultFactoryFor(classes = ReplicationQueue.class)
 public class ReplicationQueueFactory extends EmptyConstructorNamedCacheFactory implements AutoInstantiableFactory {
    @Override
+   @SuppressWarnings("unchecked")
    public <T> T construct(Class<T> componentType) {
       if ((!configuration.getCacheMode().isSynchronous()) && configuration.isUseReplQueue()) {
-         return super.construct(componentType);
+         String type = configuration.getReplQueueClass();
+         if (type == null)
+            return super.construct(componentType);
+         else
+            return (T) super.construct(Util.loadClass(type));
       } else {
          return null;
       }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-09-07 10:45:33 UTC (rev 2332)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -30,6 +30,7 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
+import org.infinispan.lifecycle.Lifecycle;
 import org.infinispan.remoting.rpc.ResponseMode;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.util.logging.Log;
@@ -49,113 +50,33 @@
  * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
-public class ReplicationQueue {
-   private static final Log log = LogFactory.getLog(ReplicationQueue.class);
+public interface ReplicationQueue extends Lifecycle {
 
-   /**
-    * Max elements before we flush
-    */
-   private long maxElements = 500;
 
    /**
-    * Holds the replication jobs.
+    * @return true if this replication queue is enabled, false otherwise.
     */
-   private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
+   boolean isEnabled();
 
    /**
-    * For periodical replication
+    * Adds a new command to the replication queue.
+    *
+    * @param job command to add to the queue
     */
-   private ScheduledExecutorService scheduledExecutor = null;
-   private RpcManager rpcManager;
-   private Configuration configuration;
-   private boolean enabled;
-   private CommandsFactory commandsFactory;
+   void add(ReplicableCommand job);
 
-   public boolean isEnabled() {
-      return enabled;
-   }
-
-   @Inject
-   private void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICATION_QUEUE_EXECUTOR) ScheduledExecutorService executor,
-                                   RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
-      this.rpcManager = rpcManager;
-      this.configuration = configuration;
-      this.commandsFactory = commandsFactory;
-      this.scheduledExecutor = executor;
-   }
-
    /**
-    * Starts the asynchronous flush queue.
+    * Flushes existing jobs in the replication queue.
     */
-   @Start
-   public void start() {
-      long interval = configuration.getReplQueueInterval();
-      log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
-      this.maxElements = configuration.getReplQueueMaxElements();
-      // check again
-      enabled = configuration.isUseReplQueue();
-      if (enabled && interval > 0) {
-         scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
-            public void run() {
-               flush();
-            }
-         }, interval, interval, TimeUnit.MILLISECONDS);
-      }
-   }
+   void flush();
 
    /**
-    * Stops the asynchronous flush queue.
+    * @return the number of elements in the replication queue.
     */
-   @Stop(priority = 9) // Stop before transport
-   public void stop() {
-      if (scheduledExecutor != null) {
-         scheduledExecutor.shutdownNow();
-      }
-      scheduledExecutor = null;
-   }
+   int getElementsCount();
 
-
    /**
-    * Adds a new method call.
+    * Resets the replication queue, typically used when a cache is restarted.
     */
-   public void add(ReplicableCommand job) {
-      if (job == null)
-         throw new NullPointerException("job is null");
-      try {
-         elements.put(job);
-         if (elements.size() >= maxElements) flush();
-      } catch (InterruptedException ie) {
-         Thread.interrupted();
-      }
-   }
-
-   /**
-    * Flushes existing method calls.
-    */
-   public void flush() {
-      List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
-      elements.drainTo(toReplicate);
-      if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
-
-      int toReplicateSize = toReplicate.size();
-      if (toReplicateSize > 0) {
-         try {
-            log.trace("Flushing {0} elements", toReplicateSize);
-            MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
-            // send to all live caches in the cluster
-            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
-         }
-         catch (Throwable t) {
-            log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
-         }
-      }
-   }
-
-   public int getElementsCount() {
-      return elements.size();
-   }
-
-   public void reset() {
-      elements.clear();
-   }
+   void reset();
 }

Added: branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java	                        (rev 0)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -0,0 +1,143 @@
+package org.infinispan.remoting;
+
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.factories.annotations.Stop;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A default implementation of the ReplicationQueue interface.
+ *
+ * @author Manik Surtani
+ * @version 4.2
+ */
+public class ReplicationQueueImpl implements ReplicationQueue {
+   private static final Log log = LogFactory.getLog(ReplicationQueue.class);
+
+   /**
+    * Max elements before we flush
+    */
+   private long maxElements = 500;
+
+   /**
+    * Holds the replication jobs.
+    */
+   private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
+
+   /**
+    * For periodical replication
+    */
+   private ScheduledExecutorService scheduledExecutor = null;
+   private RpcManager rpcManager;
+   private Configuration configuration;
+   private boolean enabled;
+   private CommandsFactory commandsFactory;
+
+   /**
+    *
+    * @return true if this replication queue is enabled, false otherwise.
+    */
+   @Override
+   public boolean isEnabled() {
+      return enabled;
+   }
+
+   @Inject
+   private void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICATION_QUEUE_EXECUTOR) ScheduledExecutorService executor,
+                                   RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
+      this.rpcManager = rpcManager;
+      this.configuration = configuration;
+      this.commandsFactory = commandsFactory;
+      this.scheduledExecutor = executor;
+   }
+
+   /**
+    * Reads queue settings from the configuration and, if enabled with a positive interval, schedules periodic flushes.
+    */
+   @Start
+   public void start() {
+      long interval = configuration.getReplQueueInterval();
+      // read the configured limit first so the trace below reports the effective value, not the field default
+      this.maxElements = configuration.getReplQueueMaxElements();
+      log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
+      enabled = configuration.isUseReplQueue();
+      if (enabled && interval > 0) {
+         scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+            public void run() {
+               flush();
+            }
+         }, interval, interval, TimeUnit.MILLISECONDS);
+      }
+   }
+
+   /**
+    * Stops the asynchronous flush queue.
+    */
+   @Stop(priority = 9)
+   // Stop before transport
+   public void stop() {
+      if (scheduledExecutor != null) {
+         scheduledExecutor.shutdownNow();
+      }
+      scheduledExecutor = null;
+   }
+
+
+   @Override
+   public void add(ReplicableCommand job) {
+      if (job == null)
+         throw new NullPointerException("job is null");
+      try {
+         elements.put(job);
+         if (elements.size() >= maxElements) flush();
+      } catch (InterruptedException ie) {
+         Thread.currentThread().interrupt(); // restore the interrupt flag for callers; Thread.interrupted() would clear it
+      }
+   }
+
+   @Override
+   public void flush() {
+      List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
+      elements.drainTo(toReplicate);
+      if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
+
+      int toReplicateSize = toReplicate.size();
+      if (toReplicateSize > 0) {
+         try {
+            log.trace("Flushing {0} elements", toReplicateSize);
+            MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
+            // send to all live caches in the cluster
+            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
+         }
+         catch (Throwable t) {
+            log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
+         }
+      }
+   }
+
+   @Override
+   public int getElementsCount() {
+      return elements.size();
+   }
+
+   @Override
+   public void reset() {
+      elements.clear();
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/replication/CustomReplQueueTest.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -0,0 +1,35 @@
+package org.infinispan.replication;
+
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.ReplicationQueue;
+import org.infinispan.remoting.ReplicationQueueImpl;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.awt.color.CMMException;
+
+@Test(groups = "functional", testName = "replication.CustomReplQueueTest")
+public class CustomReplQueueTest extends SingleCacheManagerTest {
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration cfg = new Configuration();
+      cfg.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      cfg.setUseReplQueue(true);
+      cfg.setReplQueueClass(TestReplQueueClass.class.getName());
+      EmbeddedCacheManager ecm = TestCacheManagerFactory.createCacheManager(GlobalConfiguration.getClusteredDefault(), cfg);
+      return ecm;
+   }
+
+   public void testReplQueueImplType() {
+      ReplicationQueue rq = TestingUtil.extractComponent(cache, ReplicationQueue.class);
+      assert rq instanceof TestReplQueueClass;              
+   }
+
+   public static class TestReplQueueClass extends ReplicationQueueImpl {
+   }
+}

Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java	2010-09-07 10:45:33 UTC (rev 2332)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/SingleCacheManagerTest.java	2010-09-07 11:31:01 UTC (rev 2333)
@@ -23,10 +23,21 @@
    protected EmbeddedCacheManager cacheManager;
    protected Cache<Object, Object> cache;
 
+   protected void setup() throws Exception {
+      cacheManager = createCacheManager();
+      if (cache == null) cache = cacheManager.getCache();
+   }
+
+   protected void teardown() {
+      TestingUtil.killCacheManagers(cacheManager);
+      cache = null;
+      cacheManager = null;
+   }
+
    @BeforeClass()
    protected void createBeforeClass() throws Exception {
       try {
-         if (cleanup == CleanupPhase.AFTER_TEST) cacheManager = createCacheManager();
+         if (cleanup == CleanupPhase.AFTER_TEST) setup();
       } catch (Exception e) {
          log.error("Unexpected!", e);
          throw e;
@@ -36,7 +47,7 @@
    @BeforeMethod
    protected void createBeforeMethod() throws Exception {
       try {
-         if (cleanup == CleanupPhase.AFTER_METHOD) cacheManager = createCacheManager();
+         if (cleanup == CleanupPhase.AFTER_METHOD) setup();
       } catch (Exception e) {
          log.error("Unexpected!", e);
          throw e;
@@ -46,7 +57,7 @@
    @AfterClass(alwaysRun=true)
    protected void destroyAfterClass() {
       try {
-         if (cleanup == CleanupPhase.AFTER_TEST) TestingUtil.killCacheManagers(cacheManager);
+         if (cleanup == CleanupPhase.AFTER_TEST) teardown();
       } catch (Exception e) {
          log.error("Unexpected!", e);
       }
@@ -54,7 +65,7 @@
 
    @AfterMethod(alwaysRun=true)
    protected void destroyAfterMethod() {
-      if (cleanup == CleanupPhase.AFTER_METHOD) TestingUtil.killCacheManagers(cacheManager);
+      if (cleanup == CleanupPhase.AFTER_METHOD) teardown();
    }
 
    @AfterMethod(alwaysRun=true)



More information about the infinispan-commits mailing list