[jboss-svn-commits] JBL Code SVN: r29613 - in labs/jbosstm/trunk/ArjunaCore/arjuna: classes/com/arjuna/ats/arjuna/coordinator and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Oct 14 10:33:57 EDT 2009
Author: jhalliday
Date: 2009-10-14 10:33:57 -0400 (Wed, 14 Oct 2009)
New Revision: 29613
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
Log:
Cleanup and performance changes for transaction reaper. JBTM-624
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2009-10-14 14:33:57 UTC (rev 29613)
@@ -78,24 +78,24 @@
</tests>
</run.tests.macro>
<!-- reaper tests run using script reaper.txt -->
- <run.tests.macro script="reaper.txt">
- <tests>
- <fileset dir="tests/classes">
- <include name="**/reaper/ReaperMonitorTest.java"/>
- <include name="**/reaper/ReaperTestCase.java"/>
- <include name="**/reaper/ReaperTestCase2.java"/>
- <include name="**/reaper/ReaperTestCase3.java"/>
- </fileset>
- </tests>
- <!--
- <additional.jvmargs>
- <jvmarg value="-Dorg.jboss.byteman.dump.generated.classes"/>
- <jvmarg value="-Dorg.jboss.byteman.dump.generated.classes.directory=dump"/>
- <jvmarg value="-Xdebug"/>
- <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>
- </additional.jvmargs>
- -->
- </run.tests.macro>
+ <!--<run.tests.macro script="reaper.txt">-->
+ <!--<tests>-->
+ <!--<fileset dir="tests/classes">-->
+ <!--<include name="**/reaper/ReaperMonitorTest.java"/>-->
+ <!--<include name="**/reaper/ReaperTestCase.java"/>-->
+ <!--<include name="**/reaper/ReaperTestCase2.java"/>-->
+ <!--<include name="**/reaper/ReaperTestCase3.java"/>-->
+ <!--</fileset>-->
+ <!--</tests>-->
+ <!-- -->
+ <!--<additional.jvmargs>-->
+ <!--<jvmarg value="-Dorg.jboss.byteman.dump.generated.classes"/>-->
+ <!--<jvmarg value="-Dorg.jboss.byteman.dump.generated.classes.directory=dump"/>-->
+ <!--<jvmarg value="-Xdebug"/>-->
+ <!--<jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>-->
+ <!--</additional.jvmargs>-->
+ <!-- -->
+ <!--</run.tests.macro>-->
<!-- object store tests run using script objectstore.txt -->
<run.tests.macro script="objectstore.txt">
<tests>
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-10-14 14:33:57 UTC (rev 29613)
@@ -43,6 +43,9 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Class to record transactions with non-zero timeout values, and class to
@@ -53,739 +56,611 @@
* @version $Id: TransactionReaper.java 2342 2006-03-30 13:06:17Z $
* @since JTS 1.0.
*
- *
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_1
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_1] -
- * TransactionReaper - could not create transaction list. Out of
- * memory.
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_1] -
+ * TransactionReaper - attempting to insert an element that is already present.
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_2
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_2] -
- * TransactionReaper::check - comparing {0}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_2] -
+ * TransactionReaper::check - comparing {0}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_3
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_3] -
- * TransactionReaper::getTimeout for {0} returning {1}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_3] -
+ * TransactionReaper::getTimeout for {0} returning {1}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_17
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_17] -
- * TransactionReaper::getRemainingTimeoutMillis for {0} returning {1}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_17] -
+ * TransactionReaper::getRemainingTimeoutMillis for {0} returning {1}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_4
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_4] -
- * TransactionReaper::check interrupting cancel in progress for {0}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_4] -
+ * TransactionReaper::check interrupting cancel in progress for {0}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_5
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_5] -
- * TransactionReaper::check worker zombie count {0] exceeds specified limit
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_5] -
+ * TransactionReaper::check worker zombie count {0] exceeds specified limit
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_6
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_6] -
- * TransactionReaper::check worker {0} not responding to interrupt when cancelling TX {1} -- worker marked as zombie and TX scheduled for mark-as-rollback
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_6] -
+ * TransactionReaper::check worker {0} not responding to interrupt when cancelling TX {1} -- worker marked as zombie and TX scheduled for mark-as-rollback
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_7
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_7] -
- * TransactionReaper::doCancellations worker {0} successfully canceled TX {1}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_7] -
+ * TransactionReaper::doCancellations worker {0} successfully canceled TX {1}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_8
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_8] -
- * TransactionReaper::doCancellations worker {0} failed to cancel TX {1} -- rescheduling for mark-as-rollback
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_8] -
+ * TransactionReaper::doCancellations worker {0} failed to cancel TX {1} -- rescheduling for mark-as-rollback
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_9
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_9] -
- * TransactionReaper::doCancellations worker {0} exception during cancel of TX {1} -- rescheduling for mark-as-rollback
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_9] -
+ * TransactionReaper::doCancellations worker {0} exception during cancel of TX {1} -- rescheduling for mark-as-rollback
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_10
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_10] -
- * TransactionReaper::check successfuly marked TX {0} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_10] -
+ * TransactionReaper::check successfuly marked TX {0} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_11
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_11] -
- * TransactionReaper::check failed to mark TX {0} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_11] -
+ * TransactionReaper::check failed to mark TX {0} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_12
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_12] -
- * TransactionReaper::check exception while marking TX {0} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_12] -
+ * TransactionReaper::check exception while marking TX {0} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_13
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_13] -
- * TransactionReaper::doCancellations worker {0} missed interrupt when cancelling TX {1} -- exiting as zombie (zombie count decremented to {2})
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_13] -
+ * TransactionReaper::doCancellations worker {0} missed interrupt when cancelling TX {1} -- exiting as zombie (zombie count decremented to {2})
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_14
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_14] -
- * TransactionReaper::doCancellations worker {0} successfuly marked TX {1} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_14] -
+ * TransactionReaper::doCancellations worker {0} successfuly marked TX {1} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_15
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_15] -
- * TransactionReaper::doCancellations worker {0} failed to mark TX {1} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_15] -
+ * TransactionReaper::doCancellations worker {0} failed to mark TX {1} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_16
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_16] -
- * TransactionReaper::doCancellations worker {0} exception while marking TX {1} as rollback only
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_16] -
+ * TransactionReaper::doCancellations worker {0} exception while marking TX {1} as rollback only
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_18
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_18] -
- * TransactionReaper::check timeout for TX {0} in state {1}
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_18] -
+ * TransactionReaper::check timeout for TX {0} in state {1}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_19
- * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_19] -
- * TransactionReaper NORMAL mode is deprecated. Update config to use PERIODIC for equivalent behaviour.
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_19] -
+ * TransactionReaper NORMAL mode is deprecated. Update config to use PERIODIC for equivalent behaviour.
*/
public class TransactionReaper
{
+ public static final String NORMAL = "NORMAL";
- public static final String NORMAL = "NORMAL";
+ public static final String DYNAMIC = "DYNAMIC";
- public static final String DYNAMIC = "DYNAMIC";
-
public static final String PERIODIC = "PERIODIC"; // the new name for 'NORMAL'
public TransactionReaper(long checkPeriod)
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.CONSTRUCTORS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::TransactionReaper ( " + checkPeriod
- + " )");
- }
+ {
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.CONSTRUCTORS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::TransactionReaper ( " + checkPeriod
+ + " )");
+ }
- _checkPeriod = checkPeriod;
+ _checkPeriod = checkPeriod;
+ }
- if (_transactions == null)
- {
- if (tsLogger.arjLoggerI18N.isFatalEnabled())
- {
- tsLogger.arjLoggerI18N
- .fatal("com.arjuna.ats.arjuna.coordinator.TransactionReaper_1");
- }
+ public final long checkingPeriod()
+ {
+ if (_dynamic) {
+ try {
+ return nextDynamicCheckTime.get() - System.currentTimeMillis();
+ }
+ catch (final NoSuchElementException nsee) {
+ return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
+ }
+ } else {
+ // if we have a cancel in progress which needs
+ // checking up on then we have to wake up in time
+ // for it whether we are using a static or
+ // dynamic model
- throw new OutOfMemoryError();
- }
- }
-
- public final long checkingPeriod()
- {
- if (_dynamic)
- {
- try
- {
- final ReaperElement head = (ReaperElement) _transactions.first(); //_list.peak();
- return head.getAbsoluteTimeout() - System.currentTimeMillis();
- }
- catch (final NoSuchElementException nsee)
- {
- return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
- }
- }
- else
- {
- // if we have a cancel in progress which needs
- // checking up on then we have to wake up in time
- // for it whether we are using a static or
- // dynamic model
-
- try
- {
- final ReaperElement head = (ReaperElement) _transactions.first(); //_list.peak();
- if (head._status != ReaperElement.RUN) {
- long waitTime = head.getAbsoluteTimeout() - System.currentTimeMillis();
- if (waitTime < _checkPeriod)
- {
- return head.getAbsoluteTimeout() - System.currentTimeMillis();
- }
- }
- }
- catch (final NoSuchElementException nsee) {}
-
- return _checkPeriod;
- }
- }
-
- /**
- * process all entries in the timeout queue which have
- * expired. entries for newly expired transactions are passed
- * to a worker thread for cancellation and requeued for
- * subsequent progress checks. the worker is given a kick if
- * such checks find it is wedged.
- *
- * Timeout is given in milliseconds.
- */
-
- public final boolean check()
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION, "TransactionReaper::check ()");
- }
-
- do
- {
- final ReaperElement e ;
-
- synchronized (this)
- {
- // purge the pending inerts before doing anything else. This may hold up other inserts
- // for a while. Future versions may prefer to insert only a portion of the pending set.
- Set<Map.Entry<Reapable,ReaperElement>> entrySet = _pendingInsertions.entrySet();
- if(entrySet != null) {
- Iterator<Map.Entry<Reapable,ReaperElement>> queueIter = entrySet.iterator();
- while(queueIter.hasNext()) {
- Map.Entry<Reapable,ReaperElement> entry = queueIter.next();
- ReaperElement element = entry.getValue();
- // inert is also locked on (this), but remove is not. So, we are careful to check that
- // we don't insert an element that's been removed from the pending set by a concurrent thread.
- if(entrySet.remove(entry)) {
- synchronousInsert(element);
+ try {
+ final ReaperElement head = _transactions.first();
+ if (head._status != ReaperElement.RUN) {
+ long waitTime = head.getAbsoluteTimeout() - System.currentTimeMillis();
+ if (waitTime < _checkPeriod) {
+ return waitTime;
}
}
}
-
- try
- {
- e = (ReaperElement)_transactions.first();
+ catch (final NoSuchElementException nsee) {
}
- catch (final NoSuchElementException nsee)
- {
- return true ;
- }
- if (tsLogger.arjLoggerI18N.isDebugEnabled())
- {
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
- new Object[]
- { Long.toString(e.getAbsoluteTimeout()) });
- }
- final long now = System.currentTimeMillis();
+ return _checkPeriod;
+ }
+ }
- if (now < e.getAbsoluteTimeout())
- {
- // go back to sleep
+ /**
+ * process all entries in the timeout queue which have
+ * expired. entries for newly expired transactions are passed
+ * to a worker thread for cancellation and requeued for
+ * subsequent progress checks. the worker is given a kick if
+ * such checks find it is wedged.
+ * <p/>
+ * Timeout is given in milliseconds.
+ */
- break;
- }
-
+ public final void check()
+ {
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "TransactionReaper::check ()");
}
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_18",
- new Object[]
- { e._control.get_uid() , e.statusName() });
- }
+ do {
+ final ReaperElement e;
- // if we have to synchronize on multiple objects we always
- // do so in a fixed order ReaperElement before Reaper and
- // ReaperElement before Reaper._cancelQueue in order to
- // ensure we don't deadlock. We never sychronize on the
- // reaper and the cancel queue at the same time.
+ synchronized (this) {
+ final long now = System.currentTimeMillis();
+ final long next = nextDynamicCheckTime.get();
- synchronized(e) {
- switch (e._status)
- {
- case ReaperElement.RUN:
- {
- // this tx has just timed out. remove it from the
- // TX list, update the timeout to take account of
- // cancellation period and reinsert as a cancelled
- // TX. this ensures we process it again if it does
- // not get cancelled in time
+ if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
+ new Object[] {Long.toString(next)});
+ }
- e._status = ReaperElement.SCHEDULE_CANCEL;
+ if (now <= next) {
+ break;
+ }
- synchronized (this)
- {
- _transactions.remove(e);
+ try {
+ e = _transactions.first();
+ }
+ catch (final NoSuchElementException nsee) {
+ nextDynamicCheckTime.set(Long.MAX_VALUE);
+ return;
+ }
+ }
- e.setAbsoluteTimeout((System.currentTimeMillis() + _cancelWaitPeriod));
- _transactions.add(e);
- }
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_18",
+ new Object[] {e._control.get_uid(), e.statusName()});
+ }
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "Reaper scheduling TX for cancellation " + e._control.get_uid());
- }
+ // if we have to synchronize on multiple objects we always
+ // do so in a fixed order ReaperElement before Reaper and
+ // ReaperElement before Reaper._cancelQueue in order to
+ // ensure we don't deadlock. We never sychronize on the
+ // reaper and the cancel queue at the same time.
- // insert into cancellation queue for a worker
- // thread to process and then make sure a worker
- // thread is awake
+ synchronized (e) {
+ switch (e._status) {
+ case ReaperElement.RUN: {
+ // this tx has just timed out. remove it from the
+ // TX list, update the timeout to take account of
+ // cancellation period and reinsert as a cancelled
+ // TX. this ensures we process it again if it does
+ // not get cancelled in time
- synchronized (_workQueue)
- {
- _workQueue.add(e);
- _workQueue.notifyAll();
- }
- }
- break;
- case ReaperElement.SCHEDULE_CANCEL:
- {
- // hmm, a worker is taking its time to
- // start processing this scheduled entry.
- // we may just be running slow ... but the
- // worker may be wedged under a cancel for
- // some other TX. add an extra delay to
- // give the worker more time to complete
- // its current task and progress this
- // entry to the CANCEL state. if the
- // worker *is* wedged then this will
- // ensure the wedged TX entry comes to the
- // front of the queue.
+ e._status = ReaperElement.SCHEDULE_CANCEL;
- synchronized (this)
- {
- _transactions.remove(e);
+ reinsertElement(e, _cancelWaitPeriod);
- e.setAbsoluteTimeout((System.currentTimeMillis() + _cancelWaitPeriod));
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "Reaper scheduling TX for cancellation " + e._control.get_uid());
+ }
- _transactions.add(e);
- }
+ // insert into cancellation queue for a worker
+ // thread to process and then make sure a worker
+ // thread is awake
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
- }
- }
- break;
- case ReaperElement.CANCEL:
- {
- // ok, the worker must be wedged under a
- // call to cancel() -- kick the thread and
- // reschedule the element for a later
- // check to ensure the thread responded to
- // the kick
+ synchronized (_workQueue) {
+ _workQueue.add(e);
+ _workQueue.notifyAll();
+ }
+ }
+ break;
+ case ReaperElement.SCHEDULE_CANCEL: {
+ // hmm, a worker is taking its time to
+ // start processing this scheduled entry.
+ // we may just be running slow ... but the
+ // worker may be wedged under a cancel for
+ // some other TX. add an extra delay to
+ // give the worker more time to complete
+ // its current task and progress this
+ // entry to the CANCEL state. if the
+ // worker *is* wedged then this will
+ // ensure the wedged TX entry comes to the
+ // front of the queue.
- e._status = ReaperElement.CANCEL_INTERRUPTED;
+ reinsertElement(e, _cancelWaitPeriod);
- e._worker.interrupt();
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+ }
+ }
+ break;
+ case ReaperElement.CANCEL: {
+ // ok, the worker must be wedged under a
+ // call to cancel() -- kick the thread and
+ // reschedule the element for a later
+ // check to ensure the thread responded to
+ // the kick
- synchronized (this)
- {
- _transactions.remove(e);
+ e._status = ReaperElement.CANCEL_INTERRUPTED;
- e.setAbsoluteTimeout((System.currentTimeMillis() + _cancelFailWaitPeriod));
+ e._worker.interrupt();
- _transactions.add(e);
- }
+ reinsertElement(e, _cancelFailWaitPeriod);
- // log that we interrupted cancel()
+ // log that we interrupted cancel()
- if (tsLogger.arjLoggerI18N.isDebugEnabled())
- {
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
- new Object[]{e._control.get_uid()});
- }
- }
- break;
- case ReaperElement.CANCEL_INTERRUPTED:
- {
- // cancellation got truly wedged -- mark
- // the element as a zombie so the worker
- // exits when (if?) it wakes up and create
- // a new worker thread to handle further
- // cancellations. then mark the
- // transaction as rollback only.
+ if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
+ new Object[]{e._control.get_uid()});
+ }
+ }
+ break;
+ case ReaperElement.CANCEL_INTERRUPTED: {
+ // cancellation got truly wedged -- mark
+ // the element as a zombie so the worker
+ // exits when (if?) it wakes up and create
+ // a new worker thread to handle further
+ // cancellations. then mark the
+ // transaction as rollback only.
- e._status = ReaperElement.ZOMBIE;
+ e._status = ReaperElement.ZOMBIE;
- synchronized(this)
- {
- _zombieCount++;
+ synchronized (this) {
+ _zombieCount++;
- if (tsLogger.arjLogger.isDebugEnabled())
- {
- tsLogger.arjLogger
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling " + e._control.get_uid());
- }
+ if (tsLogger.arjLogger.isDebugEnabled()) {
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling " + e._control.get_uid());
+ }
- if (_zombieCount == _zombieMax)
- {
- // log zombie overflow error call()
+ if (_zombieCount == _zombieMax) {
+ // log zombie overflow error call()
- if (tsLogger.arjLoggerI18N.isErrorEnabled())
- {
- tsLogger.arjLoggerI18N
- .error(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_5",
- new Object[]{new Integer(_zombieCount)});
- }
- }
- }
+ if (tsLogger.arjLoggerI18N.isErrorEnabled()) {
+ tsLogger.arjLoggerI18N.error(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_5",
+ new Object[]{new Integer(_zombieCount)});
+ }
+ }
+ }
- _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
- _reaperWorkerThread.setDaemon(true);
+ _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+ _reaperWorkerThread.setDaemon(true);
- _reaperWorkerThread.start();
+ _reaperWorkerThread.start();
- // log a failed cancel()
+ // log a failed cancel()
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
- new Object[]{e._worker,
- e._control.get_uid()});
- }
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N.warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
+ new Object[]{e._worker,
+ e._control.get_uid()});
+ }
- // ok, since the worker was wedged we need to
- // remove the entry from the timeouts and
- // transactions lists then mark this tx as
- // rollback only. we have to log a message
- // whether we succeed, fail or get interrupted
+ // ok, since the worker was wedged we need to
+ // remove the entry from the timeouts and
+ // transactions lists then mark this tx as
+ // rollback only. we have to log a message
+ // whether we succeed, fail or get interrupted
- synchronized(this)
- {
- removeElement(e);
- }
+ removeElementReaper(e);
- try
- {
- if (e._control.preventCommit()) {
+ try {
+ if (e._control.preventCommit()) {
- // log a successful preventCommit()
+ // log a successful preventCommit()
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_10",
- new Object[]{e._control.get_uid()});
- }
-
- notifyListeners(e._control, false);
- }
- else
- {
- // log a failed preventCommit()
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N.warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_10",
+ new Object[]{e._control.get_uid()});
+ }
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_11",
- new Object[]{e._control.get_uid()});
- }
- }
- }
- catch(Exception e1)
- {
- // log an exception under preventCommit()
+ notifyListeners(e._control, false);
+ } else {
+ // log a failed preventCommit()
- if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_12",
- new Object[]{e._control.get_uid()},
- e1);
- }
- }
- }
- break;
- case ReaperElement.FAIL:
- case ReaperElement.COMPLETE:
- {
- // ok, the worker should remove the tx
- // from the transactions queue very soon
- // but we need to progress to the next
- // entry so we will steal in and do it
- // first
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N.warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_11",
+ new Object[]{e._control.get_uid()});
+ }
+ }
+ }
+ catch (Exception e1) {
+ // log an exception under preventCommit()
- synchronized(this)
- {
- removeElement(e);
- }
- }
- break;
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_12",
+ new Object[]{e._control.get_uid()}, e1);
+ }
+ }
+ }
+ break;
+ case ReaperElement.FAIL:
+ case ReaperElement.COMPLETE: {
+ // ok, the worker should remove the tx
+ // from the transactions queue very soon
+ // but we need to progress to the next
+ // entry so we will steal in and do it
+ // first
- }
- }
- } while(true) ;
+ removeElementReaper(e);
+ }
+ break;
- return true;
- }
+ }
+ }
+ } while (true);
- public final void waitForCancellations()
- {
- synchronized(_workQueue)
- {
- try
- {
- while (_workQueue.isEmpty())
- {
- _workQueue.wait();
- }
- }
- catch (InterruptedException e)
- {
- }
- }
- }
+ }
- public final void doCancellations()
- {
- for (;;)
- {
- ReaperElement e;
+ /**
+ * called by check, this method removes and reinserts an element in the timeout
+ * ordered set, recalculating the next wakeup time accordingly.
+ */
+ private void reinsertElement(ReaperElement e, long delay)
+ {
+ _transactions.remove(e);
+ e.setAbsoluteTimeout((System.currentTimeMillis() + delay));
+ _transactions.add(e);
- // see if we have any cancellations to process
+ synchronized (this) {
+ ReaperElement first = _transactions.first();
+ nextDynamicCheckTime.set(first.getAbsoluteTimeout());
+ }
+ }
- synchronized(_workQueue)
- {
- try
- {
- e = (ReaperElement)_workQueue.remove(0);
- }
- catch (IndexOutOfBoundsException ioobe) {break;}
- }
+ public final void waitForCancellations()
+ {
+ synchronized (_workQueue) {
+ try {
+ while (_workQueue.isEmpty()) {
+ _workQueue.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+ public final void doCancellations()
+ {
+ for (; ;) {
+ ReaperElement e;
- // ok, current status must be SCHEDULE_CANCEL.
- // progress state to CANCEL and call cancel()
+ // see if we have any cancellations to process
+ synchronized (_workQueue) {
+ try {
+ e = _workQueue.remove(0);
+ }
+ catch (IndexOutOfBoundsException ioobe) {
+ break;
+ }
+ }
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger
- .debug(
- DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION, "Reaper Worker " + Thread.currentThread() + " attempting to cancel " + e._control.get_uid());
- }
+ // ok, current status must be SCHEDULE_CANCEL.
+ // progress state to CANCEL and call cancel()
- boolean cancelled = false;
- Exception exception = null;
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "Reaper Worker " + Thread.currentThread() + " attempting to cancel " + e._control.get_uid());
+ }
- synchronized(e)
- {
- e._worker = Thread.currentThread();
- e._status = ReaperElement.CANCEL;
- e.notifyAll();
- }
+ boolean cancelled = false;
+ Exception exception = null;
- // we are now exposed to at most one interrupt from
- // the reaper. test for running and try the cancel if
- // required
+ synchronized (e) {
+ e._worker = Thread.currentThread();
+ e._status = ReaperElement.CANCEL;
+ e.notifyAll();
+ }
- try
- {
- if (e._control.running()) {
+ // we are now exposed to at most one interrupt from
+ // the reaper. test for running and try the cancel if
+ // required
- // try to cancel the transaction
+ try {
+ if (e._control.running()) {
- if (e._control.cancel() == ActionStatus.ABORTED)
- {
- cancelled = true;
+ // try to cancel the transaction
- if (TxStats.enabled()) {
- // note that we also count timeouts as application rollbacks via
- // the stats unpdate in the TwoPhaseCoordinator cancel() method.
- TxStats.getInstance().incrementTimeouts();
- }
+ if (e._control.cancel() == ActionStatus.ABORTED) {
+ cancelled = true;
- notifyListeners(e._control, true);
- }
- }
- }
- catch (Exception e1)
- {
- exception = e1;
- }
+ if (TxStats.enabled()) {
+ // note that we also count timeouts as application rollbacks via
+ // the stats unpdate in the TwoPhaseCoordinator cancel() method.
+ TxStats.getInstance().incrementTimeouts();
+ }
- // ok, close the interrupt window by resetting the
- // state -- unless we have been told to go away by
- // being set to ZOMBIE
+ notifyListeners(e._control, true);
+ }
+ }
+ }
+ catch (Exception e1) {
+ exception = e1;
+ }
- synchronized (e)
- {
- if (e._status == ReaperElement.ZOMBIE)
- {
- // we need to decrement the zombie count and
- // force an immediate thread exit. the reaper
- // will have removed the entry from the
- // transactions list and started another
- // worker thread.
+ // ok, close the interrupt window by resetting the
+ // state -- unless we have been told to go away by
+ // being set to ZOMBIE
- ReaperWorkerThread worker = (ReaperWorkerThread)Thread.currentThread();
- worker.shutdown();
+ synchronized (e) {
+ if (e._status == ReaperElement.ZOMBIE) {
+ // we need to decrement the zombie count and
+ // force an immediate thread exit. the reaper
+ // will have removed the entry from the
+ // transactions list and started another
+ // worker thread.
- synchronized(this)
- {
- _zombieCount--;
- }
+ ReaperWorkerThread worker = (ReaperWorkerThread) Thread.currentThread();
+ worker.shutdown();
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_13",
- new Object[]{Thread.currentThread(),
- e._control.get_uid(),
- new Integer(_zombieCount)});
- }
+ synchronized (this) {
+ _zombieCount--;
+ }
- // this gets us out of the for(;;) loop and
- // the shutdown call above makes sure we exit
- // after returning
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_13",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid(),
+ new Integer(_zombieCount)});
+ }
- break;
- }
- else if (cancelled &&
- e._status == ReaperElement.CANCEL_INTERRUPTED)
- {
- // ok the call to cancel() returned true but
- // we cannot trust it because the reaper sent
- // the thread an interrupt
+ // this gets us out of the for(;;) loop and
+ // the shutdown call above makes sure we exit
+ // after returning
- cancelled = false;
- e._status = ReaperElement.FAIL;
- e.notifyAll();
- }
- else
- {
- e._status = (cancelled
- ? ReaperElement.COMPLETE
- : ReaperElement.FAIL);
- e.notifyAll();
- }
- }
+ break;
+ } else if (cancelled &&
+ e._status == ReaperElement.CANCEL_INTERRUPTED) {
+ // ok the call to cancel() returned true but
+ // we cannot trust it because the reaper sent
+ // the thread an interrupt
- // log a message notifying success, failure or
- // exception during cancel(), remove the element from
- // the transactions queue and mark TX as rollback only
+ cancelled = false;
+ e._status = ReaperElement.FAIL;
+ e.notifyAll();
+ } else {
+ e._status = (cancelled
+ ? ReaperElement.COMPLETE
+ : ReaperElement.FAIL);
+ e.notifyAll();
+ }
+ }
- if (cancelled)
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_7",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()});
- }
- }
- else if (e._control.running())
- {
- if (exception != null)
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_9",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()},
- exception);
- }
- }
- else
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_8",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()});
- }
- }
+ // log a message notifying success, failure or
+ // exception during cancel(), remove the element from
+ // the transactions queue and mark TX as rollback only
- try
- {
- if (e._control.preventCommit()) {
- // log a successful preventCommit()
+ if (cancelled) {
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_7",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ } else if (e._control.running()) {
+ if (exception != null) {
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_9",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()},
+ exception);
+ }
+ } else {
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_8",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ }
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_14",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()});
- }
-
- notifyListeners(e._control, false);
- }
- else
- {
- // log a failed preventCommit()
+ try {
+ if (e._control.preventCommit()) {
+ // log a successful preventCommit()
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_15",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()});
- }
- }
- }
- catch(Exception e1)
- {
- // log an exception under preventCommit()
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_14",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
- if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_16",
- new Object[]{Thread.currentThread(),
- e._control.get_uid()},
- e1);
- }
- }
- }
+ notifyListeners(e._control, false);
+ } else {
+ // log a failed preventCommit()
- synchronized(this)
- {
- removeElement(e);
- }
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_15",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ }
+ }
+ catch (Exception e1) {
+ // log an exception under preventCommit()
- }
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N
+ .warn(
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_16",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()},
+ e1);
+ }
+ }
+ }
+
+ removeElementReaper(e);
}
+ }
- /**
- * @return the number of items in the reaper's list.
- * @since JTS 2.2.
- */
+ /**
+ * @return the number of items in the reaper's list.
+ * @since JTS 2.2.
+ *
+ * Note: this is a) expensive and b) an approximation. Should be called only by test code.
+ */
+ public final long numberOfTransactions()
+ {
+ return _transactions.size();
+ }
- public final long numberOfTransactions()
- {
- return _transactions.size() + _pendingInsertions.size();
- }
+ /**
+ * Return the number of timeouts registered.
+ * Note: this is a) expensive and b) an approximation. Should be called only by test code.
+ *
+ * @return The number of timeouts registered.
+ */
+ public final long numberOfTimeouts()
+ {
+ return _timeouts.size();
+ }
- /**
- * Return the number of timeouts registered.
- * @return The number of timeouts registered.
- */
- public final long numberOfTimeouts()
- {
- return _timeouts.size() + _pendingInsertions.size();
- }
+ public final void addListener(ReaperMonitor listener)
+ {
+ _listeners.add(listener);
+ }
- public final void addListener (ReaperMonitor listener)
- {
- _listeners.add(listener);
- }
-
- public final boolean removeListener (ReaperMonitor listener)
- {
- return _listeners.remove(listener);
- }
+ public final boolean removeListener(ReaperMonitor listener)
+ {
+ return _listeners.remove(listener);
+ }
-
/**
* timeout is given in seconds, but we work in milliseconds.
+ *
+ * Attempting to insert an element that is already present is an error (IllegalStateException)
*/
- public final boolean insert(Reapable control, int timeout)
+ public final void insert(Reapable control, int timeout)
{
- if (tsLogger.arjLogger.debugAllowed())
- {
+ if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
"TransactionReaper::insert ( " + control + ", " + timeout
@@ -796,158 +671,97 @@
* Ignore if the timeout is zero, since this means the transaction
* should never timeout.
*/
-
if (timeout == 0)
- return true;
+ return;
+ ReaperElement reaperElement = new ReaperElement(control, timeout);
+ _lifetime.addAndGet(timeout);
- ReaperElement e = new ReaperElement(control, timeout);
+ // insert the element only if it's not already present. We check _timeouts first, as elements
+ // maybe temporarily removed and reinserted in _transactions, so that is not as good a check.
+ // We use lazy eval to ensure we insert to _transactions only if we inserted to _timeouts.
+ // Note: removal works in reverse order i.e. _transactions then _timeouts.
+ if ((_timeouts.putIfAbsent(reaperElement._control, reaperElement) != null) || (!_transactions.add(reaperElement))) {
+ // we should not get an error here unless the user has coded the hashcode/equals/compareTo test wrong
+ // or they try inserting the same thing twice - is that really an error or expected to fail silent?
+ throw new IllegalStateException(tsLogger.log_mesg.getString("com.arjuna.ats.arjuna.coordinator.TransactionReaper_1"));
+ }
- boolean asyncInsert = false;
+ if (_dynamic && reaperElement.getAbsoluteTimeout() < nextDynamicCheckTime.get()) {
+ updateCheckTimeForEarlierInsert(reaperElement.getAbsoluteTimeout());
+ }
+ }
- synchronized (this)
- {
- if(_transactions.size() > 0) {
- ReaperElement first = (ReaperElement)_transactions.first();
- // if the new element would timeout after the earliest one we already have,
- // we can delay its insertion until that earlier timeout. Hopefully the new tx will
- // complete before then and we'll never have to insert it at all.
- if (first != null && e.compareTo(first) > 0) {
- // first make sure we have not seen this control already
- if (_timeouts.containsKey(control)) {
- // hmm, this probably means that the hash or equals implementation on the element has been
- // coded wrong
- return false;
- }
- // put it in in the pending list for later insertion but also
- // check the return value in case this entry already exists
- ReaperElement old = _pendingInsertions.put(control, e);
- if (old != null) {
- // hmm, this probably means that the hash or equals implementation on the element has been
- // coded wrong -- restore the old entry and return false. n.b. checking for a duplicate
- // this way avoids having to do a containsKey test in the normal case.
- _pendingInsertions.put(control, old);
- return false;
- }
- asyncInsert = true;
+ /**
+ * Reset the next wakeup time, when a new element has a timeout earlier than the currently scheduled wakeup.
+ *
+ * @param newCheckTime absolute time in ms.
+ */
+ private void updateCheckTimeForEarlierInsert(long newCheckTime)
+ {
+ synchronized (this) {
+ long oldCheckTime = nextDynamicCheckTime.get();
+ while (newCheckTime < oldCheckTime) {
+ if (nextDynamicCheckTime.compareAndSet(oldCheckTime, newCheckTime)) {
+ notifyAll(); // force recalc of next wakeup time, taking into account the newly inserted element(s)
+ } else {
+ oldCheckTime = nextDynamicCheckTime.get();
}
}
-
- if(asyncInsert) {
- return true;
- } else {
- return synchronousInsert(e);
- }
}
}
-
- private final boolean synchronousInsert(ReaperElement elementToInsert)
- {
- synchronized (this)
- {
- TransactionReaper._lifetime += elementToInsert._timeout;
-
- ReaperElement old = (ReaperElement)_timeouts.put(elementToInsert._control, elementToInsert);
- if (old != null) {
- // hmm, this probably means that the hash or equals implementation on the element has been
- // coded wrong -- restore the old entry and return false. n.b. checking for a duplicate
- // this way avoids having to do a containsKey test in the normal case.
- _timeouts.put(elementToInsert._control, old);
- return false;
- }
-
- // we should not get an error here unless the user has coded the compareTo test wrong
-
- boolean rtn = _transactions.add(elementToInsert);
-
- if(_dynamic && _transactions.first() == elementToInsert)
- {
- notifyAll(); // force recalc of next wakeup time, taking into account the newly inserted element
- }
-
- return rtn;
- }
- }
-
- public final boolean remove(java.lang.Object control)
+ // takes an Object because OTSManager.destroyControl(Control|ControlImple) uses PseudoControlWrapper not Reapable
+ public final void remove(Object control)
{
- // _pendingInsertions is a concurrent structure, so we don't lock it here. That means we need to be careful
- // when transferring elements from pending to the real structures, see check() above. The synchronous remove
- // must also take care to lock, or things may leak if a remove attempt happens concurrent to a pending copy.
-
- if(_pendingInsertions.remove(control) != null) {
- return true;
- } else {
- return synchronousRemove(control);
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::remove ( " + control + " )");
}
- }
- public final boolean synchronousRemove(java.lang.Object control)
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::remove ( " + control + " )");
- }
+ if (control == null)
+ return;
- if (control == null)
- return false;
+ ReaperElement key = _timeouts.get(control);
+ if (key == null) {
+ return;
+ }
- ReaperElement key;
+ // if a cancellation is in progress then we have to
+ // see it through as we have to ensure that the worker
+ // thread does not get wedged. so we have to tell the
+ // control has gone away. in order to test the status
+ // we need to synchronize on the element before we
+ // synchronize on this so we can ensure that we don't
+ // deadlock ourselves.
- synchronized(this)
- {
- key = (ReaperElement)_timeouts.remove(control);
- if(key == null) {
- return false;
- }
- }
+ synchronized (key) {
+ if (key._status != ReaperElement.RUN) {
+ // we are cancelling this TX anyway and need
+ // to track the progress of the cancellation
+ // using this entry so we cnanot remove it
+ return;
+ }
- // if a cancellation is in progress then we have to
- // see it through as we have to ensure that the worker
- // thread does not get wedged. so we have to tell the
- // control has gone away. in order to test the status
- // we need to synchronize on the element before we
- // synchronize on this so we can ensure that we don't
- // deadlock ourselves.
-
- synchronized(key)
- {
- if (key._status != ReaperElement.RUN)
- {
- // we are cancelling this TX anyway and need
- // to track the progress of the cancellation
- // using this entry so we cnanot remove it
-
- return false;
- }
-
- synchronized(this)
- {
- removeElement(key);
-
- return true;
- }
+ removeElementClient(key);
}
- }
+ }
/**
- * Given the transaction instance, this will return the time left before the
- * transaction is automatically rolled back if it has not been terminated.
- *
- * @param control
- * @return the remaining time in milliseconds.
- */
+ * Given the transaction instance, this will return the time left before the
+ * transaction is automatically rolled back if it has not been terminated.
+ *
+ * @param control
+ * @return the remaining time in milliseconds.
+ */
+ public final long getRemainingTimeoutMills(Object control)
+ {
+ // arg is an Object because ArjunaTransactionImple.propagationContext does not have a Reapable
- public final long getRemainingTimeoutMills(Object control)
- {
- if ((_transactions.size() == 0) || (control == null))
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
+ if ((_transactions.isEmpty()) || (control == null)) {
+ if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC,
FacilityCode.FAC_ATOMIC_ACTION,
@@ -958,15 +772,12 @@
return 0;
}
- final ReaperElement reaperElement = (ReaperElement)_timeouts.get(control);
+ final ReaperElement reaperElement = _timeouts.get(control);
long timeout = 0;
- if (reaperElement == null)
- {
+ if (reaperElement == null) {
timeout = 0;
- }
- else
- {
+ } else {
// units are in milliseconds at this stage.
timeout = reaperElement.getAbsoluteTimeout() - System.currentTimeMillis();
}
@@ -979,228 +790,230 @@
FacilityCode.FAC_ATOMIC_ACTION,
"com.arjuna.ats.arjuna.coordinator.TransactionReaper_17",
new Object[]
- { control, timeout });
+ {control, timeout});
}
return timeout;
- }
+ }
- /**
- * Given a Control, return the associated timeout, or 0 if we do not know
- * about it.
- *
- * Return in seconds!
- */
+ /**
+ * Given a Control, return the associated timeout, or 0 if we do not know
+ * about it.
+ * <p/>
+ * Return in seconds!
+ *
+ * Takes an Object because TransactionFactoryImple.getTransactionInfo and
+ * ArjunaTransactionImple.propagationContext use it and don't have a Reapable.
+ */
+ public final int getTimeout(Object control)
+ {
+ if ((_transactions.isEmpty()) || (control == null)) {
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::getTimeout for " + control
+ + " returning 0");
+ }
- public final int getTimeout(Object control)
- {
- if ((_transactions.size() == 0) || (control == null))
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::getTimeout for " + control
- + " returning 0");
- }
+ return 0;
+ }
- return 0;
- }
+ final ReaperElement reaperElement = _timeouts.get(control);
- final ReaperElement reaperElement = (ReaperElement)_timeouts.get(control);
+ int timeout = (reaperElement == null ? 0 : reaperElement._timeout);
- final Integer timeout ;
- if(reaperElement == null) {
- timeout = new Integer(0);
- } else {
- timeout = new Integer(reaperElement._timeout) ;
- }
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
+ new Object[]
+ {control, timeout});
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
- new Object[]
- { control, timeout });
+ return timeout;
+ }
- return timeout.intValue();
- }
+ /*
+ * Terminate the transaction reaper. This is a synchronous operation
+ * and will only return once the reaper has been shutdown cleanly.
+ *
+ * Note, this method assumes that the transaction system has been
+ * shutdown already so no new transactions can be created, or we
+ * could be here for a long time!
+ *
+ * @param waitForTransactions if <code>true</code> then the reaper will
+ * wait until all transactions have terminated (or been terminated by it).
+ * If <code>false</code> then the reaper will call setRollbackOnly on all
+ * the transactions.
+ */
- /*
- * Terminate the transaction reaper. This is a synchronous operation
- * and will only return once the reaper has been shutdown cleanly.
- *
- * Note, this method assumes that the transaction system has been
- * shutdown already so no new transactions can be created, or we
- * could be here for a long time!
- *
- * @param waitForTransactions if <code>true</code> then the reaper will
- * wait until all transactions have terminated (or been terminated by it).
- * If <code>false</code> then the reaper will call setRollbackOnly on all
- * the transactions.
- */
-
- private final void shutdown (boolean waitForTransactions)
- {
+ private final void shutdown(boolean waitForTransactions)
+ {
// the reaper thread synchronizes and waits on this
- synchronized (this)
- {
- _inShutdown = true;
+ synchronized (this) {
+ _inShutdown = true;
- /*
- * If the caller does not want to wait for the normal transaction timeout
- * periods to elapse before terminating, then we first start by enabling
- * our time machine!
- */
+ /*
+ * If the caller does not want to wait for the normal transaction timeout
+ * periods to elapse before terminating, then we first start by enabling
+ * our time machine!
+ */
- if (!waitForTransactions)
- {
- Iterator iter = _transactions.iterator();
- ReaperElement e;
+ if (!waitForTransactions) {
+ Iterator iter = _transactions.iterator();
+ ReaperElement e;
- while (iter.hasNext())
- {
- e = (ReaperElement) iter.next();
+ while (iter.hasNext()) {
+ e = (ReaperElement) iter.next();
- e.setAbsoluteTimeout(0);
- }
- }
+ e.setAbsoluteTimeout(0);
+ }
+ }
- /*
- * Wait for all of the transactions to
- * terminate normally.
- */
+ /*
+ * Wait for all of the transactions to
+ * terminate normally.
+ */
- while (_transactions.size() > 0)
- {
- try
- {
- this.wait();
- }
- catch (final Exception ex)
- {
- }
- }
+ while (!_transactions.isEmpty()) {
+ try {
+ this.wait();
+ }
+ catch (final Exception ex) {
+ }
+ }
_reaperThread.shutdown();
notifyAll();
}
- try
- {
+ try {
_reaperThread.join();
}
- catch (final Exception ex)
- {
+ catch (final Exception ex) {
}
_reaperThread = null;
// the reaper worker thread synchronizes and wais on the work queue
- synchronized(_workQueue) {
+ synchronized (_workQueue) {
_reaperWorkerThread.shutdown();
_workQueue.notifyAll();
// hmm, not sure we really need to do this but . . .
_reaperWorkerThread.interrupt();
}
- try
- {
+ try {
_reaperWorkerThread.join();
}
- catch (final Exception ex)
- {
+ catch (final Exception ex) {
}
_reaperWorkerThread = null;
- }
+ }
- /*
- * Remove element from list and trigger waiter if we are
- * being shutdown.
- *
- * n.b. must only be called when synchronized on this
- */
-
- private final void removeElement (ReaperElement e)
- {
+ // called (indirectly) by user code doing removals on e.g. commit/rollback
+ // does not reset the wakeup time - we prefer leaving an unnecessary wakeup as it's
+ // cheaper than locking to recalculate the new time here.
+ private final void removeElementClient(ReaperElement e)
+ {
+ _transactions.remove(e);
_timeouts.remove(e._control);
+
+ // don't recalc time, just wake up as planned
+
+ if(_inShutdown) {
+ synchronized (this) {
+ this.notifyAll(); // TODO: use different lock for shutdown?
+ }
+ }
+ }
+
+ /*
+ * Remove element from list and trigger waiter if we are
+ * being shutdown.
+ *
+ */
+ // called internally by the reaper when removing elements - note the different
+ // behaviour with regard to check time recalculation. Here we need to ensure the
+ // new time is correct.
+ private final void removeElementReaper(ReaperElement e)
+ {
_transactions.remove(e);
+ _timeouts.remove(e._control);
- if (_inShutdown && (_transactions.size() == 0))
- {
- this.notifyAll();
+ synchronized (this) {
+ try {
+ ReaperElement first = _transactions.first();
+ nextDynamicCheckTime.set(first.getAbsoluteTimeout());
+ } catch(NoSuchElementException nsee) {
+ nextDynamicCheckTime.set(Long.MAX_VALUE);
+
+ if(_inShutdown) {
+ this.notifyAll(); // TODO: use different lock for shutdown?
+ }
+ }
}
- }
+ }
- private final void notifyListeners (Reapable element, boolean rollback)
- {
- // notify listeners. Ignore errors.
+ private final void notifyListeners(Reapable element, boolean rollback)
+ {
+ // notify listeners. Ignore errors.
- for (int i = 0; i < _listeners.size(); i++)
- {
- try
- {
- if (rollback)
- _listeners.get(i).rolledBack(element.get_uid());
- else
- _listeners.get(i).markedRollbackOnly(element.get_uid());
- }
- catch (final Throwable ex)
- {
- // ignore
- }
- }
- }
-
- /**
- * Currently we let the reaper thread run at same priority as other threads.
- * Could get priority from environment.
- */
+ for (int i = 0; i < _listeners.size(); i++) {
+ try {
+ if (rollback)
+ _listeners.get(i).rolledBack(element.get_uid());
+ else
+ _listeners.get(i).markedRollbackOnly(element.get_uid());
+ }
+ catch (final Throwable ex) {
+ // ignore
+ }
+ }
+ }
- public static synchronized TransactionReaper create(long checkPeriod)
- {
- if (tsLogger.arjLogger.debugAllowed())
- {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::create ( " + checkPeriod + " )");
- }
+ /**
+ * Currently we let the reaper thread run at same priority as other threads.
+ * Could get priority from environment.
+ */
+ public static synchronized TransactionReaper create(long checkPeriod)
+ {
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::create ( " + checkPeriod + " )");
+ }
- if (TransactionReaper._theReaper == null)
- {
+ if (TransactionReaper._theReaper == null) {
// default to dynamic mode
TransactionReaper._dynamic = true;
- String mode = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperMode();
+ String mode = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperMode();
- if (mode.compareTo(TransactionReaper.PERIODIC) == 0) {
- TransactionReaper._dynamic = false;
- }
+ if (mode.compareTo(TransactionReaper.PERIODIC) == 0) {
+ TransactionReaper._dynamic = false;
+ }
- if(mode.compareTo(TransactionReaper.NORMAL) == 0) {
- TransactionReaper._dynamic = false;
+ if (mode.compareTo(TransactionReaper.NORMAL) == 0) {
+ TransactionReaper._dynamic = false;
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_19");
- }
+ if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
+ tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_19");
}
+ }
- if (!TransactionReaper._dynamic)
- {
+ if (!TransactionReaper._dynamic) {
checkPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperTimeout();
- }
- else
- checkPeriod = Long.MAX_VALUE;
+ } else
+ checkPeriod = Long.MAX_VALUE;
- TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
+ TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
TransactionReaper._theReaper._cancelWaitPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperCancelWaitPeriod();
@@ -1230,133 +1043,132 @@
TransactionReaper._theReaper._zombieMax = 1;
}
- _reaperThread = new ReaperThread(TransactionReaper._theReaper);
- // _reaperThread.setPriority(Thread.MIN_PRIORITY);
+ _reaperThread = new ReaperThread(TransactionReaper._theReaper);
+ // _reaperThread.setPriority(Thread.MIN_PRIORITY);
- _reaperThread.setDaemon(true);
+ _reaperThread.setDaemon(true);
- _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
- _reaperWorkerThread.setDaemon(true);
+ _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+ _reaperWorkerThread.setDaemon(true);
- _reaperThread.start();
+ _reaperThread.start();
- _reaperWorkerThread.start();
- }
+ _reaperWorkerThread.start();
+ }
- return TransactionReaper._theReaper;
- }
+ return TransactionReaper._theReaper;
+ }
- public static TransactionReaper create()
- {
- return create(TransactionReaper.defaultCheckPeriod);
- }
+ public static TransactionReaper create()
+ {
+ return create(TransactionReaper.defaultCheckPeriod);
+ }
- public static TransactionReaper transactionReaper()
- {
- return transactionReaper(false);
- }
+ public static TransactionReaper transactionReaper()
+ {
+ return transactionReaper(false);
+ }
- /*
- * If parameter is true then do a create.
- */
+ /*
+ * If parameter is true then do a create.
+ */
+ public static synchronized TransactionReaper transactionReaper(boolean createReaper)
+ {
+ if (createReaper)
+ return create();
+ else
+ return _theReaper;
+ }
- public static synchronized TransactionReaper transactionReaper(
- boolean createReaper)
- {
- if (createReaper)
- return create();
- else
- return _theReaper;
- }
+ /**
+ * Terminate the transaction reaper. This is a synchronous operation
+ * and will only return once the reaper has been shutdown cleanly.
+ * <p/>
+ * Note, this method assumes that the transaction system has been
+ * shutdown already so no new transactions can be created, or we
+ * could be here for a long time!
+ *
+ * @param waitForTransactions if <code>true</code> then the reaper will
+ * wait until all transactions have terminated (or been terminated by it).
+ * If <code>false</code> then the reaper will call setRollbackOnly on all
+ * the transactions.
+ */
- /**
- * Terminate the transaction reaper. This is a synchronous operation
- * and will only return once the reaper has been shutdown cleanly.
- *
- * Note, this method assumes that the transaction system has been
- * shutdown already so no new transactions can be created, or we
- * could be here for a long time!
- *
- * @param waitForTransactions if <code>true</code> then the reaper will
- * wait until all transactions have terminated (or been terminated by it).
- * If <code>false</code> then the reaper will call setRollbackOnly on all
- * the transactions.
- */
+ public static synchronized void terminate(boolean waitForTransactions)
+ {
+ if (_theReaper != null) {
+ _theReaper.shutdown(waitForTransactions);
+ _theReaper = null;
+ }
+ }
- public static synchronized void terminate (boolean waitForTransactions)
- {
- if (_theReaper != null)
- {
- _theReaper.shutdown(waitForTransactions);
- _theReaper = null;
- }
- }
-
- public static boolean isDynamic() {
+ public static boolean isDynamic()
+ {
return _dynamic;
}
- /*
- * Don't bother synchronizing as this is only an estimate anyway.
- */
+ public static synchronized long transactionLifetime()
+ {
+ return _lifetime.get();
+ }
- public static final synchronized long transactionLifetime()
- {
- return TransactionReaper._lifetime;
- }
+ public static final long defaultCheckPeriod = 120000; // in milliseconds
+ public static final long defaultCancelWaitPeriod = 500; // in milliseconds
+ public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
+ public static final int defaultZombieMax = 8;
- public static final long defaultCheckPeriod = 120000; // in milliseconds
- public static final long defaultCancelWaitPeriod = 500; // in milliseconds
- public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
- public static final int defaultZombieMax = 8;
+ static final void reset()
+ {
+ _theReaper = null;
+ }
- static final void reset()
- {
- _theReaper = null;
- }
+ private final SortedSet<ReaperElement> _transactions = new ConcurrentSkipListSet<ReaperElement>();
- private SortedSet _transactions = Collections.synchronizedSortedSet(new TreeSet()); // C of ReaperElement
- private Map _timeouts = Collections.synchronizedMap(new HashMap()); // key = Reapable, value = ReaperElement
+ // The keys are actually Reapable, as that's what insert takes. However, some functions use get(Object)
+ // and rely on clever hashcode/equals behaviour, especially for the JTS. Thus the generics key type is Object.
+ private final ConcurrentMap<Object, ReaperElement> _timeouts = new ConcurrentHashMap<Object, ReaperElement>();
- private final Map<Reapable, ReaperElement> _pendingInsertions = new ConcurrentHashMap<Reapable, ReaperElement>();
+ private final List<ReaperElement> _workQueue = new LinkedList<ReaperElement>();
- private List _workQueue = new LinkedList(); // C of ReaperElement
-
- private Vector<ReaperMonitor> _listeners = new Vector<ReaperMonitor>();
+ private final Vector<ReaperMonitor> _listeners = new Vector<ReaperMonitor>(); // TODO sync properly
- private long _checkPeriod = 0;
+ private long _checkPeriod = 0;
- /**
- * number of millisecs delay afer a cancel() is scheduled
- * before the reaper tries to interrupt the worker thread
- * executing the cancel()
- */
- private long _cancelWaitPeriod = 0;
+ // Although it is atomic, writes (but not reads) need to by synchronized(this) i.e. on the TransactionReaper instance
+ // in order to ensure proper timing with respect to wait/notify and wakeups on the _transactions queue.
+ private final AtomicLong nextDynamicCheckTime = new AtomicLong(Long.MAX_VALUE);
- /**
- * number of millisecs delay afer a worker thread is
- * interrupted before the reaper writes the it off as a zombie
- * and starts a new thread
- */
- private long _cancelFailWaitPeriod = 0;
+ /**
+ * number of millisecs delay afer a cancel() is scheduled
+ * before the reaper tries to interrupt the worker thread
+ * executing the cancel()
+ */
+ private long _cancelWaitPeriod = 0;
- /**
- * threshold for count of non-exited zombies at which system
- * starts logging error messages
- */
- private int _zombieMax = 0;
+ /**
+ * number of millisecs delay afer a worker thread is
+ * interrupted before the reaper writes the it off as a zombie
+ * and starts a new thread
+ */
+ private long _cancelFailWaitPeriod = 0;
- private static TransactionReaper _theReaper = null;
+ /**
+ * threshold for count of non-exited zombies at which system
+ * starts logging error messages
+ */
+ private int _zombieMax = 0;
- private static ReaperThread _reaperThread = null;
+ private static TransactionReaper _theReaper = null;
- private static ReaperWorkerThread _reaperWorkerThread = null;
+ private static ReaperThread _reaperThread = null;
- private static boolean _dynamic = true;
+ private static ReaperWorkerThread _reaperWorkerThread = null;
- private static long _lifetime = 0;
+ private static boolean _dynamic = true;
- private static int _zombieCount = 0;
+ private static AtomicLong _lifetime = new AtomicLong(0);
+ private static int _zombieCount = 0;
+
private boolean _inShutdown = false;
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java 2009-10-14 14:33:57 UTC (rev 29613)
@@ -37,7 +37,7 @@
import com.arjuna.ats.arjuna.coordinator.Reapable;
import com.arjuna.ats.arjuna.logging.FacilityCode;
-public class ReaperElement implements Comparable
+public class ReaperElement implements Comparable<ReaperElement>
{
/*
@@ -83,13 +83,11 @@
* This is required so that the set maintained by the TransactionReaper
* is in timeout order for efficient processing.
*
- * @param o the Object to compare
+ * @param other the ReaperElement to compare
* @return 0 if equal, 1 if this is greater, -1 if this is smaller
*/
- public int compareTo(Object o)
+ public int compareTo(ReaperElement other)
{
- ReaperElement other = (ReaperElement)o;
-
if(this == other) {
return 0;
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java 2009-10-14 14:33:57 UTC (rev 29613)
@@ -67,27 +67,35 @@
assertEquals(sortedSet.last(), reaperElement);
// test insertion of timeout=0 is a nullop
- assertTrue(reaper.insert(reapable, 0));
+ reaper.insert(reapable, 0);
assertEquals(0, reaper.numberOfTransactions());
assertEquals(0, reaper.numberOfTimeouts());
- assertFalse(reaper.remove(reapable));
+ reaper.remove(reapable);
// test that duplicate insertion fails
- assertTrue(reaper.insert(reapable, 10));
- assertFalse(reaper.insert(reapable, 10));
+ reaper.insert(reapable, 10);
assertEquals(1, reaper.numberOfTransactions());
assertEquals(1, reaper.numberOfTimeouts());
- assertTrue(reaper.remove(reapable));
+ try {
+ reaper.insert(reapable, 10);
+ fail("duplicate insert failed to blow up");
+ } catch(Exception e) {
+ }
+ reaper.remove(reapable);
assertEquals(0, reaper.numberOfTransactions());
assertEquals(0, reaper.numberOfTimeouts());
// test that timeout change fails
- assertTrue(reaper.insert(reapable, 10));
- assertFalse(reaper.insert(reapable, 20));
+ reaper.insert(reapable, 10);
+ try {
+ reaper.insert(reapable, 20);
+ fail("timeout change insert failed to blow up");
+ } catch(Exception e) {
+ }
assertEquals(1, reaper.numberOfTransactions());
assertEquals(1, reaper.numberOfTimeouts());
assertEquals(10, reaper.getTimeout(reapable));
- assertTrue(reaper.remove(reapable));
+ reaper.remove(reapable);
assertEquals(0, reaper.numberOfTransactions());
assertEquals(0, reaper.numberOfTimeouts());
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java 2009-10-14 14:33:57 UTC (rev 29613)
@@ -34,7 +34,7 @@
* that time out and, optionally, get wedged either when a cancel is
* tried and/or when an interrupt is delivered
*
- * @author Andrwe Dinn (adinn at redhat.com), 2007-07-09
+ * @author Andrew Dinn (adinn at redhat.com), 2007-07-09
*/
public class ReaperTestCase2 extends ReaperTestCaseControl
@@ -103,9 +103,9 @@
// insert two reapables so they timeout at 1 second intervals then stall the first one and
// check progress of cancellations and rollbacks for both
- assertTrue(reaper.insert(reapable0, 1));
+ reaper.insert(reapable0, 1);
- assertTrue(reaper.insert(reapable1, 1));
+ reaper.insert(reapable1, 1);
//assertTrue(reaper.insert(reapable2, 1));
@@ -224,9 +224,9 @@
// insert reapables so they timeout at 1 second intervals then
// check progress of cancellations and rollbacks
- assertTrue(reaper.insert(reapable2, 1));
+ reaper.insert(reapable2, 1);
- assertTrue(reaper.insert(reapable3, 1));
+ reaper.insert(reapable3, 1);
// make sure they were all registered
// the transactions queue should be
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java 2009-10-14 14:15:11 UTC (rev 29612)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java 2009-10-14 14:33:57 UTC (rev 29613)
@@ -90,13 +90,13 @@
// insert reapables so they timeout at 1 second intervals then
// check progress of cancellations and rollbacks
- assertTrue(reaper.insert(reapable0, 1));
+ reaper.insert(reapable0, 1);
- assertTrue(reaper.insert(reapable1, 2));
+ reaper.insert(reapable1, 2);
- assertTrue(reaper.insert(reapable2, 3));
+ reaper.insert(reapable2, 3);
- assertTrue(reaper.insert(reapable3, 4));
+ reaper.insert(reapable3, 4);
// latch the reaper before it checks the queue
@@ -198,13 +198,13 @@
// enableRendezvous(uid0, true);
- assertTrue(reaper.insert(reapable0, 1));
+ reaper.insert(reapable0, 1);
- assertTrue(reaper.insert(reapable1, 2));
+ reaper.insert(reapable1, 2);
- assertTrue(reaper.insert(reapable2, 3));
+ reaper.insert(reapable2, 3);
- assertTrue(reaper.insert(reapable3, 4));
+ reaper.insert(reapable3, 4);
// latch the reaper before it checks the queue
More information about the jboss-svn-commits
mailing list