[infinispan-commits] Infinispan SVN: r2129 - branches/4.1.x/core/src/main/java/org/infinispan/remoting.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Aug 2 08:07:28 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-08-02 08:07:27 -0400 (Mon, 02 Aug 2010)
New Revision: 2129
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
Log:
[ISPN-504] (replication queue is being sync with synchronized)
Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2010-08-02 12:02:56 UTC (rev 2128)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2010-08-02 12:07:27 UTC (rev 2129)
@@ -38,6 +38,8 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -59,7 +61,7 @@
/**
* Holds the replication jobs.
*/
- private final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
+ private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();
/**
* For periodical replication
@@ -87,7 +89,7 @@
* Starts the asynchronous flush queue.
*/
@Start
- public synchronized void start() {
+ public void start() {
long interval = configuration.getReplQueueInterval();
log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
this.maxElements = configuration.getReplQueueMaxElements();
@@ -106,7 +108,7 @@
* Stops the asynchronous flush queue.
*/
@Stop(priority = 9) // Stop before transport
- public synchronized void stop() {
+ public void stop() {
if (scheduledExecutor != null) {
scheduledExecutor.shutdownNow();
}
@@ -120,10 +122,11 @@
public void add(ReplicableCommand job) {
if (job == null)
throw new NullPointerException("job is null");
- synchronized (elements) {
- elements.add(job);
- if (elements.size() >= maxElements)
- flush();
+ try {
+ elements.put(job);
+ if (elements.size() >= maxElements) flush();
+ } catch (InterruptedException ie) {
+ Thread.interrupted();
}
}
@@ -131,12 +134,9 @@
* Flushes existing method calls.
*/
public void flush() {
- List<ReplicableCommand> toReplicate;
- synchronized (elements) {
- if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", elements.size());
- toReplicate = new ArrayList<ReplicableCommand>(elements);
- elements.clear();
- }
+ List<ReplicableCommand> toReplicate = new LinkedList<ReplicableCommand>();
+ elements.drainTo(toReplicate);
+ if (log.isTraceEnabled()) log.trace("flush(): flushing repl queue (num elements={0})", toReplicate.size());
int toReplicateSize = toReplicate.size();
if (toReplicateSize > 0) {
@@ -159,4 +159,4 @@
public void reset() {
elements.clear();
}
-}
\ No newline at end of file
+}
More information about the infinispan-commits
mailing list