[hornetq-commits] JBoss hornetq SVN: r12200 - in trunk: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 27 10:03:26 EST 2012


Author: borges
Date: 2012-02-27 10:03:25 -0500 (Mon, 27 Feb 2012)
New Revision: 12200

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java
Log:
Fix: guard volatile increment against race.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2012-02-27 15:03:02 UTC (rev 12199)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2012-02-27 15:03:25 UTC (rev 12200)
@@ -27,16 +27,16 @@
 import org.hornetq.utils.ExecutorFactory;
 
 /**
- * 
+ *
  * Each instance of OperationContextImpl is associated with an executor (usually an ordered Executor).
- * 
- * Tasks are hold until the operations are complete and executed in the natural order as soon as the operations are returned 
+ *
+ * Tasks are hold until the operations are complete and executed in the natural order as soon as the operations are returned
  * from replication and storage.
- * 
+ *
  * If there are no pending IO operations, the tasks are just executed at the callers thread without any context switch.
- * 
+ *
  * So, if you are doing operations that are not dependent on IO (e.g NonPersistentMessages) you wouldn't have any context switch.
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class OperationContextImpl implements OperationContext
@@ -49,12 +49,12 @@
    {
       OperationContextImpl.threadLocalContext.set(null);
    }
-   
+
    public static OperationContext getContext()
    {
       return getContext(null);
    }
-   
+
    public static OperationContext getContext(final ExecutorFactory executorFactory)
    {
       OperationContext token = OperationContextImpl.threadLocalContext.get();
@@ -83,19 +83,17 @@
    private int minimalStore = Integer.MAX_VALUE;
 
    private int minimalReplicated = Integer.MAX_VALUE;
-   
+
    private int minimalPage = Integer.MAX_VALUE;
 
-   private volatile int storeLineUp = 0;
+   private final AtomicInteger storeLineUp = new AtomicInteger(0);
+   private final AtomicInteger replicationLineUp = new AtomicInteger(0);
+   private final AtomicInteger pageLineUp = new AtomicInteger(0);
 
-   private volatile int replicationLineUp = 0;
-   
-   private volatile int pageLineUp = 0;
-
    private int stored = 0;
 
    private int replicated = 0;
-   
+
    private int paged = 0;
 
    private int errorCode = -1;
@@ -111,12 +109,12 @@
       super();
       this.executor = executor;
    }
-   
+
    public void pageSyncLineUp()
    {
-      pageLineUp++;
+      pageLineUp.incrementAndGet();
    }
-   
+
    public synchronized void pageSyncDone()
    {
       paged++;
@@ -125,12 +123,12 @@
 
    public void storeLineUp()
    {
-      storeLineUp++;
+      storeLineUp.incrementAndGet();
    }
 
    public void replicationLineUp()
    {
-      replicationLineUp++;
+      replicationLineUp.incrementAndGet();
    }
 
    public synchronized void replicationDone()
@@ -154,13 +152,14 @@
          if (tasks == null)
          {
             tasks = new LinkedList<TaskHolder>();
-            minimalReplicated = replicationLineUp;
-            minimalStore = storeLineUp;
-            minimalPage = pageLineUp;
+            minimalReplicated = replicationLineUp.intValue();
+            minimalStore = storeLineUp.intValue();
+            minimalPage = pageLineUp.intValue();
          }
 
          // On this case, we can just execute the context directly
-         if (replicationLineUp == replicated && storeLineUp == stored && pageLineUp == paged)
+         if (replicationLineUp.intValue() == replicated && storeLineUp.intValue() == stored &&
+                  pageLineUp.intValue() == paged)
          {
             // We want to avoid the executor if everything is complete...
             // However, we can't execute the context if there are executions pending
@@ -281,14 +280,9 @@
       }
    }
 
-   class TaskHolder
+   final class TaskHolder
    {
-      
-      
-      
-      /* (non-Javadoc)
-       * @see java.lang.Object#toString()
-       */
+
       @Override
       public String toString()
       {
@@ -302,19 +296,17 @@
                 "]";
       }
 
-      int storeLined;
+      final int storeLined;
+      final int replicationLined;
+      final int pageLined;
 
-      int replicationLined;
-      
-      int pageLined;
+      final IOAsyncTask task;
 
-      IOAsyncTask task;
-
       TaskHolder(final IOAsyncTask task)
       {
-         storeLined = storeLineUp;
-         replicationLined = replicationLineUp;
-         pageLined = pageLineUp;
+         storeLined = storeLineUp.intValue();
+         replicationLined = replicationLineUp.intValue();
+         pageLined = pageLineUp.intValue();
          this.task = task;
       }
    }
@@ -360,7 +352,7 @@
             buffer.append("Task = " + hold + "\n");
          }
       }
-      
+
       return "OperationContextImpl [minimalStore=" + minimalStore +
              ", storeLineUp=" +
              storeLineUp +
@@ -388,6 +380,6 @@
              "]" + buffer.toString();
    }
 
-   
 
+
 }

Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java	2012-02-27 15:03:02 UTC (rev 12199)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/TransactionCallback.java	2012-02-27 15:03:25 UTC (rev 12200)
@@ -39,7 +39,10 @@
 
    public void countUp()
    {
-      up++;
+      synchronized (this)
+      {
+         up++;
+      }
       countLatch.countUp();
    }
 



More information about the hornetq-commits mailing list