[jboss-svn-commits] JBL Code SVN: r13301 - in labs/jbosstm/trunk/ArjunaCore/arjuna: classes/com/arjuna/ats/arjuna/common and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Jul 10 11:29:45 EDT 2007
Author: adinn
Date: 2007-07-10 11:29:45 -0400 (Tue, 10 Jul 2007)
New Revision: 13301
Added:
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java
labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml
Log:
Fix for JBTM-203
The ReaperThread thread now employs a ReaperWorkerThread to execute
cancel() calls so that a badly behaving TX cannot wedge the reaper. If
the ReaperWorkerThread gets wedged the ReaperThread interrupts it and,
in the absence of any response, marks it as a zombie (i.e. if it ever
unwedges it *has* to exit) and creates a new ReaperWorkerThread.
Failure of a cancel causes the TX to be marked as ROLLBACK_ONLY and an
error to be logged. Failure to mark as ROLLBACK_ONLY causes an error
to be logged. If the (non-exited) zombie count exceeds a threshold an
error is logged.
Defaults for the delays between cancel/interrupt and interrupt/mark as
zombie and for the zombie count threshold can be set via properties.
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2007-07-10 15:29:45 UTC (rev 13301)
@@ -449,6 +449,10 @@
todir="${com.hp.mwlabs.ts.arjuna.reports.dest}">
<fileset dir="${com.hp.mwlabs.ts.arjuna.tests.src}" includes="**/ReaperTestCase.java"/>
</batchtest>
+ <batchtest haltonerror="yes" haltonfailure="yes" fork="yes"
+ todir="${com.hp.mwlabs.ts.arjuna.reports.dest}">
+ <fileset dir="${com.hp.mwlabs.ts.arjuna.tests.src}" includes="**/ReaperTestCase2.java"/>
+ </batchtest>
</junit>
</target>
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -74,6 +74,9 @@
* <li> RECOVERY_BACKOFF_PERIOD = com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod
* <li> TX_REAPER_MODE = com.arjuna.ats.arjuna.coordinator.txReaperMode
* <li> TX_REAPER_TIMEOUT = com.arjuna.ats.arjuna.coordinator.txReaperTimeout
+ * <li> TX_REAPER_CANCEL_WAIT_PERIOD = com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod
+ * <li> TX_REAPER_CANCEL_FAIL_WAIT_PERIOD = com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod
+ * <li> TX_REAPER_ZOMBIE_MAX = com.arjuna.ats.arjuna.coordinator.txReaperZombieMax
* <li> OBJECTSTORE_SHARE = com.arjuna.ats.arjuna.objectstore.share
* <li> OBJECTSTORE_HIERARCHY_RETRY = com.arjuna.ats.arjuna.objectstore.hierarchyRetry
* <li> OBJECTSTORE_HIERARCHY_TIMEOUT = com.arjuna.ats.arjuna.objectstore.hierarchyTimeout
@@ -128,6 +131,9 @@
public static final String RECOVERY_BACKOFF_PERIOD = "com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" ;
public static final String TX_REAPER_MODE = "com.arjuna.ats.arjuna.coordinator.txReaperMode";
public static final String TX_REAPER_TIMEOUT = "com.arjuna.ats.arjuna.coordinator.txReaperTimeout";
+ public static final String TX_REAPER_CANCEL_WAIT_PERIOD = "com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod";
+ public static final String TX_REAPER_CANCEL_FAIL_WAIT_PERIOD = "com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod";
+ public static final String TX_REAPER_ZOMBIE_MAX = "com.arjuna.ats.arjuna.coordinator.txReaperZombieMax";
public static final String OBJECTSTORE_SHARE = "com.arjuna.ats.arjuna.objectstore.share";
public static final String OBJECTSTORE_HIERARCHY_RETRY = "com.arjuna.ats.arjuna.objectstore.hierarchyRetry";
public static final String OBJECTSTORE_HIERARCHY_TIMEOUT = "com.arjuna.ats.arjuna.objectstore.hierarchyTimeout";
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
@@ -15,7 +15,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*
- * (C) 2005-2006,
+ * (C) 2005-2007,
* @author JBoss Inc.
*/
/*
@@ -64,16 +64,46 @@
* TransactionReaper::check - comparing {0}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_3
* [com.arjuna.ats.arjuna.coordinator.TransactionReaper_3] -
- * TransactionReaper::check - rollback for {0}
+ * TransactionReaper::getTimeout for {0} returning {1}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_4
* [com.arjuna.ats.arjuna.coordinator.TransactionReaper_4] -
- * TransactionReaper failed to force rollback on {0}
+ * TransactionReaper::check interrupting cancel in progress for {0}
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_5
* [com.arjuna.ats.arjuna.coordinator.TransactionReaper_5] -
- * TransactionReaper failed to force rollback_only on {0}
+ * TransactionReaper::check worker zombie count {0} exceeds specified limit
* @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_6
* [com.arjuna.ats.arjuna.coordinator.TransactionReaper_6] -
- * TransactionReaper::getTimeout for {0} returning {1}
+ * TransactionReaper::check worker {0} not responding to interrupt when cancelling TX {1} -- worker marked as zombie and TX scheduled for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_7
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_7] -
+ * TransactionReaper::doCancellations worker {0} successfully cancelled TX {1}
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_8
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_8] -
+ * TransactionReaper::doCancellations worker {0} failed to cancel TX {1} -- rescheduling for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_9
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_9] -
+ * TransactionReaper::doCancellations worker {0} exception during cancel of TX {1} -- rescheduling for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_10
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_10] -
+ * TransactionReaper::check successfully marked TX {0} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_11
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_11] -
+ * TransactionReaper::check failed to mark TX {0} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_12
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_12] -
+ * TransactionReaper::check exception while marking TX {0} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_13
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_13] -
+ * TransactionReaper::doCancellations worker {0} missed interrupt when cancelling TX {1} -- exiting as zombie (zombie count decremented to {2})
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_14
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_14] -
+ * TransactionReaper::doCancellations worker {0} successfully marked TX {1} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_15
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_15] -
+ * TransactionReaper::doCancellations worker {0} failed to mark TX {1} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_16
+ * [com.arjuna.ats.arjuna.coordinator.TransactionReaper_16] -
+ * TransactionReaper::doCancellations worker {0} exception while marking TX {1} as rollback only
*/
public class TransactionReaper
@@ -133,163 +163,632 @@
return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
}
}
- return _checkPeriod;
+ else
+ {
+ // if we have a cancel in progress which needs
+ // checking up on then we have to wake up in time
+ // for it whether we are using a static or
+ // dynamic model
+
+ try
+ {
+ final ReaperElement head = (ReaperElement) _transactions.first(); //_list.peak();
+ if (head._status != ReaperElement.RUN) {
+ long waitTime = head._absoluteTimeout - System.currentTimeMillis();
+ if (waitTime < _checkPeriod)
+ {
+ return head._absoluteTimeout - System.currentTimeMillis();
+ }
+ }
+ }
+ catch (final NoSuchElementException nsee) {}
+
+ return _checkPeriod;
+ }
}
- /*
- * Should be no need to protect with a mutex since only one thread is ever
- * doing the work.
- */
-
/**
- * Only check for one at a time to prevent starvation.
+ * process all entries in the timeout queue which have
+ * expired. entries for newly expired transactions are passed
+ * to a worker thread for cancellation and requeued for
+ * subsequent progress checks. the worker is given a kick if
+ * such checks find it is wedged.
*
* Timeout is given in milliseconds.
*/
public final boolean check()
{
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::check ()");
+ }
+
+ do
+ {
+ final ReaperElement e ;
+ try
+ {
+ e = (ReaperElement)_transactions.first();
+ }
+ catch (final NoSuchElementException nsee)
+ {
+ return true ;
+ }
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
+ new Object[]
+ { Long.toString(e._absoluteTimeout) });
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : checking " + e._control.get_uid());
+
+ final long now = System.currentTimeMillis();
+ if (now < e._absoluteTimeout)
+ {
+ // go back to sleep
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : ms wait " + (e._absoluteTimeout - now));
+ break;
+ }
+
if (tsLogger.arjLogger.debugAllowed())
{
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::check ()");
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "Reaper timeout for TX " + e._control.get_uid() + " state " + e.statusName());
}
- do {
- final ReaperElement e ;
- try
- {
- e = (ReaperElement)_transactions.first();
- }
- catch (final NoSuchElementException nsee)
- {
- return true ;
- }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : timeout " + e._control.get_uid());
- if (tsLogger.arjLoggerI18N.debugAllowed())
+ // if we have to synchronize on multiple objects we always
+ // do so in a fixed order ReaperElement before Reaper and
+ // ReaperElement before Reaper._workQueue in order to
+ // ensure we don't deadlock. We never synchronize on the
+ // reaper and the work queue at the same time.
+
+ synchronized(e) {
+ switch (e._status)
+ {
+ case ReaperElement.RUN:
+ {
+ // this tx has just timed out. remove it from the
+ // TX list, update the timeout to take account of
+ // cancellation period and reinsert as a cancelled
+ // TX. this ensures we process it again if it does
+ // not get cancelled in time
+
+ e._status = ReaperElement.SCHEDULE_CANCEL;
+
+ synchronized (this)
{
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
- new Object[]
- { Long.toString(e._absoluteTimeout) });
+ _transactions.remove(e);
+
+ e._absoluteTimeout =
+ (System.currentTimeMillis() + _cancelWaitPeriod);
+ _transactions.add(e);
}
- final long now = System.currentTimeMillis();
- if (now >= e._absoluteTimeout)
+ if (tsLogger.arjLogger.debugAllowed())
{
- if (e._control.running())
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "Reaper scheduling TX for cancellation " + e._control.get_uid());
+ }
+
+ // insert into cancellation queue for a worker
+ // thread to process and then make sure a worker
+ // thread is awake
+
+ synchronized (_workQueue)
+ {
+ _workQueue.add(e);
+ _workQueue.notify();
+ }
+ }
+ break;
+ case ReaperElement.SCHEDULE_CANCEL:
+ {
+ // hmm, a worker is taking its time to
+ // start processing this scheduled entry.
+ // we may just be running slow ... but the
+ // worker may be wedged under a cancel for
+ // some other TX. add an extra delay to
+ // give the worker more time to complete
+ // its current task and progress this
+ // entry to the CANCEL state. if the
+ // worker *is* wedged then this will
+ // ensure the wedged TX entry comes to the
+ // front of the queue.
+
+ synchronized (this)
+ {
+ _transactions.remove(e);
+
+ e._absoluteTimeout =
+ (System.currentTimeMillis() + _cancelWaitPeriod);
+
+ _transactions.add(e);
+ }
+
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+ }
+ break;
+ case ReaperElement.CANCEL:
+ {
+ // ok, the worker must be wedged under a
+ // call to cancel() -- kick the thread and
+ // reschedule the element for a later
+ // check to ensure the thread responded to
+ // the kick
+
+ e._status = ReaperElement.CANCEL_INTERRUPTED;
+
+ e._worker.interrupt();
+
+ synchronized (this)
+ {
+ _transactions.remove(e);
+
+ e._absoluteTimeout =
+ (System.currentTimeMillis() + _cancelFailWaitPeriod);
+
+ _transactions.add(e);
+ }
+
+ // log that we interrupted cancel()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_4",
+ new Object[]{e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : interrupting cancel in progress for " + e._control.get_uid());
+ }
+ break;
+ case ReaperElement.CANCEL_INTERRUPTED:
+ {
+ // cancellation got truly wedged -- mark
+ // the element as a zombie so the worker
+ // exits when (if?) it wakes up and create
+ // a new worker thread to handle further
+ // cancellations. then mark the
+ // transaction as rollback only.
+
+ e._status = ReaperElement.ZOMBIE;
+
+ synchronized(this)
+ {
+ _zombieCount++;
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling " + e._control.get_uid());
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancealling " + e._control.get_uid());
+
+ if (_zombieCount == _zombieMax)
+ {
+ // log zombie overflow error call()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
{
- /*
- * If this is a local transaction, then we can roll it back
- * completely. Otherwise, just mark it as rollback only.
- */
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_5",
+ new Object[]{new Integer(_zombieCount)});
+ }
- boolean problem = false;
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " too many zombies (" + _zombieCount + ")! " + e._control.get_uid());
+ }
+ }
- if (TxControl.enableStatistics)
- {
- TxStats.incrementTimeouts();
- }
+ _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+ _reaperWorkerThread.setDaemon(true);
- try
- {
- if (e._control.cancel() == ActionStatus.ABORTED)
- {
- if (tsLogger.arjLoggerI18N.debugAllowed())
- {
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
- new Object[]
- { e._control.get_uid() });
- }
- }
- else
- problem = true;
- }
- catch (Exception ex2)
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
- new Object[]
- { e._control });
- }
+ _reaperWorkerThread.start();
- problem = true;
- }
+ // log a failed cancel()
- if (problem)
- {
- boolean error = false;
- boolean printDebug = tsLogger.arjLoggerI18N
- .isWarnEnabled();
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_6",
+ new Object[]{e._worker,
+ e._control.get_uid()});
+ }
- try
- {
- error = !e._control.preventCommit();
- }
- catch (Exception ex3)
- {
- error = true;
- }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : thread " + e._worker + " executing cancel did not respond to interrupt -- scheduling for rollback! " + e._control.get_uid());
+ // ok, since the worker was wedged we need to
+ // remove the entry from the timeouts and
+ // transactions lists then mark this tx as
+ // rollback only. we have to log a message
+ // whether we succeed, fail or get interrupted
- if (error || printDebug)
- {
- if (error)
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N
- .warn(
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_5",
- new Object[]
- { e._control });
- }
- }
- else
- {
- if (tsLogger.arjLoggerI18N.debugAllowed())
- {
- tsLogger.arjLoggerI18N
- .debug(
- DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
- new Object[]
- { e._control });
- }
- }
- }
- }
+ synchronized(this)
+ {
+ _timeouts.remove(e._control);
+ _transactions.remove(e);
+ }
+
+ try
+ {
+ if (e._control.preventCommit()) {
+
+ // log a successful preventCommit()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_10",
+ new Object[]{e._control.get_uid()});
}
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully marked TX " + e._control.get_uid() + " as rollback only");
+ }
+ else
+ {
+ // log a failed preventCommit()
- synchronized(this)
- {
- _timeouts.remove(e._control) ;
- _transactions.remove(e) ;
- }
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_11",
+ new Object[]{e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to mark TX " + e._control.get_uid() + " as rollback only");
+ }
}
- else
+ catch(Exception e1)
{
- break;
+ // log an exception under preventCommit()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_12",
+ new Object[]{e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during mark-as-rollback of TX " + e._control.get_uid());
}
- } while(true) ;
+ }
+ break;
+ case ReaperElement.FAIL:
+ case ReaperElement.COMPLETE:
+ {
+ // ok, the worker should remove the tx
+ // from the transactions queue very soon
+ // but we need to progress to the next
+ // entry so we will steal in and do it
+ // first
- return true;
+ synchronized(this)
+ {
+ _timeouts.remove(e._control);
+ _transactions.remove(e);
+ }
+ }
+ break;
+
+ }
+ }
+ } while(true) ;
+
+ return true;
}
+ public final void waitForCancellations()
+ {
+ synchronized(_workQueue)
+ {
+ try
+ {
+ while (_workQueue.isEmpty())
+ {
+ _workQueue.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
+ public final void doCancellations()
+ {
+ for (;;)
+ {
+ ReaperElement e;
+
+ // see if we have any cancellations to process
+
+ synchronized(_workQueue)
+ {
+ try
+ {
+ e = (ReaperElement)_workQueue.remove(0);
+ }
+ catch (IndexOutOfBoundsException ioobe) {break;}
+ }
+
+
+ // ok, current status must be SCHEDULE_CANCEL.
+ // progress state to CANCEL and call cancel()
+
+
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger
+ .debug(
+ DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "Reaper Worker " + Thread.currentThread() + " attempting to cancel " + e._control.get_uid());
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " : attempting to cancel " + e._control.get_uid());
+
+ boolean cancelled = false;
+ boolean excepted = false;
+
+ synchronized(e)
+ {
+ e._worker = Thread.currentThread();
+ e._status = ReaperElement.CANCEL;
+ e.notify();
+ }
+
+ // we are now exposed to at most one interrupt from
+ // the reaper. test for running and try the cancel if
+ // required
+
+ try
+ {
+ if (e._control.running()) {
+
+ // try to cancel the transaction
+
+ if (e._control.cancel() == ActionStatus.ABORTED)
+ {
+ cancelled = true;
+ }
+ }
+ }
+ catch (Exception e1)
+ {
+ excepted = true;
+ }
+
+ // ok, close the interrupt window by resetting the
+ // state -- unless we have been told to go away by
+ // being set to ZOMBIE
+
+ synchronized (e)
+ {
+ if (e._status == ReaperElement.ZOMBIE)
+ {
+ // we need to decrement the zombie count and
+ // force an immediate thread exit. the reaper
+ // will have removed the entry from the
+ // transactions list and started another
+ // worker thread.
+
+ ReaperWorkerThread worker = (ReaperWorkerThread)Thread.currentThread();
+ worker.shutdown();
+
+ synchronized(this)
+ {
+ _zombieCount--;
+ }
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_13",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid(),
+ new Integer(_zombieCount)});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker" + Thread.currentThread() + " failed to respond to interrupt trying to cancel or mark as rollback only TX " + e._control.get_uid() + " -- exiting as zombie (zombie count decremented to " + _zombieCount + ")");
+
+ // this gets us out of the for(;;) loop and
+ // the shutdown call above makes sure we exit
+ // after returning
+
+ break;
+ }
+ else if (cancelled &&
+ e._status == ReaperElement.CANCEL_INTERRUPTED)
+ {
+ // ok the call to cancel() returned true but
+ // we cannot trust it because the reaper sent
+ // the thread an interrupt
+
+ cancelled = false;
+ e._status = ReaperElement.FAIL;
+ e.notify();
+ }
+ else
+ {
+ e._status = (cancelled
+ ? ReaperElement.COMPLETE
+ : ReaperElement.FAIL);
+ e.notify();
+ }
+ }
+
+ // log a message notifying success, failure or
+ // exception during cancel(), remove the element from
+ // the transactions queue and mark TX as rollback only
+
+ if (cancelled)
+ {
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_7",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully cancelled TX " + e._control.get_uid());
+
+ synchronized(this)
+ {
+ _timeouts.remove(e._control);
+ _transactions.remove(e);
+ }
+ }
+ else
+ {
+ if (excepted)
+ {
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_9",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during cancel of TX " + e._control.get_uid() + " -- rescheduling for mark-as-rollback");
+ }
+ else
+ {
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_8",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to cancel TX " + e._control.get_uid() + " rescheduling for mark-as-rollback");
+ }
+
+ synchronized(this)
+ {
+ _timeouts.remove(e._control);
+ _transactions.remove(e);
+ }
+
+ try
+ {
+ if (e._control.preventCommit()) {
+ // log a successful preventCommit()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_14",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully marked TX " + e._control.get_uid() + " as rollback only");
+ }
+ else
+ {
+ // log a failed preventCommit()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_15",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to mark TX " + e._control.get_uid() + " as rollback only");
+ }
+ }
+ catch(Exception e1)
+ {
+ // log an exception under preventCommit()
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+ tsLogger.arjLoggerI18N
+ .debug(
+ DebugLevel.ERROR_MESSAGES,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_16",
+ new Object[]{Thread.currentThread(),
+ e._control.get_uid()});
+ }
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during mark-as-rollback of TX " + e._control.get_uid());
+ }
+ }
+ }
+ }
+
/**
* @return the number of items in the reaper's list.
* @since JTS 2.2.
@@ -369,13 +868,39 @@
if (control == null)
return false;
+ ReaperElement key;
+
synchronized(this)
{
- ReaperElement key = (ReaperElement)_timeouts.remove(control);
+ key = (ReaperElement)_timeouts.remove(control);
if(key == null) {
return false;
}
- return _transactions.remove(key);
+ }
+
+ // if a cancellation is in progress then we have to
+ // see it through as we have to ensure that the worker
+ // thread does not get wedged. so we have to tell the
+ // control has gone away. in order to test the status
+ // we need to synchronize on the element before we
+ // synchronize on this so we can ensure that we don't
+ // deadlock ourselves.
+
+ synchronized(key)
+ {
+ if (key._status != ReaperElement.RUN)
+ {
+ // we are cancelling this TX anyway and need
+ // to track the progress of the cancellation
+ // using this entry so we cannot remove it
+
+ return false;
+ }
+
+ synchronized(this)
+ {
+ return _transactions.remove(key);
+ }
}
}
@@ -416,7 +941,7 @@
DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC,
FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
+ "com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
new Object[]
{ control, timeout });
@@ -476,12 +1001,98 @@
TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
+ String cancelWait = arjPropertyManager.propertyManager
+ .getProperty(Environment.TX_REAPER_CANCEL_WAIT_PERIOD);
+ if (cancelWait != null)
+ {
+ try
+ {
+ TransactionReaper._theReaper._cancelWaitPeriod = Long.valueOf(cancelWait).longValue();
+ }
+ catch (NumberFormatException e)
+ {
+ TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+ }
+
+ // must give TX at least 10 millisecs to
+ // respond to cancel
+
+ if (TransactionReaper._theReaper._cancelWaitPeriod < 10) {
+ TransactionReaper._theReaper._cancelWaitPeriod = 10;
+ }
+ }
+ else
+ {
+ TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+ }
+
+ String cancelFailWait = arjPropertyManager.propertyManager
+ .getProperty(Environment.TX_REAPER_CANCEL_FAIL_WAIT_PERIOD);
+ if (cancelFailWait != null)
+ {
+ try
+ {
+ TransactionReaper._theReaper._cancelFailWaitPeriod = Long.valueOf(cancelFailWait).longValue();
+ }
+ catch (NumberFormatException e)
+ {
+ TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+ }
+
+ // must give the worker at least 10 millisecs to
+ // respond to the interrupt
+
+ if (TransactionReaper._theReaper._cancelFailWaitPeriod < 10) {
+ TransactionReaper._theReaper._cancelFailWaitPeriod = 10;
+ }
+ }
+ else
+ {
+ TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+ }
+
+ String zombieMax = arjPropertyManager.propertyManager
+ .getProperty(Environment.TX_REAPER_ZOMBIE_MAX);
+ if (zombieMax != null)
+ {
+ try
+ {
+ TransactionReaper._theReaper._zombieMax = Integer.valueOf(zombieMax).intValue();
+ }
+ catch (NumberFormatException e)
+ {
+ TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+ }
+ // we start bleating if the zombie count
+ // reaches zombieMax so it has to be at
+ // least 1
+
+ if (TransactionReaper._theReaper._zombieMax <= 0) {
+ TransactionReaper._theReaper._zombieMax = 1;
+ }
+ }
+ else
+ {
+ TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+ }
+
+ // use defaults for now -- NOTE: this unconditionally overwrites the values read from the properties above
+
+ TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+ TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+ TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+
_reaperThread = new ReaperThread(TransactionReaper._theReaper);
// _reaperThread.setPriority(Thread.MIN_PRIORITY);
_reaperThread.setDaemon(true);
+ _reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+ _reaperWorkerThread.setDaemon(true);
+
_reaperThread.start();
+
+ _reaperWorkerThread.start();
}
return TransactionReaper._theReaper;
@@ -520,6 +1131,9 @@
}
public static final long defaultCheckPeriod = 120000; // in milliseconds
+ public static final long defaultCancelWaitPeriod = 500; // in milliseconds
+ public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
+ public static final int defaultZombieMax = 8;
static final void reset()
{
@@ -529,14 +1143,41 @@
private SortedSet _transactions = Collections.synchronizedSortedSet(new TreeSet()); // C of ReaperElement
private Map _timeouts = Collections.synchronizedMap(new HashMap()); // key = Reapable, value = ReaperElement
+ private List _workQueue = new LinkedList(); // C of ReaperElement
+
private long _checkPeriod = 0;
+ /**
+ * number of millisecs delay after a cancel() is scheduled
+ * before the reaper tries to interrupt the worker thread
+ * executing the cancel()
+ */
+ private long _cancelWaitPeriod = 0;
+
+ /**
+ * number of millisecs delay after a worker thread is
+ * interrupted before the reaper writes it off as a zombie
+ * and starts a new thread
+ */
+ private long _cancelFailWaitPeriod = 0;
+
+ /**
+ * threshold for count of non-exited zombies at which system
+ * starts logging error messages
+ */
+ private int _zombieMax = 0;
+
private static TransactionReaper _theReaper = null;
private static ReaperThread _reaperThread = null;
+ private static ReaperWorkerThread _reaperWorkerThread = null;
+
private static boolean _dynamic = false;
private static long _lifetime = 0;
+ private static int _zombieCount = 0;
+
+ public final static boolean printTestOutput = false;
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
@@ -15,7 +15,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*
- * (C) 2005-2006,
+ * (C) 2005-2007,
* @author JBoss Inc.
*/
/*
@@ -58,6 +58,8 @@
_control = control;
_timeout = timeout;
+ _status = RUN;
+ _worker = null;
/*
* Given a timeout period in seconds, calculate its absolute value from
@@ -109,4 +111,148 @@
public long _absoluteTimeout;
public int _timeout;
+
+ /*
+ * status field to track the progress of the reaper worker which is
+ * attempting to cancel the associated TX. this is necessary to ensure
+ * that the reaper only interrupts the worker in the windows where
+ * it may be exposed to being wedged by client code and is sure to be
+ * able to catch and handle an interrupt without dying. all accesses
+ * to this field must be synchronized on the containing element.
+ *
+ * this field is always initialised to RUN, indicating that
+ * the element is associated with a running transaction. the
+ * following transitions can occur, performed either by the
+ * reaper (R) or the worker (W)
+ *
+ * RUN --> SCHEDULE_CANCEL (R)
+ *
+ * SCHEDULE_CANCEL --> CANCEL (W)
+ *
+ * CANCEL --> COMPLETE (W)
+ *
+ * CANCEL --> FAIL (W)
+ *
+ * CANCEL --> CANCEL_INTERRUPTED (R)
+ *
+ * CANCEL_INTERRUPTED --> COMPLETE (W)
+ *
+ * CANCEL_INTERRUPTED --> FAIL (W)
+ *
+ * CANCEL_INTERRUPTED --> ZOMBIE (R)
+ */
+
+ public int _status;
+
+ /*
+ * the reaper worker which is attempting to cancel the associated TX
+ */
+
+ public Thread _worker;
+
+ /*
+ * status values for progressing reaper element from default
+ * RUN status through stages of cancellation from
+ * SCHEDULE_CANCEL to ZOMBIE. the reaper thread can only
+ * interrupt a reaper worker thread if it is wedged while
+ * the element is in state CANCEL.
+ */
+
+ /*
+ * status of a reaper element for a TX which has not yet timed out
+ */
+
+ public static final int RUN = 0;
+
+ /*
+ * status of a reaper element which has been queued for
+ * cancellation by a reaper worker
+ */
+
+ public static final int SCHEDULE_CANCEL = 1;
+
+ /*
+ * status of a reaper element while the reaper worker is under
+ * a call to the TX cancel operation and, hence, potentially
+ * exposed to being wedged. the reaper may safely interrupt
+ * the worker when it is in this state
+ */
+
+ public static final int CANCEL = 2;
+
+
+ /*
+ * status of a reaper element if the reaper thread has
+ * interrupted the worker during the CANCEL state. if the
+ * reaper discovers the thread is still in this state
+ * following a suitable delay then the thread is seriously
+ * wedged (possibly on a non-interruptible i/o) and requires
+ * notice of termination (by resetting the state to ZOMBIE)
+ * and replacement by a new worker thread.
+ */
+
+ public static final int CANCEL_INTERRUPTED = 3;
+
+ /*
+ * status of a reaper element if the reaper worker has been
+ * unable to cancel the TX and has failed to mark it as
+ * rollback only. it is safe for the reaper to remove the
+ * element from the transactions list if it is in this state
+ * (modulo synchronization) although the worker should do so
+ * with minimal delay.
+ */
+
+ public static final int FAIL = 4;
+
+ /*
+ * status of a reaper element if the reaper worker has been
+ * able to cancel the tx or has marked it as rollback only. it
+ * is safe for the reaper to remove the element from the
+ * transactions list if it is in this state (modulo
+ * synchronization) although the worker should do so with
+ * minimal delay.
+ */
+
+ public static final int COMPLETE = 5;
+
+ /*
+ * status of a reaper element if the worker got so wedged it
+ * failed to respond to an interrupt either during
+ * cancellation or marking as rollback only. if the worker
+ * wakes up and finds the element in this state then it must
+ * exit. the reaper will have ensured that the failure to
+ * cancel and rollback the transaction has been logged and
+ * will have removed the element from the transactions list.
+ */
+
+ public static final int ZOMBIE = 6;
+
+ /*
+ * convenience method returning the printable name of the current
+ * _status value ("UNKNOWN" if it matches none of the constants) for
+ * debugging/logging. only call while synchronized on this element.
+ */
+
+ public final String statusName()
+ {
+ switch (_status)
+ {
+ case RUN:
+ return "RUN";
+ case SCHEDULE_CANCEL:
+ return "SCHEDULE_CANCEL";
+ case CANCEL:
+ return "CANCEL";
+ case CANCEL_INTERRUPTED:
+ return "CANCEL_INTERRUPTED";
+ case FAIL:
+ return "FAIL";
+ case COMPLETE:
+ return "COMPLETE";
+ case ZOMBIE:
+ return "ZOMBIE";
+ default:
+ return "UNKNOWN";
+ }
+ }
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
@@ -15,7 +15,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*
- * (C) 2005-2006,
+ * (C) 2005-2007,
* @author JBoss Inc.
*/
/*
@@ -70,6 +70,8 @@
FacilityCode.FAC_ATOMIC_ACTION, "ReaperThread.run ()");
}
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : started");
+
for (;;)
{
/*
@@ -80,27 +82,37 @@
synchronized(reaperObject)
{
- sleepPeriod = reaperObject.checkingPeriod();
+ sleepPeriod = reaperObject.checkingPeriod();
if (sleepPeriod > 0)
{
- try
- {
- if (tsLogger.arjLoggerI18N.isDebugEnabled())
- {
- tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION,
- "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_1",
- new Object[]{Thread.currentThread(),
- Long.toString(sleepPeriod)});
- }
-
- reaperObject.wait(sleepPeriod);
- }
- catch (InterruptedException e1) {}
+ try
+ {
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_1",
+ new Object[]{Thread.currentThread(),
+ Long.toString(sleepPeriod)});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : sleeping " + Long.toString(sleepPeriod));
+
+ reaperObject.wait(sleepPeriod);
+ }
+ catch (InterruptedException e1) {}
}
}
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "ReaperThread.run ()");
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : woke up");
+
if (_shutdown)
return;
Added: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2006-2007,
+ * @author JBoss Inc.
+ */
+
+package com.arjuna.ats.internal.arjuna.coordinator;
+
+import com.arjuna.ats.arjuna.logging.tsLogger;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+
+import com.arjuna.common.util.logging.*;
+
+/**
+ * Class to reap timed out transactions on behalf of the transaction reaper
+ * thread which is dispatched to terminate a series of transactions when their
+ * timeout elapses.
+ *
+ * @author Andrew Dinn (adinn at redhat.com) 2007-07-08
+ */
+
+ public class ReaperWorkerThread extends Thread
+{
+
+ public ReaperWorkerThread (TransactionReaper arg)
+ {
+ _theReaper = arg;
+ _shutdown = false;
+ }
+
+ /** Worker loop: wait for the reaper to queue TXs, cancel them, repeat until shutdown() is seen.
+ * @message com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1 [com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1] - Thread {0} waiting for cancelled TXs
+ * @message com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2 [com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2] - Thread {0} performing cancellations
+ */
+
+public void run ()
+ {
+ if (tsLogger.arjLogger.debugAllowed())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION, "ReaperWorkerThread.run ()");
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : started");
+
+ for (;;)
+ {
+ // wait for the reaper thread to queue some TXs for
+ // this thread to cancel
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1",
+ new Object[]{Thread.currentThread()});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : waiting");
+
+ _theReaper.waitForCancellations();
+
+ // check for shutdown before doing any cancellations
+
+ if (_shutdown)
+ return;
+
+ // get the reaper to cancel any TXs queued for cancellation.
+
+ if (tsLogger.arjLoggerI18N.isDebugEnabled())
+ {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_ATOMIC_ACTION,
+ "com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2",
+ new Object[]{Thread.currentThread()});
+ }
+
+ if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : doing cancellations");
+
+ _theReaper.doCancellations();
+
+ // check for shutdown before we wait again
+
+ if (_shutdown)
+ return;
+ }
+ }
+
+ public void shutdown ()
+ {
+ _shutdown = true;
+ }
+
+ private TransactionReaper _theReaper;
+ private volatile boolean _shutdown; // volatile: written by shutdown(), polled by run() on the worker thread
+
+}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml 2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml 2007-07-10 15:29:45 UTC (rev 13301)
@@ -44,6 +44,24 @@
name="com.arjuna.ats.arjuna.coordinator.txReaperMode"
value="NORMAL"/>
<!--
+ Transaction Reaper Cancel Wait Period (default is 500 ms, min is 10 msecs).
+ -->
+ <property
+ name="com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod"
+ value="500"/>
+ <!--
+ Transaction Reaper Cancel Fail Wait Period (default is 500 ms, min is 10 msecs).
+ -->
+ <property
+ name="com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod"
+ value="500"/>
+ <!--
+ Transaction Reaper Zombie Max (default is 8).
+ -->
+ <property
+ name="com.arjuna.ats.arjuna.coordinator.txReaperZombieMax"
+ value="8"/>
+ <!--
(default is NO)
-->
<property
Added: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java 2007-07-10 15:29:45 UTC (rev 13301)
@@ -0,0 +1,270 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2007,
+ * @author JBoss Inc.
+ */
+package com.hp.mwtests.ts.arjuna.reaper;
+
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.Reapable;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.internal.arjuna.coordinator.ReaperElement;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Exercise cancellation behaviour of TransactionReaper with resources
+ * that time out and, optionally, get wedged either when a cancel is
+ * tried and/or when an interrupt is delivered
+ *
+ * @author Andrew Dinn (adinn at redhat.com), 2007-07-09
+ */
+
+public class ReaperTestCase2 extends TestCase
+{
+ public static Test suite()
+ {
+ return new TestSuite(ReaperTestCase2.class);
+ }
+
+ public void testReaper() throws Exception
+ {
+ TransactionReaper.create(500);
+ TransactionReaper reaper = TransactionReaper.transactionReaper();
+
+ // give the reaper worker time to start too
+
+ Thread.sleep(1000);
+
+ // create slow reapables some of which will not respond immediately
+ // to cancel requests and ensure that they get cancelled
+ // and that the reaper does not get wedged
+
+ SlowReapable reapable1 = new SlowReapable(new Uid(), 2000, 0, true, true);
+ SlowReapable reapable2 = new SlowReapable(new Uid(), 0, 0, true, true);
+ SlowReapable reapable3 = new SlowReapable(new Uid(), 100, 2000, false, true);
+ SlowReapable reapable4 = new SlowReapable(new Uid(), 0, 0, false, false);
+
+ // insert reapables so they timeout at 1 second intervals then
+ // check progress of cancellations and rollbacks
+
+ assertTrue(reaper.insert(reapable1, 1));
+
+ assertTrue(reaper.insert(reapable2, 2));
+
+ assertTrue(reaper.insert(reapable3, 3));
+
+ assertTrue(reaper.insert(reapable4, 4));
+
+ // make sure they were all registered
+
+ assertEquals(4, reaper.numberOfTransactions());
+ assertEquals(4, reaper.numberOfTimeouts());
+
+ // n.b. the reaper will not operate in dynamic mode by default
+ // so we have to allow an extra checkPeriod millisecs for it
+ // to detect timeouts (it may go back to sleep a few
+ // milliseconds before a transaction times out). also by
+ // default the reaper waits 500 msecs for a cancel to take
+ // effect before interrupting and 500 msecs for an interrupt
+ // to take effect before making a wedged worker a zombie. so
+ // these need to be factored into this thread's delays when
+ // testing the state of the reapables.
+
+ // wait at most 2 seconds for the first reapable to be cancelled
+
+ int count = 0;
+
+ while (!reapable1.getCancelTried() && count < 20) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 20);
+
+ // ensure that the second one gets cancelled even if the
+ // first one is wedged
+
+ count = 0;
+
+ while (reapable2.getRunning() && count < 15) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 15);
+
+ // ensure also that the second one gave up at the cancel and
+ // not the mark for rollback
+
+ assertTrue(reapable2.getCancelTried());
+ assertTrue(!reapable2.getRollbackTried());
+
+ // ensure that the first one responded to the interrupt and
+ // marks itself for rollback only
+
+ count = 0;
+
+ while (reapable1.getRunning() && count < 10) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 10);
+ assertTrue(reapable1.getRollbackTried());
+
+ // check that the third one refuses the cancel and gets marked
+ // for rollback instead
+
+ count = 0;
+
+ while (!reapable3.getCancelTried() && count < 25) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 25);
+
+ // ensure that it gets marked for rollback
+
+ count = 0;
+
+ while (reapable3.getRunning() && count < 10) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 10);
+
+ assertTrue(reapable3.getRollbackTried());
+
+ // ensure the fourth one gets cancelled and marked for rollback
+ // even though it does not play ball
+
+ count = 0;
+
+ while (reapable4.getRunning() && count < 25) {
+ count++;
+ Thread.sleep(100);
+ }
+
+ assertTrue(count < 25);
+
+ assertTrue(reapable4.getCancelTried());
+ assertTrue(reapable4.getRollbackTried());
+ }
+
+ public class SlowReapable implements Reapable
+ {
+ public SlowReapable(Uid uid, int callDelay, int interruptDelay, boolean doCancel, boolean doRollback)
+ {
+ this.uid = uid;
+ this.callDelay = callDelay;
+ this.interruptDelay = interruptDelay;
+ this.doCancel = doCancel;
+ this.doRollback = doRollback;
+ cancelTried = false;
+ rollbackTried = false;
+ running = true;
+ }
+
+ public boolean running()
+ {
+ return getRunning();
+ }
+
+ public boolean preventCommit()
+ {
+ setRollbackTried();
+ clearRunning();
+ return doRollback;
+ }
+
+ public int cancel()
+ {
+ boolean interrupted = false;
+
+ setCancelTried();
+
+ if (callDelay > 0) {
+ try {
+ Thread.sleep(callDelay);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (interrupted && interruptDelay > 0) {
+ try {
+ Thread.sleep(interruptDelay);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (doCancel) {
+ clearRunning();
+ return ActionStatus.ABORTED;
+ } else {
+ return ActionStatus.RUNNING;
+ }
+ }
+
+ public Uid get_uid()
+ {
+ return uid;
+ }
+
+ private Uid uid;
+ private int callDelay; // in milliseconds
+ private int interruptDelay; // in milliseconds
+ private boolean doCancel;
+ private boolean doRollback;
+ private boolean cancelTried;
+ private boolean rollbackTried;
+ private boolean running;
+
+ public synchronized void setCancelTried()
+ {
+ cancelTried = true;
+ }
+ public synchronized boolean getCancelTried()
+ {
+ return cancelTried;
+ }
+ public synchronized void setRollbackTried()
+ {
+ rollbackTried = true;
+ }
+ public synchronized boolean getRollbackTried()
+ {
+ return rollbackTried;
+ }
+ public synchronized void clearRunning()
+ {
+ running = false;
+ }
+ public synchronized boolean getRunning()
+ {
+ return running;
+ }
+ }
+}
More information about the jboss-svn-commits
mailing list