[jboss-svn-commits] JBL Code SVN: r13301 - in labs/jbosstm/trunk/ArjunaCore/arjuna: classes/com/arjuna/ats/arjuna/common and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Jul 10 11:29:45 EDT 2007


Author: adinn
Date: 2007-07-10 11:29:45 -0400 (Tue, 10 Jul 2007)
New Revision: 13301

Added:
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
Modified:
   labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml
Log:
Fix for JBTM-203

The ReaperThread now employs a ReaperWorkerThread to execute
cancel() calls so that a badly behaved TX cannot wedge the reaper. If
the ReaperWorkerThread gets wedged, the ReaperThread interrupts it and,
if it still does not respond, marks it as a zombie (i.e. if it ever
unwedges it *has* to exit) and creates a new ReaperWorkerThread.

Failure of a cancel causes the TX to be marked as ROLLBACK_ONLY and an
error to be logged. Failure to mark as ROLLBACK_ONLY causes an error
to be logged. If the (non-exited) zombie count exceeds a threshold an
error is logged.

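The defaults for the delay between cancel and interrupt, the delay
between interrupt and mark-as-zombie, and the zombie count threshold
can each be overridden via properties (txReaperCancelWaitPeriod,
txReaperCancelFailWaitPeriod and txReaperZombieMax respectively).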


Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml	2007-07-10 15:29:45 UTC (rev 13301)
@@ -449,6 +449,10 @@
                 todir="${com.hp.mwlabs.ts.arjuna.reports.dest}">
                 <fileset dir="${com.hp.mwlabs.ts.arjuna.tests.src}" includes="**/ReaperTestCase.java"/>
             </batchtest>
+            <batchtest haltonerror="yes" haltonfailure="yes" fork="yes"
+                todir="${com.hp.mwlabs.ts.arjuna.reports.dest}">
+                <fileset dir="${com.hp.mwlabs.ts.arjuna.tests.src}" includes="**/ReaperTestCase2.java"/>
+            </batchtest>
         </junit>
     </target>
 

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/common/Environment.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -74,6 +74,9 @@
  * <li> RECOVERY_BACKOFF_PERIOD = com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod
  * <li> TX_REAPER_MODE = com.arjuna.ats.arjuna.coordinator.txReaperMode
  * <li> TX_REAPER_TIMEOUT = com.arjuna.ats.arjuna.coordinator.txReaperTimeout
+ * <li> TX_REAPER_CANCEL_WAIT_PERIOD = com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod
+ * <li> TX_REAPER_CANCEL_FAIL_WAIT_PERIOD = com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod
+ * <li> TX_REAPER_ZOMBIE_MAX = com.arjuna.ats.arjuna.coordinator.txReaperZombieMax
  * <li> OBJECTSTORE_SHARE = com.arjuna.ats.arjuna.objectstore.share
  * <li> OBJECTSTORE_HIERARCHY_RETRY = com.arjuna.ats.arjuna.objectstore.hierarchyRetry
  * <li> OBJECTSTORE_HIERARCHY_TIMEOUT = com.arjuna.ats.arjuna.objectstore.hierarchyTimeout
@@ -128,6 +131,9 @@
     public static final String RECOVERY_BACKOFF_PERIOD = "com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" ;
     public static final String TX_REAPER_MODE = "com.arjuna.ats.arjuna.coordinator.txReaperMode";
     public static final String TX_REAPER_TIMEOUT = "com.arjuna.ats.arjuna.coordinator.txReaperTimeout";
+    public static final String TX_REAPER_CANCEL_WAIT_PERIOD = "com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod";
+    public static final String TX_REAPER_CANCEL_FAIL_WAIT_PERIOD = "com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod";
+    public static final String TX_REAPER_ZOMBIE_MAX = "com.arjuna.ats.arjuna.coordinator.txReaperZombieMax";
     public static final String OBJECTSTORE_SHARE = "com.arjuna.ats.arjuna.objectstore.share";
     public static final String OBJECTSTORE_HIERARCHY_RETRY = "com.arjuna.ats.arjuna.objectstore.hierarchyRetry";
     public static final String OBJECTSTORE_HIERARCHY_TIMEOUT = "com.arjuna.ats.arjuna.objectstore.hierarchyTimeout";

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
  * as indicated by the @author tags.
  * See the copyright.txt in the distribution for a
  * full listing of individual contributors.
@@ -15,7 +15,7 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  * MA  02110-1301, USA.
  *
- * (C) 2005-2006,
+ * (C) 2005-2007,
  * @author JBoss Inc.
  */
 /*
@@ -64,16 +64,46 @@
  *          TransactionReaper::check - comparing {0}
  * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_3
  *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_3] -
- *          TransactionReaper::check - rollback for {0}
+ *          TransactionReaper::getTimeout for {0} returning {1}
  * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_4
  *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_4] -
- *          TransactionReaper failed to force rollback on {0}
+ *          TransactionReaper::check interrupting cancel in progress for {0}
  * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_5
  *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_5] -
- *          TransactionReaper failed to force rollback_only on {0}
+ *          TransactionReaper::check worker zombie count {0} exceeds specified limit
  * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_6
  *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_6] -
- *          TransactionReaper::getTimeout for {0} returning {1}
+ *          TransactionReaper::check worker {0} not responding to interrupt when cancelling TX {1} -- worker marked as zombie and TX scheduled for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_7
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_7] -
+ *          TransactionReaper::doCancellations worker {0} successfuly cancelled TX {1}
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_8
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_8] -
+ *          TransactionReaper::doCancellations worker {0} failed to cancel TX {1} -- rescheduling for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_9
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_9] -
+ *          TransactionReaper::doCancellations worker {0} exception during cancel of TX {1} -- rescheduling for mark-as-rollback
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_10
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_10] -
+ *          TransactionReaper::check successfuly marked TX {0} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_11
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_11] -
+ *          TransactionReaper::check failed to mark TX {0}  as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_12
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_12] -
+ *          TransactionReaper::check exception while marking TX {0} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_13
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_13] -
+ *          TransactionReaper::doCancellations worker {0} missed interrupt when cancelling TX {1} -- exiting as zombie (zombie count decremented to {2})
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_14
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_14] -
+ *          TransactionReaper::doCancellations worker {0} successfuly marked TX {1} as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_15
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_15] -
+ *          TransactionReaper::doCancellations worker {0} failed to mark TX {1}  as rollback only
+ * @message com.arjuna.ats.arjuna.coordinator.TransactionReaper_16
+ *          [com.arjuna.ats.arjuna.coordinator.TransactionReaper_16] -
+ *          TransactionReaper::doCancellations worker {0} exception while marking TX {1} as rollback only
  */
 
 public class TransactionReaper
@@ -133,163 +163,632 @@
 				return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
 			}
 		}
-		return _checkPeriod;
+		else
+		{
+                     // if we have a cancel in progress which needs
+                     // checking up on then we have to wake up in time
+                     // for it whether we are using a static or
+                     // dynamic model
+
+                     try
+                     {
+                          final ReaperElement head = (ReaperElement) _transactions.first();  //_list.peak();
+                          if (head._status != ReaperElement.RUN) {
+                               long waitTime = head._absoluteTimeout - System.currentTimeMillis();
+                               if (waitTime < _checkPeriod)
+                               {
+                                    return head._absoluteTimeout - System.currentTimeMillis();
+                               }
+                          }
+                     }
+                    catch (final NoSuchElementException nsee) {}
+
+		    return _checkPeriod;
+		}
 	}
 
-	/*
-	 * Should be no need to protect with a mutex since only one thread is ever
-	 * doing the work.
-	 */
-
 	/**
-	 * Only check for one at a time to prevent starvation.
+         * process all entries in the timeout queue which have
+         * expired. entries for newly expired transactions are passed
+         * to a worker thread for cancellation and requeued for
+         * subsequent progress checks. the worker is given a kick if
+         * such checks find it is wedged.
 	 *
 	 * Timeout is given in milliseconds.
 	 */
 
 	public final boolean check()
 	{
+	    if (tsLogger.arjLogger.debugAllowed())
+	    {
+		tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+					 VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+					 "TransactionReaper::check ()");
+	    }
+
+	    do
+	    {
+		final ReaperElement e ;
+		try
+		{
+		    e = (ReaperElement)_transactions.first();
+		}
+		catch (final NoSuchElementException nsee)
+		{
+		    return true ;
+		}
+
+		if (tsLogger.arjLoggerI18N.isDebugEnabled())
+		{
+		    tsLogger.arjLoggerI18N
+			.debug(
+			    DebugLevel.FUNCTIONS,
+			    VisibilityLevel.VIS_PUBLIC,
+			    FacilityCode.FAC_ATOMIC_ACTION,
+			    "com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
+			    new Object[]
+			    { Long.toString(e._absoluteTimeout) });
+		}
+
+		if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : checking " + e._control.get_uid());
+
+		final long now = System.currentTimeMillis();
+		if (now < e._absoluteTimeout)
+		{
+		    // go back to sleep
+
+		    if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : ms wait " + (e._absoluteTimeout - now));
+		    break;
+		}
+
 		if (tsLogger.arjLogger.debugAllowed())
 		{
-			tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
-					VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-					"TransactionReaper::check ()");
+		    tsLogger.arjLogger
+			.debug(
+			    DebugLevel.FUNCTIONS,
+			    VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+			    "Reaper timeout for TX " + e._control.get_uid() + " state " + e.statusName());
 		}
 
-		do {
-			final ReaperElement e ;
-                        try
-                        {
-                            e = (ReaperElement)_transactions.first();
-                        }
-                        catch (final NoSuchElementException nsee)
-                        {
-                            return true ;
-                        }
+		if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : timeout " + e._control.get_uid());
 
-			if (tsLogger.arjLoggerI18N.debugAllowed())
+		// if we have to synchronize on multiple objects we always
+		// do so in a fixed order ReaperElement before Reaper and
+		// ReaperElement before Reaper._cancelQueue in order to
+		// ensure we don't deadlock. We never sychronize on the
+		// reaper and the cancel queue at the same time.
+
+		synchronized(e) {
+		    switch (e._status)
+		    {
+		    case ReaperElement.RUN:
+		    {
+			// this tx has just timed out. remove it from the
+			// TX list, update the timeout to take account of
+			// cancellation period and reinsert as a cancelled
+			// TX. this ensures we process it again if it does
+			// not get cancelled in time
+
+			e._status = ReaperElement.SCHEDULE_CANCEL;
+
+			synchronized (this)
 			{
-				tsLogger.arjLoggerI18N
-						.debug(
-								DebugLevel.FUNCTIONS,
-								VisibilityLevel.VIS_PUBLIC,
-								FacilityCode.FAC_ATOMIC_ACTION,
-								"com.arjuna.ats.arjuna.coordinator.TransactionReaper_2",
-								new Object[]
-								{ Long.toString(e._absoluteTimeout) });
+			    _transactions.remove(e);
+
+			    e._absoluteTimeout =
+				(System.currentTimeMillis() + _cancelWaitPeriod);
+			    _transactions.add(e);
 			}
 
-			final long now = System.currentTimeMillis();
-			if (now >= e._absoluteTimeout)
+			if (tsLogger.arjLogger.debugAllowed())
 			{
-				if (e._control.running())
+			    tsLogger.arjLogger
+				.debug(
+				    DebugLevel.FUNCTIONS,
+				    VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+				    "Reaper scheduling TX for cancellation " + e._control.get_uid());
+			}
+
+			// insert into cancellation queue for a worker
+			// thread to process and then make sure a worker
+			// thread is awake
+
+			synchronized (_workQueue)
+			{
+			    _workQueue.add(e);
+			    _workQueue.notify();
+			}
+		    }
+		    break;
+		    case ReaperElement.SCHEDULE_CANCEL:
+		    {
+			// hmm, a worker is taking its time to
+			// start processing this scheduled entry.
+			// we may just be running slow ... but the
+			// worker may be wedged under a cancel for
+			// some other TX. add an extra delay to
+			// give the worker more time to complete
+			// its current task and progress this
+			// entry to the CANCEL state. if the
+			// worker *is* wedged then this will
+			// ensure the wedged TX entry comes to the
+			// front of the queue.
+
+			synchronized (this)
+			{
+			    _transactions.remove(e);
+
+			    e._absoluteTimeout =
+				(System.currentTimeMillis() + _cancelWaitPeriod);
+
+			    _transactions.add(e);
+			}
+
+			if (tsLogger.arjLogger.debugAllowed())
+			{
+			    tsLogger.arjLogger
+				.debug(
+				    DebugLevel.FUNCTIONS,
+				    VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+				    "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+			}
+
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+		    }
+		    break;
+		    case ReaperElement.CANCEL:
+		    {
+			// ok, the worker must be wedged under a
+			// call to cancel() -- kick the thread and
+			// reschedule the element for a later
+			// check to ensure the thread responded to
+			// the kick
+
+			e._status = ReaperElement.CANCEL_INTERRUPTED;
+
+			e._worker.interrupt();
+
+			synchronized (this)
+			{
+			    _transactions.remove(e);
+
+			    e._absoluteTimeout =
+				(System.currentTimeMillis() + _cancelFailWaitPeriod);
+
+			    _transactions.add(e);
+			}
+
+			// log that we interrupted cancel()
+
+			if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			{
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.FUNCTIONS,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_4", 
+				    new Object[]{e._control.get_uid()});
+			}
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : interrupting cancel in progress for " + e._control.get_uid());
+		    }
+		    break;
+		    case ReaperElement.CANCEL_INTERRUPTED:
+		    {
+			// cancellation got truly wedged -- mark
+			// the element as a zombie so the worker
+			// exits when (if?) it wakes up and create
+			// a new worker thread to handle further
+			// cancellations. then mark the
+			// transaction as rollback only.
+
+			e._status = ReaperElement.ZOMBIE;
+
+			synchronized(this)
+			{
+			    _zombieCount++;
+
+			    if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			    {
+				tsLogger.arjLoggerI18N
+				    .debug(
+					DebugLevel.FUNCTIONS,
+					VisibilityLevel.VIS_PUBLIC,
+					FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling "  + e._control.get_uid());
+			    }
+			    if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancealling "  + e._control.get_uid());
+
+			    if (_zombieCount == _zombieMax)
+			    {
+				// log zombie overflow error call()
+
+				if (tsLogger.arjLoggerI18N.isDebugEnabled())
 				{
-					/*
-					 * If this is a local transaction, then we can roll it back
-					 * completely. Otherwise, just mark it as rollback only.
-					 */
+				    tsLogger.arjLoggerI18N
+					.debug(
+					    DebugLevel.ERROR_MESSAGES,
+					    VisibilityLevel.VIS_PUBLIC,
+					    FacilityCode.FAC_ATOMIC_ACTION,
+					    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_5", 
+					    new Object[]{new Integer(_zombieCount)});
+				}
 
-					boolean problem = false;
+				if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " too many zombies (" + _zombieCount + ")! " + e._control.get_uid());
+			    }
+			}
 
-					if (TxControl.enableStatistics)
-					{
-						TxStats.incrementTimeouts();
-					}
+			_reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+			_reaperWorkerThread.setDaemon(true);
 
-					try
-					{
-						if (e._control.cancel() == ActionStatus.ABORTED)
-						{
-							if (tsLogger.arjLoggerI18N.debugAllowed())
-							{
-								tsLogger.arjLoggerI18N
-										.debug(
-												DebugLevel.FUNCTIONS,
-												VisibilityLevel.VIS_PUBLIC,
-												FacilityCode.FAC_ATOMIC_ACTION,
-												"com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
-												new Object[]
-												{ e._control.get_uid() });
-							}
-						}
-						else
-							problem = true;
-					}
-					catch (Exception ex2)
-					{
-						if (tsLogger.arjLoggerI18N.isWarnEnabled())
-						{
-							tsLogger.arjLoggerI18N
-									.warn(
-											"com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
-											new Object[]
-											{ e._control });
-						}
+			_reaperWorkerThread.start();
 
-						problem = true;
-					}
+			// log a failed cancel()
 
-					if (problem)
-					{
-						boolean error = false;
-						boolean printDebug = tsLogger.arjLoggerI18N
-								.isWarnEnabled();
+			if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			{
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.ERROR_MESSAGES,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_6", 
+				    new Object[]{e._worker,
+						 e._control.get_uid()});
+			}
 
-						try
-						{
-							error = !e._control.preventCommit();
-						}
-						catch (Exception ex3)
-						{
-							error = true;
-						}
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : thread " + e._worker + " executing cancel did not respond to interrupt -- scheduling for rollback! " + e._control.get_uid());
+			// ok, since the worker was wedged we need to
+			// remove the entry from the timeouts and
+			// transactions lists then mark this tx as
+			// rollback only. we have to log a message
+			// whether we succeed, fail or get interrupted
 
-						if (error || printDebug)
-						{
-							if (error)
-							{
-								if (tsLogger.arjLoggerI18N.isWarnEnabled())
-								{
-									tsLogger.arjLoggerI18N
-											.warn(
-													"com.arjuna.ats.arjuna.coordinator.TransactionReaper_5",
-													new Object[]
-													{ e._control });
-								}
-							}
-							else
-							{
-								if (tsLogger.arjLoggerI18N.debugAllowed())
-								{
-									tsLogger.arjLoggerI18N
-											.debug(
-													DebugLevel.FUNCTIONS,
-													VisibilityLevel.VIS_PUBLIC,
-													FacilityCode.FAC_ATOMIC_ACTION,
-													"com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
-													new Object[]
-													{ e._control });
-								}
-							}
-						}
-					}
+			synchronized(this)
+			{
+			    _timeouts.remove(e._control);
+			    _transactions.remove(e);
+			}
+
+			try
+			{
+			    if (e._control.preventCommit()) {
+
+				// log a successful preventCommit()
+
+				if (tsLogger.arjLoggerI18N.isDebugEnabled())
+				{
+				    tsLogger.arjLoggerI18N
+					.debug(
+					    DebugLevel.FUNCTIONS,
+					    VisibilityLevel.VIS_PUBLIC,
+					    FacilityCode.FAC_ATOMIC_ACTION,
+					    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_10",
+					    new Object[]{e._control.get_uid()});
 				}
+				if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully marked TX " + e._control.get_uid() + " as rollback only");
+			    }
+			    else
+			    {
+				// log a failed preventCommit()
 
-                                synchronized(this)
-                                {
-                                    _timeouts.remove(e._control) ;
-                                    _transactions.remove(e) ;
-                                }
+				if (tsLogger.arjLoggerI18N.isDebugEnabled())
+				{
+				    tsLogger.arjLoggerI18N
+					.debug(DebugLevel.ERROR_MESSAGES,
+					       VisibilityLevel.VIS_PUBLIC,
+					       FacilityCode.FAC_ATOMIC_ACTION,
+					       "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_11",
+					       new Object[]{e._control.get_uid()});
+				}
+				if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to mark TX " + e._control.get_uid() + " as rollback only");
+			    }
 			}
-			else
+			catch(Exception e1)
 			{
-				break;
+			    // log an exception under preventCommit()
+
+			    if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+				tsLogger.arjLoggerI18N
+				    .debug(
+					DebugLevel.ERROR_MESSAGES,
+					VisibilityLevel.VIS_PUBLIC,
+					FacilityCode.FAC_ATOMIC_ACTION,
+					"com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_12",
+					new Object[]{e._control.get_uid()});
+			    }
+			    if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during mark-as-rollback of TX " + e._control.get_uid());
 			}
-		} while(true) ;
+		    }
+		    break;
+		    case ReaperElement.FAIL:
+		    case ReaperElement.COMPLETE:
+		    {
+			// ok, the worker should remove the tx
+			// from the transactions queue very soon
+			// but we need to progress to the next
+			// entry so we will steal in and do it
+			// first
 
-		return true;
+			synchronized(this)
+			{
+			    _timeouts.remove(e._control);
+			    _transactions.remove(e);
+			}
+		    }
+		    break;
+
+		    }
+		}
+	    } while(true) ;
+     
+	    return true;
 	}
 
+        public final void waitForCancellations()
+        {
+             synchronized(_workQueue)
+             {
+                  try
+                  {
+                       while (_workQueue.isEmpty())
+                       {
+                            _workQueue.wait();
+                       }
+                  }
+                  catch (InterruptedException e)
+                  {
+                  }
+             }
+        }
+
+	public final void doCancellations()
+        {
+	    for (;;)
+	    {
+		ReaperElement e;
+
+		// see if we have any cancellations to process
+
+		synchronized(_workQueue)
+		{
+		    try
+		    {
+			e = (ReaperElement)_workQueue.remove(0);
+		    }
+		    catch (IndexOutOfBoundsException ioobe) {break;}
+		}
+
+
+		// ok, current status must be SCHEDULE_CANCEL.
+		// progress state to CANCEL and call cancel()
+
+
+		if (tsLogger.arjLogger.debugAllowed())
+		{
+		    tsLogger.arjLogger
+			.debug(
+			    DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+			    FacilityCode.FAC_ATOMIC_ACTION, "Reaper Worker " + Thread.currentThread() + " attempting to cancel "  + e._control.get_uid());
+		}
+		if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " : attempting to cancel " + e._control.get_uid());
+
+		boolean cancelled = false;
+		boolean excepted = false;
+
+		synchronized(e)
+		{
+		    e._worker = Thread.currentThread();
+		    e._status = ReaperElement.CANCEL;
+		    e.notify();
+		}
+
+		// we are now exposed to at most one interrupt from
+		// the reaper. test for running and try the cancel if
+		// required
+
+		try
+		{
+		    if (e._control.running()) {
+
+			// try to cancel the transaction
+
+			if (e._control.cancel() == ActionStatus.ABORTED)
+			{
+			    cancelled = true;
+			}
+		    }
+		}
+		catch (Exception e1)
+		{
+		    excepted = true;
+		}
+
+		// ok, close the interrupt window by resetting the
+		// state -- unless we have been told to go away by
+		// being set to ZOMBIE
+
+		synchronized (e)
+		{
+		    if (e._status == ReaperElement.ZOMBIE)
+		    {
+			// we need to decrement the zombie count and
+			// force an immediate thread exit. the reaper
+			// will have removed the entry from the
+			// transactions list and started another
+			// worker thread.
+
+			ReaperWorkerThread worker = (ReaperWorkerThread)Thread.currentThread();
+			worker.shutdown();
+
+			synchronized(this)
+			{
+			    _zombieCount--;
+			}
+
+			if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			{
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.ERROR_MESSAGES,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_13",
+				    new Object[]{Thread.currentThread(),
+						 e._control.get_uid(),
+						 new Integer(_zombieCount)});
+			}
+
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker" + Thread.currentThread() + " failed to respond to interrupt trying to cancel or mark as rollback only TX " + e._control.get_uid() + " -- exiting as zombie (zombie count decremented to " + _zombieCount + ")");
+
+			// this gets us out of the for(;;) loop and
+			// the shutdown call above makes sure we exit
+			// after returning
+
+			break;
+		    }
+		    else if (cancelled &&
+			     e._status == ReaperElement.CANCEL_INTERRUPTED)
+		    {
+			// ok the call to cancel() returned true but
+			// we cannot trust it because the reaper sent
+			// the thread an interrupt
+
+			cancelled = false;
+			e._status = ReaperElement.FAIL;
+			e.notify();
+		    }
+		    else
+		    {
+			e._status = (cancelled
+				     ? ReaperElement.COMPLETE
+				     : ReaperElement.FAIL);
+			e.notify();
+		    }
+		}
+
+		// log a message notifying success, failure or
+		// exception during cancel(), remove the element from
+		// the transactions queue and mark TX as rollback only
+
+		if (cancelled)
+		{
+		    if (tsLogger.arjLoggerI18N.isDebugEnabled())
+		    {
+			tsLogger.arjLoggerI18N
+			    .debug(
+				DebugLevel.FUNCTIONS,
+				VisibilityLevel.VIS_PUBLIC,
+				FacilityCode.FAC_ATOMIC_ACTION,
+				"com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_7",
+				new Object[]{Thread.currentThread(),
+					     e._control.get_uid()});
+		    }
+
+		    if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully cancelled TX " + e._control.get_uid());
+
+		    synchronized(this)
+		    {
+			_timeouts.remove(e._control);
+			_transactions.remove(e);
+		    }
+		}
+		else
+		{
+		    if (excepted)
+		    {
+			if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			{
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.FUNCTIONS,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_9",
+				    new Object[]{Thread.currentThread(),
+						 e._control.get_uid()});
+			}
+
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during cancel of TX " + e._control.get_uid() + " -- rescheduling for mark-as-rollback");
+		    }
+		    else
+		    {
+			if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			{
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.FUNCTIONS,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_8",
+				    new Object[]{Thread.currentThread(),
+						 e._control.get_uid()});
+			}
+
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to cancel TX " + e._control.get_uid() + " rescheduling for mark-as-rollback");
+		    }
+
+		    synchronized(this)
+		    {
+			_timeouts.remove(e._control);
+			_transactions.remove(e);
+		    }
+
+		    try
+		    {
+			if (e._control.preventCommit()) {
+			    // log a successful preventCommit()
+
+			    if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			    {
+				tsLogger.arjLoggerI18N
+				    .debug(
+					DebugLevel.FUNCTIONS,
+					VisibilityLevel.VIS_PUBLIC,
+					FacilityCode.FAC_ATOMIC_ACTION,
+					"com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_14",
+					new Object[]{Thread.currentThread(),
+						     e._control.get_uid()});
+			    }
+			    if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " successfully marked TX " + e._control.get_uid() + " as rollback only");
+			}
+			else
+			{
+			    // log a failed preventCommit()
+
+			    if (tsLogger.arjLoggerI18N.isDebugEnabled())
+			    {
+				tsLogger.arjLoggerI18N
+				    .debug(
+					DebugLevel.ERROR_MESSAGES,
+					VisibilityLevel.VIS_PUBLIC,
+					FacilityCode.FAC_ATOMIC_ACTION,
+					"com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_15",
+					new Object[]{Thread.currentThread(),
+						     e._control.get_uid()});
+			    }
+			    if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " failed to mark TX " + e._control.get_uid() + " as rollback only");
+			}
+		    }
+		    catch(Exception e1)
+		    {
+			// log an exception under preventCommit()
+
+			if (tsLogger.arjLoggerI18N.isDebugEnabled()) {
+			    tsLogger.arjLoggerI18N
+				.debug(
+				    DebugLevel.ERROR_MESSAGES,
+				    VisibilityLevel.VIS_PUBLIC,
+				    FacilityCode.FAC_ATOMIC_ACTION,
+				    "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_16",
+				    new Object[]{Thread.currentThread(),
+						 e._control.get_uid()});
+			}
+			if (TransactionReaper.printTestOutput) System.out.println("Reaper Worker " + Thread.currentThread() + " exception during mark-as-rollback of TX " + e._control.get_uid());
+		    }
+		}
+	    }
+        }
+
 	/**
 	 * @return the number of items in the reaper's list.
 	 * @since JTS 2.2.
@@ -369,13 +868,39 @@
 		if (control == null)
 			return false;
 
+		ReaperElement key;
+
                 synchronized(this)
                 {
-                    ReaperElement key = (ReaperElement)_timeouts.remove(control);
+                    key = (ReaperElement)_timeouts.remove(control);
                     if(key == null) {
                             return false;
                     }
-                    return _transactions.remove(key);
+		}
+
+		// if a cancellation is in progress then we have to
+		// see it through as we have to ensure that the worker
+		// thread does not get wedged. so we have to tell the
+		// control has gone away. in order to test the status
+		// we need to synchronize on the element before we
+		// synchronize on this so we can ensure that we don't
+		// deadlock ourselves.
+
+		synchronized(key)
+		{
+                    if (key._status != ReaperElement.RUN)
+                    {
+                         // we are cancelling this TX anyway and need
+                         // to track the progress of the cancellation
+                         // using this entry so we cannot remove it
+
+                         return false;
+                    }
+
+		    synchronized(this)
+		    {
+			return _transactions.remove(key);
+		    }
                 }
 	}
 
@@ -416,7 +941,7 @@
 						DebugLevel.FUNCTIONS,
 						VisibilityLevel.VIS_PUBLIC,
 						FacilityCode.FAC_ATOMIC_ACTION,
-						"com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
+						"com.arjuna.ats.arjuna.coordinator.TransactionReaper_3",
 						new Object[]
 								{ control, timeout });
 
@@ -476,12 +1001,98 @@
 
 			TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
 
+			String cancelWait = arjPropertyManager.propertyManager
+					.getProperty(Environment.TX_REAPER_CANCEL_WAIT_PERIOD);
+			if (cancelWait != null)
+			{
+                             try
+                             {
+				 TransactionReaper._theReaper._cancelWaitPeriod = Long.valueOf(cancelWait).longValue();
+                             }
+                             catch (NumberFormatException e)
+                             {
+                                  TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+                             }
+
+			     // must give TX at least 10 millisecs to
+			     // respond to cancel
+
+                             if (TransactionReaper._theReaper._cancelWaitPeriod < 10) {
+                                  TransactionReaper._theReaper._cancelWaitPeriod = 10;
+                             }
+			}
+                        else
+                        {
+                             TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+                        }
+                        
+			String cancelFailWait = arjPropertyManager.propertyManager
+					.getProperty(Environment.TX_REAPER_CANCEL_FAIL_WAIT_PERIOD);
+			if (cancelFailWait != null)
+			{
+                             try
+                             {
+                                  TransactionReaper._theReaper._cancelFailWaitPeriod = Long.valueOf(cancelFailWait).longValue();
+                             }
+                             catch (NumberFormatException e)
+                             {
+                                  TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+                             }
+
+			     // must give the worker at least 10 millisecs
+			     // to respond to the interrupt
+
+                             if (TransactionReaper._theReaper._cancelFailWaitPeriod < 10) {
+                                  TransactionReaper._theReaper._cancelFailWaitPeriod = 10;
+                             }
+			}
+                        else
+                        {
+                             TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+                        }
+
+			String zombieMax = arjPropertyManager.propertyManager
+					.getProperty(Environment.TX_REAPER_ZOMBIE_MAX);
+			if (zombieMax != null)
+			{
+                             try
+                             {
+                                  TransactionReaper._theReaper._zombieMax = Integer.valueOf(zombieMax).intValue();
+                             }
+                             catch (NumberFormatException e)
+                             {
+                                  TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+                             }
+			     // we start bleating if the zombie count
+			     // reaches zombieMax so it has to be at
+			     // least 1
+
+                             if (TransactionReaper._theReaper._zombieMax <= 0) {
+                                  TransactionReaper._theReaper._zombieMax = 1;
+                             }
+			}
+                        else
+                        {
+                             TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+                        }
+
+                        // use defaults for now (n.b. this overrides any property values read above)
+
+                        TransactionReaper._theReaper._cancelWaitPeriod = defaultCancelWaitPeriod;
+                        TransactionReaper._theReaper._cancelFailWaitPeriod = defaultCancelFailWaitPeriod;
+                        TransactionReaper._theReaper._zombieMax = defaultZombieMax;
+
 			_reaperThread = new ReaperThread(TransactionReaper._theReaper);
 			// _reaperThread.setPriority(Thread.MIN_PRIORITY);
 
 			_reaperThread.setDaemon(true);
 
+			_reaperWorkerThread = new ReaperWorkerThread(TransactionReaper._theReaper);
+			_reaperWorkerThread.setDaemon(true);
+
 			_reaperThread.start();
+
+			_reaperWorkerThread.start();
 		}
 
 		return TransactionReaper._theReaper;
@@ -520,6 +1131,9 @@
 	}
 
 	public static final long defaultCheckPeriod = 120000; // in milliseconds
+	public static final long defaultCancelWaitPeriod = 500; // in milliseconds
+	public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
+	public static final int defaultZombieMax = 8;
 
 	static final void reset()
 	{
@@ -529,14 +1143,41 @@
 	private SortedSet _transactions = Collections.synchronizedSortedSet(new TreeSet()); // C of ReaperElement
 	private Map _timeouts = Collections.synchronizedMap(new HashMap()); // key = Reapable, value = ReaperElement
 
+	private List _workQueue = new LinkedList(); // C of ReaperElement
+
 	private long _checkPeriod = 0;
 
+        /**
+	 * number of millisecs delay after a cancel() is scheduled
+	 * before the reaper tries to interrupt the worker thread
+	 * executing the cancel()
+	 */
+	private long _cancelWaitPeriod = 0;
+
+        /**
+	 * number of millisecs delay after a worker thread is
+	 * interrupted before the reaper writes it off as a zombie
+	 * and starts a new thread
+	 */
+	private long _cancelFailWaitPeriod = 0;
+
+        /**
+	 * threshold for count of non-exited zombies at which system
+	 * starts logging error messages
+	 */
+	private int _zombieMax = 0;
+
 	private static TransactionReaper _theReaper = null;
 
 	private static ReaperThread _reaperThread = null;
 
+	private static ReaperWorkerThread _reaperWorkerThread = null;
+
 	private static boolean _dynamic = false;
 
 	private static long _lifetime = 0;
 
+	private static int _zombieCount = 0;
+
+        public final static boolean printTestOutput = false;
 }

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElement.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
  * as indicated by the @author tags.
  * See the copyright.txt in the distribution for a
  * full listing of individual contributors.
@@ -15,7 +15,7 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  * MA  02110-1301, USA.
  *
- * (C) 2005-2006,
+ * (C) 2005-2007,
  * @author JBoss Inc.
  */
 /*
@@ -58,6 +58,8 @@
 
 		_control = control;
 		_timeout = timeout;
+		_status = RUN;
+                _worker = null;
 
 		/*
 		 * Given a timeout period in seconds, calculate its absolute value from
@@ -109,4 +111,148 @@
 	public long _absoluteTimeout;
 
 	public int _timeout;
+
+        /*
+         * status field to track the progress of the reaper worker which is
+         * attempting to cancel the associated TX. this is necessary to ensure
+         * that the reaper only interrupts the worker in the windows where
+         * it may be exposed to being wedged by client code and is sure to be
+         * able to catch and handle an interrupt without dying. all accesses
+         * to this field must be synchronized on the containing element.
+         *
+         * this field is always initialised to RUN, indicating that
+         * the element is associated with a running transaction. the
+         * following transitions can occur, performed either by the
+         * reaper (R) or the worker (W)
+         *
+         * RUN --> SCHEDULE_CANCEL (R)
+         *
+         * SCHEDULE_CANCEL --> CANCEL (W)
+         *
+         * CANCEL --> COMPLETE (W)
+         *
+         * CANCEL --> FAIL (W)
+         *
+         * CANCEL --> CANCEL_INTERRUPTED (R)
+         *
+         * CANCEL_INTERRUPTED --> COMPLETE (W)
+         *
+         * CANCEL_INTERRUPTED --> FAIL (W)
+         *
+         * CANCEL_INTERRUPTED --> ZOMBIE (R)
+         */
+
+        public int _status;
+
+        /*
+         * the reaper worker which is attempting to cancel the associated TX
+         */
+
+        public Thread _worker;
+
+        /*
+         * status values for progressing reaper element from default
+         * RUN status through stages of cancellation from
+         * SCHEDULE_CANCEL to ZOMBIE. the reaper thread can only
+         * interrupt a reaper worker thread if it is wedged while
+         * the element is in state CANCEL.
+         */
+
+        /*
+         * status of a reaper element for a TX which has not yet timed out
+         */
+
+        public static final int RUN = 0;
+
+        /*
+         * status of a reaper element which has been queued for
+         * cancellation by a reaper worker
+         */
+
+        public static final int SCHEDULE_CANCEL = 1;
+
+        /*
+         * status of a reaper element while the reaper worker is under
+         * a call  to the TX cancel operation  and, hence, potentially
+         * exposed to  being wedged. the reaper may safely interrupt
+         * the worker when it is in this state
+         */
+
+        public static final int CANCEL = 2;
+
+
+        /*
+         * status of a reaper element if the reaper thread has
+         * interrupted the worker during the CANCEL state. if the
+         * reaper discovers the thread is still in this state
+         * following a suitable delay then the thread is seriously
+         * wedged (possibly on a non-interruptible i/o) and requires
+         * notice of termination (by resetting the state to ZOMBIE)
+         * and replacement by a new worker thread.
+         */
+
+        public static final int CANCEL_INTERRUPTED = 3;
+
+        /*
+         * status of a reaper element if the reaper worker has been
+         * unable to cancel the TX and has failed to mark it as
+         * rollback only. it is safe for the reaper to remove the
+         * element from the transactions list if it is in this state
+         * (modulo synchronization) although the worker should do so
+         * with minimal delay.
+         */
+
+        public static final int FAIL = 4;
+
+        /*
+         * status of a reaper element if the reaper worker has been
+         * able to cancel the tx or has marked it as rollback only. it
+         * is safe for the reaper to remove the element from the
+         * transactions list if it is in this state (modulo
+         * synchronization) although the worker should do so with
+         * minimal delay.
+         */
+
+        public static final int COMPLETE = 5;
+
+        /*
+         * status of a reaper element if the worker got so wedged it
+         * failed to respond to an interrupt either during
+         * cancellation or marking as rollback only. if the worker
+         * wakes up and finds the element in this state then it must
+         * exit. the reaper will have ensured that the failure to
+         * cancel and rollback the transaction has been logged and
+         * will have removed the element from the transactions list.
+         */
+
+        public static final int ZOMBIE = 6;
+
+        /*
+         * convenience method to provide printable string for current status
+         * for use in debugging/logging. should only be called while
+         * synchronized.
+         */
+
+         public final String statusName()
+         {
+              // translate the current status code into its symbolic name
+              final int status = _status;
+
+              if (status == RUN)
+                   return "RUN";
+              if (status == SCHEDULE_CANCEL)
+                   return "SCHEDULE_CANCEL";
+              if (status == CANCEL)
+                   return "CANCEL";
+              if (status == CANCEL_INTERRUPTED)
+                   return "CANCEL_INTERRUPTED";
+              if (status == FAIL)
+                   return "FAIL";
+              if (status == COMPLETE)
+                   return "COMPLETE";
+              if (status == ZOMBIE)
+                   return "ZOMBIE";
+
+              return "UNKNOWN";   // any value outside the defined states
+         }
 }

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperThread.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors 
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors 
  * as indicated by the @author tags. 
  * See the copyright.txt in the distribution for a
  * full listing of individual contributors. 
@@ -15,7 +15,7 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
  * MA  02110-1301, USA.
  * 
- * (C) 2005-2006,
+ * (C) 2005-2007,
  * @author JBoss Inc.
  */
 /*
@@ -70,6 +70,8 @@
     				     FacilityCode.FAC_ATOMIC_ACTION, "ReaperThread.run ()");
     	}
     	
+        if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : started");
+
     	for (;;)
     	{
     	    /*
@@ -80,27 +82,37 @@
     
             synchronized(reaperObject)
             {
-        		sleepPeriod = reaperObject.checkingPeriod();
+		sleepPeriod = reaperObject.checkingPeriod();
         
                 if (sleepPeriod > 0)
                 {
-            		try
-            		{
-            		    if (tsLogger.arjLoggerI18N.isDebugEnabled())
-            		    {
-            		        tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
-            						     FacilityCode.FAC_ATOMIC_ACTION,
-            						     "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_1", 
-            						     new Object[]{Thread.currentThread(),
-            								  Long.toString(sleepPeriod)});
-            		    }
-            
-            		    reaperObject.wait(sleepPeriod);
-            		}
-            		catch (InterruptedException e1) {}
+                     try
+                     {
+                          if (tsLogger.arjLoggerI18N.isDebugEnabled())
+                          {
+                               tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                                            FacilityCode.FAC_ATOMIC_ACTION,
+                                                            "com.arjuna.ats.internal.arjuna.coordinator.ReaperThread_1", 
+                                                            new Object[]{Thread.currentThread(),
+                                                                         Long.toString(sleepPeriod)});
+                          }
+
+                          if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : sleeping " + Long.toString(sleepPeriod));
+
+                          reaperObject.wait(sleepPeriod);
+                     }
+                     catch (InterruptedException e1) {}
                 }
             }
     
+            if (tsLogger.arjLogger.debugAllowed())
+            {
+                 tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                          FacilityCode.FAC_ATOMIC_ACTION, "ReaperThread.run ()");
+            }
+    	
+            if (TransactionReaper.printTestOutput) System.out.println("Reaper " + Thread.currentThread() + " : woke up");
+
     	    if (_shutdown)
     	        return;
     

Added: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java	                        (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperWorkerThread.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors 
+ * as indicated by the @author tags. 
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors. 
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 
+ * MA  02110-1301, USA.
+ * 
+ * (C) 2006-2007,
+ * @author JBoss Inc.
+ */
+
+package com.arjuna.ats.internal.arjuna.coordinator;
+
+import com.arjuna.ats.arjuna.logging.tsLogger;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+
+import com.arjuna.common.util.logging.*;
+
+/**
+ * Class to reap timed out transactions on behalf of the transaction reaper
+ * thread which is dispatched to terminate a series of transactions when their
+ * timeout elapses.
+ *
+ * @author Andrew Dinn (adinn at redhat.com) 2007-07-08
+ */
+
+public class ReaperWorkerThread extends Thread
+{
+    // arg is the reaper whose waitForCancellations()/doCancellations() this thread drives
+    public ReaperWorkerThread (TransactionReaper arg)
+    {
+        _theReaper = arg;
+        _shutdown = false;
+    }
+
+    /**
+     * @message com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1 [com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1] - Thread {0} waiting for cancelled TXs
+     * @message com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2 [com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2] - Thread {0} performing cancellations
+     */
+
+    public void run ()
+    {
+         if (tsLogger.arjLogger.debugAllowed())
+         {
+              tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                       FacilityCode.FAC_ATOMIC_ACTION, "ReaperWorkerThread.run ()");
+         }
+
+        if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : started");
+
+        for (;;)
+        {
+             // wait for the reaper thread to queue some TXs for
+             // this thread to cancel
+
+             if (tsLogger.arjLoggerI18N.isDebugEnabled())
+             {
+                  tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                               FacilityCode.FAC_ATOMIC_ACTION,
+                                               "com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_1", 
+                                               new Object[]{Thread.currentThread()});
+             }
+
+             if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : waiting");
+
+             _theReaper.waitForCancellations();
+
+             // check for shutdown before we start cancelling
+
+             if (_shutdown)
+                  return;
+
+             // get the reaper to cancel any TXs queued for cancellation.
+
+             if (tsLogger.arjLoggerI18N.isDebugEnabled())
+             {
+                  tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                               FacilityCode.FAC_ATOMIC_ACTION,
+                                               "com.arjuna.ats.internal.arjuna.coordinator.ReaperWorkerThread_2", 
+                                               new Object[]{Thread.currentThread()});
+             }
+
+             if (TransactionReaper.printTestOutput) System.out.println("Worker " + this + " : doing cancellations");
+
+             _theReaper.doCancellations();
+
+             // check for shutdown before we wait again
+
+             if (_shutdown)
+                  return;
+        }
+    }
+
+    // only sets the flag: run() notices it at its next check, after the current wait/cancel call returns
+    public void shutdown ()
+    {
+        _shutdown = true;
+    }
+
+    // _shutdown is written by shutdown() and polled by run(), so it must be volatile to be seen
+    private final TransactionReaper _theReaper;
+    private volatile boolean        _shutdown;
+
+}

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml	2007-07-10 15:27:43 UTC (rev 13300)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/etc/default-arjuna-properties.xml	2007-07-10 15:29:45 UTC (rev 13301)
@@ -44,6 +44,24 @@
         name="com.arjuna.ats.arjuna.coordinator.txReaperMode"
         value="NORMAL"/>
       <!--
+        Transaction Reaper Cancel Wait Period (default is 500 ms, min is 10 msecs).
+      -->
+      <property
+        name="com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod"
+        value="500"/>
+      <!--
+        Transaction Reaper Cancel Fail Wait Period (default is 500 ms, min is 10 msecs).
+      -->
+      <property
+        name="com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod"
+        value="500"/>
+      <!--
+        Transaction Reaper Zombie Max (default is 8).
+      -->
+      <property
+        name="com.arjuna.ats.arjuna.coordinator.txReaperZombieMax"
+        value="8"/>
+      <!--
         (default is NO)
       -->
       <property

Added: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java	                        (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java	2007-07-10 15:29:45 UTC (rev 13301)
@@ -0,0 +1,270 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2007,
+ * @author JBoss Inc.
+ */
+package com.hp.mwtests.ts.arjuna.reaper;
+
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.Reapable;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.internal.arjuna.coordinator.ReaperElement;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Exercise cancellation behaviour of TransactionReaper with resources
+ * that time out and, optionally, get wedged either when a cancel is
+ * tried and/or when an interrupt is delivered
+ *
+ * @author Andrew Dinn (adinn at redhat.com), 2007-07-09
+ */
+
+public class ReaperTestCase2 extends TestCase
+{
+    public static Test suite()
+    {
+	return new TestSuite(ReaperTestCase2.class);
+    }
+    // single scenario: four SlowReapables are inserted and then polled for cancel/rollback progress
+    public void testReaper() throws Exception
+    {
+	TransactionReaper.create(500);
+	TransactionReaper reaper = TransactionReaper.transactionReaper();
+
+	// give the reaper worker time to start too
+
+	Thread.sleep(1000);
+
+	// create slow reapables some of which will not respond immediately
+        // to cancel requests and ensure that they get cancelled
+        // and that the reaper does not get wedged
+	// args are: uid, callDelay (ms), interruptDelay (ms), doCancel, doRollback
+	SlowReapable reapable1 = new SlowReapable(new Uid(), 2000, 0, true, true);
+	SlowReapable reapable2 = new SlowReapable(new Uid(), 0, 0, true, true);
+	SlowReapable reapable3 = new SlowReapable(new Uid(), 100, 2000, false, true);
+	SlowReapable reapable4 = new SlowReapable(new Uid(), 0, 0, false, false);
+
+	// insert reapables so they timeout at 1 second intervals then
+	// check progress of cancellations and rollbacks
+
+	assertTrue(reaper.insert(reapable1, 1));
+
+	assertTrue(reaper.insert(reapable2, 2));
+
+	assertTrue(reaper.insert(reapable3, 3));
+
+	assertTrue(reaper.insert(reapable4, 4));
+
+	// make sure they were all registered
+
+	assertEquals(4, reaper.numberOfTransactions());
+	assertEquals(4, reaper.numberOfTimeouts());
+
+	// n.b. the reaper will not operate in dynamic mode by default
+	// so we have to allow an extra checkPeriod millisecs for it
+	// to detect timeouts (it may go back to sleep a few
+	// milliseconds before a transaction times out). also by
+	// default the reaper waits 500 msecs for a cancel to take
+	// effect before interrupting and 500 msecs for an interrupt
+	// to take effect before making a wedged worker a zombie. so
+	// these need to be factored into this thread's delays when
+	// testing the state of the reapables.
+
+	// wait at most 2 seconds for a cancel to be tried on the first reapable
+
+	int count = 0;
+
+	while (!reapable1.getCancelTried() && count < 20) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 20);
+
+	// ensure that the second one gets cancelled even if the
+	// first one is wedged
+
+	count = 0;
+
+	while (reapable2.getRunning() && count < 15) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 15);
+
+	// ensure also that the second one gave up at the cancel and
+	// not the mark for rollback
+
+	assertTrue(reapable2.getCancelTried());
+	assertTrue(!reapable2.getRollbackTried());
+
+	// ensure that the first one responded to the interrupt and
+	// marks itself for rollback only
+
+	count = 0;
+
+	while (reapable1.getRunning() && count < 10) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 10);
+	assertTrue(reapable1.getRollbackTried());
+	
+	// check that the third one refuses the cancel and gets marked
+	// for rollback instead
+
+	count = 0;
+
+	while (!reapable3.getCancelTried() && count < 25) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 25);
+
+	// ensure that it gets marked for rollback
+
+	count = 0;
+
+	while (reapable3.getRunning() && count < 10) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 10);
+
+	assertTrue(reapable3.getRollbackTried());
+
+	// ensure the fourth one gets cancelled and marked for rollback
+	// even though it does not play ball
+
+	count = 0;
+
+	while (reapable4.getRunning() && count < 25) {
+	    count++;
+	    Thread.sleep(100);
+	}
+
+	assertTrue(count < 25);
+
+	assertTrue(reapable4.getCancelTried());
+	assertTrue(reapable4.getRollbackTried());
+    }
+    // test Reapable whose cancel() can stall and whose cancel/preventCommit results are fixed at construction
+    public class SlowReapable implements Reapable
+    {
+	public SlowReapable(Uid uid, int callDelay, int interruptDelay, boolean doCancel, boolean doRollback)
+	{
+	    this.uid = uid;
+            this.callDelay = callDelay;
+            this.interruptDelay = interruptDelay;
+            this.doCancel = doCancel;
+            this.doRollback = doRollback;
+	    cancelTried = false;
+	    rollbackTried = false;
+	    running = true;
+        }
+	// stays true until cancel() succeeds (doCancel) or preventCommit() is called
+	public boolean running()
+	{
+	    return getRunning();
+	}
+	// records the attempt, clears the running flag and returns the configured doRollback value
+	public boolean preventCommit()
+	{
+	    setRollbackTried();
+	    clearRunning();
+	    return doRollback;
+	}
+	// records the attempt, sleeps callDelay (then interruptDelay if that sleep was interrupted)
+	public int cancel()
+	{
+	    boolean interrupted = false;
+
+	    setCancelTried();
+
+	    if (callDelay > 0) {
+		try {
+		    Thread.sleep(callDelay);
+		} catch (InterruptedException e) {
+		    interrupted = true;
+		}
+	    }
+	    if (interrupted && interruptDelay > 0) {
+		try {
+		    Thread.sleep(interruptDelay);
+		} catch (InterruptedException e) {
+		}
+	    }
+
+	    if (doCancel) {
+		clearRunning();
+		return ActionStatus.ABORTED;
+	    } else {
+		return ActionStatus.RUNNING;
+	    }
+	}
+
+	public Uid get_uid()
+	{
+	    return uid;
+	}
+
+	private Uid uid;
+        private int callDelay; // in milliseconds
+        private int interruptDelay; // in milliseconds
+        private boolean doCancel;
+        private boolean doRollback;
+	private boolean cancelTried;
+	private boolean rollbackTried;
+	private boolean running;
+	// synchronized accessors: the test thread polls these flags while cancel()/preventCommit() update them
+	public synchronized void setCancelTried()
+	{
+	    cancelTried = true;
+	}
+	public synchronized boolean getCancelTried()
+	{
+	    return cancelTried;
+	}
+	public synchronized void setRollbackTried()
+	{
+	    rollbackTried = true;
+	}
+	public synchronized boolean getRollbackTried()
+	{
+	    return rollbackTried;
+	}
+	public synchronized void clearRunning()
+	{
+	    running = false;
+	}
+	public synchronized boolean getRunning()
+	{
+	    return running;
+	}
+    }
+}




More information about the jboss-svn-commits mailing list