[jboss-svn-commits] JBL Code SVN: r29060 - labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Aug 26 04:57:55 EDT 2009
Author: jhalliday
Date: 2009-08-26 04:57:55 -0400 (Wed, 26 Aug 2009)
New Revision: 29060
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
Log:
Prototype for async reaper insert/remove. JBTM-611
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-08-26 08:31:39 UTC (rev 29059)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-08-26 08:57:55 UTC (rev 29060)
@@ -42,6 +42,7 @@
import com.arjuna.common.util.logging.*;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Class to record transactions with non-zero timeout values, and class to
@@ -198,11 +199,31 @@
{
if (tsLogger.arjLogger.debugAllowed())
{
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::check ()");
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "TransactionReaper::check ()");
}
+
+ synchronized (this)
+ {
+ // purge the pending inerts before doing anything else. This may hold up other inserts
+ // for a while. Future versions may prefer to insert only a portion of the pending set.
+ Set<Map.Entry<Reapable,ReaperElement>> entrySet = _pendingInsertions.entrySet();
+ if(entrySet != null) {
+ Iterator<Map.Entry<Reapable,ReaperElement>> queueIter = entrySet.iterator();
+ while(queueIter.hasNext()) {
+ Map.Entry<Reapable,ReaperElement> entry = queueIter.next();
+ ReaperElement element = entry.getValue();
+ // inert is also locked on (this), but remove is not. So, we are careful to check that
+ // we don't insert an element that's been removed from the pending set by a concurrent thread.
+ if(entrySet.remove(entry)) {
+ synchronousInsert(element);
+ }
+ }
+ }
+ }
+
+
do
{
final ReaperElement e ;
@@ -757,47 +778,76 @@
{
return _listeners.remove(listener);
}
-
- /**
- * timeout is given in seconds, but we work in milliseconds.
- */
- public final boolean insert(Reapable control, int timeout)
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::insert ( " + control + ", " + timeout
- + " )");
- }
- /*
- * Ignore if the timeout is zero, since this means the transaction
- * should never timeout.
- */
+ /**
+ * timeout is given in seconds, but we work in milliseconds.
+ */
+ public final boolean insert(Reapable control, int timeout)
+ {
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::insert ( " + control + ", " + timeout
+ + " )");
+ }
- if (timeout == 0)
- return true;
+ /*
+ * Ignore if the timeout is zero, since this means the transaction
+ * should never timeout.
+ */
+ if (timeout == 0)
+ return true;
+
+
+
+ ReaperElement e = new ReaperElement(control, timeout);
+
+ boolean asyncInsert = false;
+
+ synchronized (this)
+ {
+ if(_transactions.size() > 0) {
+ ReaperElement first = (ReaperElement)_transactions.first();
+ // if the new element would timeout after the earliest one we already have,
+ // we can delay its insertion until that earlier timeout. Hopefully the new tx will
+ // complete before then and we'll never have to insert it at all.
+ if(first != null && e.compareTo(first) > 0) {
+ _pendingInsertions.put(control, e);
+ asyncInsert = true;
+ }
+ }
+
+ if(asyncInsert) {
+ return true;
+ } else {
+ return synchronousInsert(e);
+ }
+ }
+ }
+
+
+ private final boolean synchronousInsert(ReaperElement elementToInsert)
+ {
+
/**
* Ignore if it's already in the list with a different timeout.
* (This should never happen)
*/
- if (_timeouts.containsKey(control)) {
- return false; // remove this, rewrite put instead.
+ if (_timeouts.containsKey(elementToInsert._control)) {
+ return false; // TODO remove this, rewrite put instead.
}
- ReaperElement e = new ReaperElement(control, timeout);
-
synchronized (this)
{
- TransactionReaper._lifetime += timeout;
+ TransactionReaper._lifetime += elementToInsert._timeout;
- _timeouts.put(control, e);
- boolean rtn = _transactions.add(e);
+ _timeouts.put(elementToInsert._control, elementToInsert);
+ boolean rtn = _transactions.add(elementToInsert);
- if(_dynamic && _transactions.first() == e)
+ if(_dynamic && _transactions.first() == elementToInsert)
{
notifyAll(); // force recalc of next wakeup time, taking into account the newly inserted element
}
@@ -806,7 +856,20 @@
}
}
- public final boolean remove(java.lang.Object control)
+ public final boolean remove(java.lang.Object control)
+ {
+ // _pendingInsertions is a concurrent structure, so we don't lock it here. That means we need to be careful
+ // when transferring elements from pending to the real structures, see check() above. The synchronous remove
+ // must also take care to lock, or things may leak if a remove attempt happens concurrent to a pending copy.
+
+ if(_pendingInsertions.remove(control) != null) {
+ return true;
+ } else {
+ return synchronousRemove(control);
+ }
+ }
+
+ public final boolean synchronousRemove(java.lang.Object control)
{
if (tsLogger.arjLogger.debugAllowed())
{
@@ -1240,6 +1303,8 @@
private SortedSet _transactions = Collections.synchronizedSortedSet(new TreeSet()); // C of ReaperElement
private Map _timeouts = Collections.synchronizedMap(new HashMap()); // key = Reapable, value = ReaperElement
+ private final Map<Reapable, ReaperElement> _pendingInsertions = new ConcurrentHashMap<Reapable, ReaperElement>();
+
private List _workQueue = new LinkedList(); // C of ReaperElement
private Vector<ReaperMonitor> _listeners = new Vector<ReaperMonitor>();
More information about the jboss-svn-commits
mailing list