Author: borges
Date: 2012-02-27 10:03:25 -0500 (Mon, 27 Feb 2012)
New Revision: 12200
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java
Log:
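Fix: guard non-atomic counter increments against races (the volatile line-up counters in OperationContextImpl become AtomicInteger; up++ in TransactionCallback.countUp() is now synchronized).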
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2012-02-27
15:03:02 UTC (rev 12199)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2012-02-27
15:03:25 UTC (rev 12200)
@@ -27,16 +27,16 @@
import org.hornetq.utils.ExecutorFactory;
/**
- *
+ *
* Each instance of OperationContextImpl is associated with an executor (usually an
ordered Executor).
- *
- * Tasks are hold until the operations are complete and executed in the natural order as
soon as the operations are returned
+ *
+ * Tasks are hold until the operations are complete and executed in the natural order as
soon as the operations are returned
* from replication and storage.
- *
+ *
* If there are no pending IO operations, the tasks are just executed at the callers
thread without any context switch.
- *
+ *
* So, if you are doing operations that are not dependent on IO (e.g
NonPersistentMessages) you wouldn't have any context switch.
- *
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class OperationContextImpl implements OperationContext
@@ -49,12 +49,12 @@
{
OperationContextImpl.threadLocalContext.set(null);
}
-
+
public static OperationContext getContext()
{
return getContext(null);
}
-
+
public static OperationContext getContext(final ExecutorFactory executorFactory)
{
OperationContext token = OperationContextImpl.threadLocalContext.get();
@@ -83,19 +83,17 @@
private int minimalStore = Integer.MAX_VALUE;
private int minimalReplicated = Integer.MAX_VALUE;
-
+
private int minimalPage = Integer.MAX_VALUE;
- private volatile int storeLineUp = 0;
+ private final AtomicInteger storeLineUp = new AtomicInteger(0);
+ private final AtomicInteger replicationLineUp = new AtomicInteger(0);
+ private final AtomicInteger pageLineUp = new AtomicInteger(0);
- private volatile int replicationLineUp = 0;
-
- private volatile int pageLineUp = 0;
-
private int stored = 0;
private int replicated = 0;
-
+
private int paged = 0;
private int errorCode = -1;
@@ -111,12 +109,12 @@
super();
this.executor = executor;
}
-
+
public void pageSyncLineUp()
{
- pageLineUp++;
+ pageLineUp.incrementAndGet();
}
-
+
public synchronized void pageSyncDone()
{
paged++;
@@ -125,12 +123,12 @@
public void storeLineUp()
{
- storeLineUp++;
+ storeLineUp.incrementAndGet();
}
public void replicationLineUp()
{
- replicationLineUp++;
+ replicationLineUp.incrementAndGet();
}
public synchronized void replicationDone()
@@ -154,13 +152,14 @@
if (tasks == null)
{
tasks = new LinkedList<TaskHolder>();
- minimalReplicated = replicationLineUp;
- minimalStore = storeLineUp;
- minimalPage = pageLineUp;
+ minimalReplicated = replicationLineUp.intValue();
+ minimalStore = storeLineUp.intValue();
+ minimalPage = pageLineUp.intValue();
}
// On this case, we can just execute the context directly
- if (replicationLineUp == replicated && storeLineUp == stored &&
pageLineUp == paged)
+ if (replicationLineUp.intValue() == replicated && storeLineUp.intValue()
== stored &&
+ pageLineUp.intValue() == paged)
{
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
@@ -281,14 +280,9 @@
}
}
- class TaskHolder
+ final class TaskHolder
{
-
-
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
+
@Override
public String toString()
{
@@ -302,19 +296,17 @@
"]";
}
- int storeLined;
+ final int storeLined;
+ final int replicationLined;
+ final int pageLined;
- int replicationLined;
-
- int pageLined;
+ final IOAsyncTask task;
- IOAsyncTask task;
-
TaskHolder(final IOAsyncTask task)
{
- storeLined = storeLineUp;
- replicationLined = replicationLineUp;
- pageLined = pageLineUp;
+ storeLined = storeLineUp.intValue();
+ replicationLined = replicationLineUp.intValue();
+ pageLined = pageLineUp.intValue();
this.task = task;
}
}
@@ -360,7 +352,7 @@
buffer.append("Task = " + hold + "\n");
}
}
-
+
return "OperationContextImpl [minimalStore=" + minimalStore +
", storeLineUp=" +
storeLineUp +
@@ -388,6 +380,6 @@
"]" + buffer.toString();
}
-
+
}
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
---
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java 2012-02-27
15:03:02 UTC (rev 12199)
+++
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java 2012-02-27
15:03:25 UTC (rev 12200)
@@ -39,7 +39,10 @@
public void countUp()
{
- up++;
+ synchronized (this)
+ {
+ up++;
+ }
countLatch.countUp();
}