[infinispan-commits] Infinispan SVN: r2129 - branches/4.1.x/core/src/main/java/org/infinispan/remoting.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Aug 2 08:07:28 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-08-02 08:07:27 -0400 (Mon, 02 Aug 2010)
New Revision: 2129

Modified:
   branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
Log:
[ISPN-504] (replication queue is being sync with synchronized)

Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-08-02 12:02:56 UTC (rev 2128)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-08-02 12:07:27 UTC (rev 2129)
@@ -38,6 +38,8 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -59,7 +61,7 @@
    /**
     * Holds the replication jobs.
     */
-   private final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
+   private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
 
    /**
     * For periodical replication
@@ -87,7 +89,7 @@
     * Starts the asynchronous flush queue.
     */
    @Start
-   public synchronized void start() {
+   public void start() {
       long interval = configuration.getReplQueueInterval();
       log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
       this.maxElements = configuration.getReplQueueMaxElements();
@@ -106,7 +108,7 @@
     * Stops the asynchronous flush queue.
     */
    @Stop(priority = 9) // Stop before transport
-   public synchronized void stop() {
+   public void stop() {
       if (scheduledExecutor != null) {
          scheduledExecutor.shutdownNow();
       }
@@ -120,10 +122,11 @@
    public void add(ReplicableCommand job) {
       if (job == null)
          throw new NullPointerException("job is null");
-      synchronized (elements) {
-         elements.add(job);
-         if (elements.size() >= maxElements)
-            flush();
+      try {
+         elements.put(job);
+         if (elements.size() >= maxElements) flush();
+      } catch (InterruptedException ie) {
+         Thread.interrupted();
       }
    }
 
@@ -131,12 +134,9 @@
     * Flushes existing method calls.
     */
    public void flush() {
-      List<ReplicableCommand> toReplicate;
-      synchronized (elements) {
-         if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", elements.size());
-         toReplicate = new ArrayList<ReplicableCommand>(elements);
-         elements.clear();
-      }
+      List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
+      elements.drainTo(toReplicate);
+      if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
 
       int toReplicateSize = toReplicate.size();
       if (toReplicateSize > 0) {
@@ -159,4 +159,4 @@
    public void reset() {
       elements.clear();
    }
-}
\ No newline at end of file
+}



More information about the infinispan-commits mailing list