[jboss-svn-commits] JBL Code SVN: r29060 - labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Aug 26 04:57:55 EDT 2009


Author: jhalliday
Date: 2009-08-26 04:57:55 -0400 (Wed, 26 Aug 2009)
New Revision: 29060

Modified:
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
Log:
Prototype for async reaper insert/remove. JBTM-611


Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2009-08-26 08:31:39 UTC (rev 29059)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2009-08-26 08:57:55 UTC (rev 29060)
@@ -42,6 +42,7 @@
 import com.arjuna.common.util.logging.*;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Class to record transactions with non-zero timeout values, and class to
@@ -198,11 +199,31 @@
 	{
 	    if (tsLogger.arjLogger.debugAllowed())
 	    {
-		tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
-					 VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-					 "TransactionReaper::check ()");
+            tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                    FacilityCode.FAC_ATOMIC_ACTION,  "TransactionReaper::check ()");
 	    }
 
+
+        synchronized (this)
+        {
+            // purge the pending inerts before doing anything else. This may hold up other inserts
+            // for a while. Future versions may prefer to insert only a portion of the pending set.
+            Set<Map.Entry<Reapable,ReaperElement>> entrySet = _pendingInsertions.entrySet();
+            if(entrySet != null) {
+                Iterator<Map.Entry<Reapable,ReaperElement>> queueIter = entrySet.iterator();
+                while(queueIter.hasNext()) {
+                    Map.Entry<Reapable,ReaperElement> entry = queueIter.next();
+                    ReaperElement element = entry.getValue();
+                    // inert is also locked on (this), but remove is not. So, we are careful to check that
+                    // we don't insert an element that's been removed from the pending set by a concurrent thread.
+                    if(entrySet.remove(entry)) {
+                        synchronousInsert(element);
+                    }
+                }
+            }
+        }
+
+
 	    do
 	    {
 		final ReaperElement e ;
@@ -757,47 +778,76 @@
         {
             return _listeners.remove(listener);
         }
-        
-	/**
-	 * timeout is given in seconds, but we work in milliseconds.
-	 */
 
-	public final boolean insert(Reapable control, int timeout)
-	{
-		if (tsLogger.arjLogger.debugAllowed())
-		{
-			tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
-					VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-					"TransactionReaper::insert ( " + control + ", " + timeout
-							+ " )");
-		}
 
-		/*
-		 * Ignore if the timeout is zero, since this means the transaction
-		 * should never timeout.
-		 */
+    /**
+     * timeout is given in seconds, but we work in milliseconds.
+     */
+    public final boolean insert(Reapable control, int timeout)
+    {
+        if (tsLogger.arjLogger.debugAllowed())
+        {
+            tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+                    VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+                    "TransactionReaper::insert ( " + control + ", " + timeout
+                            + " )");
+        }
 
-		if (timeout == 0)
-			return true;
+        /*
+         * Ignore if the timeout is zero, since this means the transaction
+         * should never timeout.
+         */
 
+        if (timeout == 0)
+            return true;
+
+
+
+        ReaperElement e = new ReaperElement(control, timeout);
+
+        boolean asyncInsert = false;
+
+        synchronized (this)
+        {
+            if(_transactions.size() > 0) {
+                ReaperElement first = (ReaperElement)_transactions.first();
+                // if the new element would timeout after the earliest one we already have,
+                // we can delay its insertion until that earlier timeout. Hopefully the new tx will
+                // complete before then and we'll never have to insert it at all.
+                if(first != null && e.compareTo(first) > 0) {
+                    _pendingInsertions.put(control, e);
+                    asyncInsert = true;
+                }
+            }
+
+            if(asyncInsert) {
+                return true;
+            } else {
+                return synchronousInsert(e);
+            }
+        }
+    }
+
+
+	private final boolean synchronousInsert(ReaperElement elementToInsert)
+	{
+
 		/**
 		 * Ignore if it's already in the list with a different timeout.
 		 * (This should never happen)
 		 */
-		if (_timeouts.containsKey(control)) {
-			return false; // remove this, rewrite put instead.
+		if (_timeouts.containsKey(elementToInsert._control)) {
+			return false; // TODO remove this, rewrite put instead.
 		}
 
-		ReaperElement e = new ReaperElement(control, timeout);
-
 		synchronized (this)
 		{
-			TransactionReaper._lifetime += timeout;
+			TransactionReaper._lifetime += elementToInsert._timeout;
 
-			_timeouts.put(control, e);
-			boolean rtn = _transactions.add(e);
+			_timeouts.put(elementToInsert._control, elementToInsert);
+			boolean rtn = _transactions.add(elementToInsert);
 
-			if(_dynamic && _transactions.first() == e)
+			if(_dynamic && _transactions.first() == elementToInsert)
 			{
 				notifyAll(); // force recalc of next wakeup time, taking into account the newly inserted element
 			}
@@ -806,7 +856,20 @@
 		}
 	}
 
-	public final boolean remove(java.lang.Object control)
+    public final boolean remove(java.lang.Object control)
+    {
+        // _pendingInsertions is a concurrent structure, so we don't lock it here. That means we need to be careful
+        // when transferring elements from pending to the real structures, see check() above. The synchronous remove
+        // must also take care to lock, or things may leak if a remove attempt happens concurrent to a pending copy. 
+
+        if(_pendingInsertions.remove(control) != null) {
+            return true;
+        } else {
+            return synchronousRemove(control);
+        }
+    }
+
+	public final boolean synchronousRemove(java.lang.Object control)
 	{
 		if (tsLogger.arjLogger.debugAllowed())
 		{
@@ -1240,6 +1303,8 @@
 	private SortedSet _transactions = Collections.synchronizedSortedSet(new TreeSet()); // C of ReaperElement
 	private Map _timeouts = Collections.synchronizedMap(new HashMap()); // key = Reapable, value = ReaperElement
 
+    private final Map<Reapable, ReaperElement> _pendingInsertions = new ConcurrentHashMap<Reapable, ReaperElement>();
+
 	private List _workQueue = new LinkedList(); // C of ReaperElement
 	
 	private Vector<ReaperMonitor> _listeners = new Vector<ReaperMonitor>();



More information about the jboss-svn-commits mailing list