[jboss-svn-commits] JBL Code SVN: r29634 - in labs/jbosstm/trunk: ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator and 10 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Oct 16 09:08:51 EDT 2009
Author: jhalliday
Date: 2009-10-16 09:08:50 -0400 (Fri, 16 Oct 2009)
New Revision: 29634
Added:
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java
labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java
labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java
labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java
labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java
labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java
labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java
Log:
transaction reaper changes. JBTM-624
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -143,7 +143,7 @@
_timeout = TxControl.getDefaultTimeout();
if (_timeout > 0)
- TransactionReaper.transactionReaper(true).insert(this, _timeout);
+ TransactionReaper.transactionReaper().insert(this, _timeout);
}
return status;
@@ -181,7 +181,7 @@
ThreadActionData.popAction();
- TransactionReaper.create().remove(this);
+ TransactionReaper.transactionReaper().remove(this);
return status;
}
@@ -205,7 +205,7 @@
ThreadActionData.popAction();
- TransactionReaper.create().remove(this);
+ TransactionReaper.transactionReaper().remove(this);
return status;
}
@@ -219,7 +219,7 @@
* the thread-to-tx association though.
*/
- TransactionReaper.create().remove(this);
+ TransactionReaper.transactionReaper().remove(this);
return outcome;
}
@@ -233,7 +233,7 @@
* the thread-to-tx association though.
*/
- TransactionReaper.create().remove(this);
+ TransactionReaper.transactionReaper().remove(this);
return outcome;
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -44,7 +44,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -117,13 +116,14 @@
public class TransactionReaper
{
+
public static final String NORMAL = "NORMAL";
public static final String DYNAMIC = "DYNAMIC";
public static final String PERIODIC = "PERIODIC"; // the new name for 'NORMAL'
- public TransactionReaper(long checkPeriod)
+ private TransactionReaper(long checkPeriod)
{
if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger.debug(DebugLevel.CONSTRUCTORS,
@@ -138,20 +138,15 @@
public final long checkingPeriod()
{
if (_dynamic) {
- try {
- return nextDynamicCheckTime.get() - System.currentTimeMillis();
- }
- catch (final NoSuchElementException nsee) {
- return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
- }
+ return nextDynamicCheckTime.get() - System.currentTimeMillis();
} else {
// if we have a cancel in progress which needs
// checking up on then we have to wake up in time
// for it whether we are using a static or
// dynamic model
- try {
- final ReaperElement head = _transactions.first();
+ final ReaperElement head = _reaperElements.getFirst();
+ if(head != null) {
if (head._status != ReaperElement.RUN) {
long waitTime = head.getAbsoluteTimeout() - System.currentTimeMillis();
if (waitTime < _checkPeriod) {
@@ -159,8 +154,6 @@
}
}
}
- catch (final NoSuchElementException nsee) {
- }
return _checkPeriod;
}
@@ -184,7 +177,7 @@
}
do {
- final ReaperElement e;
+ final ReaperElement reaperElement;
synchronized (this) {
final long now = System.currentTimeMillis();
@@ -202,10 +195,10 @@
break;
}
- try {
- e = _transactions.first();
- }
- catch (final NoSuchElementException nsee) {
+ reaperElement = _reaperElements.getFirst();
+ // TODO close window where first can change - maybe record nextDynamicCheckTime before probing first,
+ // then use compareAndSet? Although something will need to check before sleeping anyhow...
+ if(reaperElement == null) {
nextDynamicCheckTime.set(Long.MAX_VALUE);
return;
}
@@ -213,7 +206,7 @@
if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_18",
- new Object[] {e._control.get_uid(), e.statusName()});
+ new Object[] {reaperElement._control.get_uid(), reaperElement.statusName()});
}
// if we have to synchronize on multiple objects we always
@@ -222,8 +215,8 @@
// ensure we don't deadlock. We never sychronize on the
// reaper and the cancel queue at the same time.
- synchronized (e) {
- switch (e._status) {
+ synchronized (reaperElement) {
+ switch (reaperElement._status) {
case ReaperElement.RUN: {
// this tx has just timed out. remove it from the
// TX list, update the timeout to take account of
@@ -231,16 +224,16 @@
// TX. this ensures we process it again if it does
// not get cancelled in time
- e._status = ReaperElement.SCHEDULE_CANCEL;
+ reaperElement._status = ReaperElement.SCHEDULE_CANCEL;
- reinsertElement(e, _cancelWaitPeriod);
+ reinsertElement(reaperElement, _cancelWaitPeriod);
if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger
.debug(
DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "Reaper scheduling TX for cancellation " + e._control.get_uid());
+ "Reaper scheduling TX for cancellation " + reaperElement._control.get_uid());
}
// insert into cancellation queue for a worker
@@ -248,7 +241,7 @@
// thread is awake
synchronized (_workQueue) {
- _workQueue.add(e);
+ _workQueue.add(reaperElement);
_workQueue.notifyAll();
}
}
@@ -266,14 +259,14 @@
// ensure the wedged TX entry comes to the
// front of the queue.
- reinsertElement(e, _cancelWaitPeriod);
+ reinsertElement(reaperElement, _cancelWaitPeriod);
if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger
.debug(
DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+ "Reaper deferring interrupt for TX scheduled for cancel " + reaperElement._control.get_uid());
}
}
break;
@@ -284,11 +277,11 @@
// check to ensure the thread responded to
// the kick
- e._status = ReaperElement.CANCEL_INTERRUPTED;
+ reaperElement._status = ReaperElement.CANCEL_INTERRUPTED;
- e._worker.interrupt();
+ reaperElement._worker.interrupt();
- reinsertElement(e, _cancelFailWaitPeriod);
+ reinsertElement(reaperElement, _cancelFailWaitPeriod);
// log that we interrupted cancel()
@@ -299,7 +292,7 @@
VisibilityLevel.VIS_PUBLIC,
FacilityCode.FAC_ATOMIC_ACTION,
"com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
- new Object[]{e._control.get_uid()});
+ new Object[]{reaperElement._control.get_uid()});
}
}
break;
@@ -311,7 +304,7 @@
// cancellations. then mark the
// transaction as rollback only.
- e._status = ReaperElement.ZOMBIE;
+ reaperElement._status = ReaperElement.ZOMBIE;
synchronized (this) {
_zombieCount++;
@@ -321,7 +314,7 @@
.debug(
DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling " + e._control.get_uid());
+ FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + reaperElement._worker + " (zombie count now " + _zombieCount + ") cancelling " + reaperElement._control.get_uid());
}
if (_zombieCount == _zombieMax) {
@@ -345,8 +338,8 @@
if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
tsLogger.arjLoggerI18N.warn(
"com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
- new Object[]{e._worker,
- e._control.get_uid()});
+ new Object[]{reaperElement._worker,
+ reaperElement._control.get_uid()});
}
// ok, since the worker was wedged we need to
@@ -355,27 +348,27 @@
// rollback only. we have to log a message
// whether we succeed, fail or get interrupted
- removeElementReaper(e);
+ removeElementReaper(reaperElement);
try {
- if (e._control.preventCommit()) {
+ if (reaperElement._control.preventCommit()) {
// log a successful preventCommit()
if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
tsLogger.arjLoggerI18N.warn(
"com.arjuna.ats.arjuna.coordinator.TransactionReaper_10",
- new Object[]{e._control.get_uid()});
+ new Object[]{reaperElement._control.get_uid()});
}
- notifyListeners(e._control, false);
+ notifyListeners(reaperElement._control, false);
} else {
// log a failed preventCommit()
if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
tsLogger.arjLoggerI18N.warn(
"com.arjuna.ats.arjuna.coordinator.TransactionReaper_11",
- new Object[]{e._control.get_uid()});
+ new Object[]{reaperElement._control.get_uid()});
}
}
}
@@ -385,7 +378,7 @@
if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
tsLogger.arjLoggerI18N
.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_12",
- new Object[]{e._control.get_uid()}, e1);
+ new Object[]{reaperElement._control.get_uid()}, e1);
}
}
}
@@ -398,7 +391,7 @@
// entry so we will steal in and do it
// first
- removeElementReaper(e);
+ removeElementReaper(reaperElement);
}
break;
@@ -414,13 +407,9 @@
*/
private void reinsertElement(ReaperElement e, long delay)
{
- _transactions.remove(e);
- e.setAbsoluteTimeout((System.currentTimeMillis() + delay));
- _transactions.add(e);
-
synchronized (this) {
- ReaperElement first = _transactions.first();
- nextDynamicCheckTime.set(first.getAbsoluteTimeout());
+ long newWakeup = _reaperElements.reorder(e, delay);
+ nextDynamicCheckTime.set(newWakeup); // TODO - set should be atomic with reorder?
}
}
@@ -629,7 +618,7 @@
*/
public final long numberOfTransactions()
{
- return _transactions.size();
+ return _reaperElements.size();
}
/**
@@ -679,12 +668,12 @@
_lifetime.addAndGet(timeout);
// insert the element only if it's not already present. We check _timeouts first, as elements
- // maybe temporarily removed and reinserted in _transactions, so that is not as good a check.
- // We use lazy eval to ensure we insert to _transactions only if we inserted to _timeouts.
- // Note: removal works in reverse order i.e. _transactions then _timeouts.
- if ((_timeouts.putIfAbsent(reaperElement._control, reaperElement) != null) || (!_transactions.add(reaperElement))) {
- // we should not get an error here unless the user has coded the hashcode/equals/compareTo test wrong
- // or they try inserting the same thing twice - is that really an error or expected to fail silent?
+ // maybe temporarily removed and reinserted in _reaperElements, so that is not as good a check.
+ // We use lazy eval to ensure we insert to _reaperElements only if we inserted to _timeouts.
+ // Note: removal works in reverse order i.e. _reaperElements then _timeouts.
+ if ((_timeouts.putIfAbsent(reaperElement._control, reaperElement) == null)) {
+ _reaperElements.add(reaperElement);
+ } else {
throw new IllegalStateException(tsLogger.log_mesg.getString("com.arjuna.ats.arjuna.coordinator.TransactionReaper_1"));
}
@@ -760,7 +749,7 @@
{
// arg is an Object because ArjunaTransactionImple.propagationContext does not have a Reapable
- if ((_transactions.isEmpty()) || (control == null)) {
+ if ((_timeouts.isEmpty()) || (control == null)) {
if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC,
@@ -807,7 +796,7 @@
*/
public final int getTimeout(Object control)
{
- if ((_transactions.isEmpty()) || (control == null)) {
+ if ((_timeouts.isEmpty()) || (control == null)) {
if (tsLogger.arjLogger.debugAllowed()) {
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
VisibilityLevel.VIS_PUBLIC,
@@ -863,22 +852,14 @@
*/
if (!waitForTransactions) {
- Iterator iter = _transactions.iterator();
- ReaperElement e;
-
- while (iter.hasNext()) {
- e = (ReaperElement) iter.next();
-
- e.setAbsoluteTimeout(0);
- }
+ _reaperElements.setAllTimeoutsToZero();
}
/*
* Wait for all of the transactions to
* terminate normally.
*/
-
- while (!_transactions.isEmpty()) {
+ while (!_reaperElements.isEmpty()) {
try {
this.wait();
}
@@ -886,7 +867,6 @@
}
}
-
_reaperThread.shutdown();
notifyAll();
@@ -920,10 +900,10 @@
// called (indirectly) by user code doing removals on e.g. commit/rollback
// does not reset the wakeup time - we prefer leaving an unnecessary wakeup as it's
// cheaper than locking to recalculate the new time here.
- private final void removeElementClient(ReaperElement e)
+ private final void removeElementClient(ReaperElement reaperElement)
{
- _transactions.remove(e);
- _timeouts.remove(e._control);
+ _reaperElements.remove(reaperElement);
+ _timeouts.remove(reaperElement._control);
// don't recalc time, just wake up as planned
@@ -942,16 +922,17 @@
// called internally by the reaper when removing elements - note the different
// behaviour with regard to check time recalculation. Here we need to ensure the
// new time is correct.
- private final void removeElementReaper(ReaperElement e)
+ private final void removeElementReaper(ReaperElement reaperElement)
{
- _transactions.remove(e);
- _timeouts.remove(e._control);
+ _reaperElements.remove(reaperElement);
+ _timeouts.remove(reaperElement._control);
synchronized (this) {
try {
- ReaperElement first = _transactions.first();
+ ReaperElement first = _reaperElements.getFirst();
nextDynamicCheckTime.set(first.getAbsoluteTimeout());
- } catch(NoSuchElementException nsee) {
+ // TODO set needs tobe atomic to getFirst?
+ } catch(IndexOutOfBoundsException e) {
nextDynamicCheckTime.set(Long.MAX_VALUE);
if(_inShutdown) {
@@ -961,6 +942,8 @@
}
}
+
+
private final void notifyListeners(Reapable element, boolean rollback)
{
// notify listeners. Ignore errors.
@@ -982,15 +965,16 @@
* Currently we let the reaper thread run at same priority as other threads.
* Could get priority from environment.
*/
- public static synchronized TransactionReaper create(long checkPeriod)
+ public static synchronized void instantiate()
{
- if (tsLogger.arjLogger.debugAllowed()) {
- tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
- "TransactionReaper::create ( " + checkPeriod + " )");
- }
+ if (TransactionReaper._theReaper == null)
+ {
+ if (tsLogger.arjLogger.debugAllowed()) {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+ "TransactionReaper::instantiate()");
+ }
- if (TransactionReaper._theReaper == null) {
// default to dynamic mode
TransactionReaper._dynamic = true;
@@ -1008,11 +992,10 @@
}
}
+ long checkPeriod = Long.MAX_VALUE;
if (!TransactionReaper._dynamic) {
checkPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperTimeout();
- } else
- checkPeriod = Long.MAX_VALUE;
-
+ }
TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
TransactionReaper._theReaper._cancelWaitPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperCancelWaitPeriod();
@@ -1055,31 +1038,22 @@
_reaperWorkerThread.start();
}
-
- return TransactionReaper._theReaper;
}
- public static TransactionReaper create()
- {
- return create(TransactionReaper.defaultCheckPeriod);
+ /**
+ * Starting with 4.8, this method will always return an instance, will never return null.
+ * This causes the reaper to be instantiated unnecessarily in some cases, but that's cheaper
+ * than the alternatives.
+ *
+ * @return a TransactionReaper singleton.
+ */
+ public static TransactionReaper transactionReaper() {
+ if(_theReaper == null) {
+ instantiate();
+ }
+ return _theReaper;
}
- public static TransactionReaper transactionReaper()
- {
- return transactionReaper(false);
- }
-
- /*
- * If parameter is true then do a create.
- */
- public static synchronized TransactionReaper transactionReaper(boolean createReaper)
- {
- if (createReaper)
- return create();
- else
- return _theReaper;
- }
-
/**
* Terminate the transaction reaper. This is a synchronous operation
* and will only return once the reaper has been shutdown cleanly.
@@ -1117,12 +1091,12 @@
public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
public static final int defaultZombieMax = 8;
- static final void reset()
+ static final synchronized void reset()
{
_theReaper = null;
}
- private final SortedSet<ReaperElement> _transactions = new ConcurrentSkipListSet<ReaperElement>();
+ private final ReaperElementManager _reaperElements = new ReaperElementManager();
// The keys are actually Reapable, as that's what insert takes. However, some functions use get(Object)
// and rely on clever hashcode/equals behaviour, especially for the JTS. Thus the generics key type is Object.
@@ -1135,7 +1109,7 @@
private long _checkPeriod = 0;
// Although it is atomic, writes (but not reads) need to by synchronized(this) i.e. on the TransactionReaper instance
- // in order to ensure proper timing with respect to wait/notify and wakeups on the _transactions queue.
+ // in order to ensure proper timing with respect to wait/notify and wakeups on the _reaperElements queue.
private final AtomicLong nextDynamicCheckTime = new AtomicLong(Long.MAX_VALUE);
/**
@@ -1158,7 +1132,7 @@
*/
private int _zombieMax = 0;
- private static TransactionReaper _theReaper = null;
+ private static volatile TransactionReaper _theReaper = null;
private static ReaperThread _reaperThread = null;
Added: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -0,0 +1,175 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2009,
+ * @author JBoss by Red Hat.
+ */
+package com.arjuna.ats.internal.arjuna.coordinator;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/*
+ * Encapsulation of a specialised data structure with API and performance characteristics
+ * designed specifically for use by the transaction reaper.
+ *
+ * ReaperElements represent transactions which need timing out. To do this, the reaper needs
+ * to wake periodically and process any timeouts that are due. New elements are added on transaction
+ * creation and will be removed prior to their timeout if they terminate normally.
+ *
+ * For high concurrency, normal inserts and removes should not block. However, to determine the next element
+ * which needs (will need) processing, the elements must be ordered or at least searched. These requirements
+ * are in conflict, since ordering/searching requires stability i.e. locking.
+ *
+ * To achieve the desired performance characteristics, we combine two data structures: an unsorted, concurrent
+ * collection and a sorted, non-threadsafe one which is guarded by the the ReaperElementManager instance lock.
+ *
+ * Inserts are done, potentially concurrently, to the unsorted hash set. Removes likewise check this first
+ * and can return successfully without blocking if the element is found in this collection. Thus the insert/remove
+ * are cheap operations.
+ *
+ * When it is required to know the smallest (i.e. earliest to timeout) element, the contents
+ * of the unsorted set are moved to the sorted set. Since this happens infrequently compared to the insert/delete,
+ * only a fraction of the elements inserted should ever be copied - most will be removed without ever migrating.
+ *
+ * Note that additional external synchronization will be needed to ensure first element does not change
+ * between getFirst and any operation depending on its timeout value. This is the TransactionReaper's problem.
+ *
+ * The sorted set is maintained manually, rather than using Collections.sort or other comparator based structure.
+ * This is because compareTo on reaper elements is relatively expensive and we wish to avoid liner scans to minimise
+ * the number of such calls. Hence we prefer ArrayList with binary search, despite the higher insert/remove cost
+ * compared to LinkedList.
+ *
+ * Pay careful attention to locking and performance characteristics if altering this class.
+ *
+ *
+ * @author Jonathan Halliday (jonathan.halliday at redhat.com) 2009-10
+ */
+public class ReaperElementManager
+{
+ /**
+ * @return the first (i.e. earliest to time out) element of the colleciton.
+ */
+ public synchronized ReaperElement getFirst() {
+ flushPending(); // we need to order the elements before we can tell which is first.
+ if(elementsOrderedByTimeout.isEmpty()) {
+ return null;
+ } else {
+ return elementsOrderedByTimeout.get(0);
+ }
+ }
+
+ // note - unsynchronized for performance.
+ public void add(ReaperElement reaperElement) throws IllegalStateException {
+ if(pendingInsertions.putIfAbsent(reaperElement, reaperElement) != null) {
+ // note this is best effort - we'll allow double inserts if the element is also in the ordered set.
+ throw new IllegalStateException();
+ }
+ }
+
+ /**
+ * @param reaperElement the reaper element to reorder in the sorted set.
+ * @param delayMillis the ammout of time to increment the element's timeout by.
+ * @return the new soonest timeout in the set (not necessarily that of the reordered element)
+ */
+ public synchronized long reorder(ReaperElement reaperElement, long delayMillis) {
+ // assume it must be in the sorted list, as it was likely obtained via getFirst...
+ removeSorted(reaperElement);
+ // we could add delay to the original timeout, but using current time is probably safer.
+ reaperElement.setAbsoluteTimeout((System.currentTimeMillis() + delayMillis));
+ // reinsert into its new position.
+ insertSorted(reaperElement);
+
+ // getFirst takes care of flushing the pending set for us.
+ return getFirst().getAbsoluteTimeout();
+ }
+
+ // use only for testing, it's nasty from a performance perspective.
+ public synchronized int size() {
+ return (elementsOrderedByTimeout.size() + pendingInsertions.size());
+ }
+
+ public synchronized boolean isEmpty() {
+ return (elementsOrderedByTimeout.isEmpty() && pendingInsertions.isEmpty());
+ }
+
+ // strange hack to force instant expire of tx during shutdown.
+ public synchronized void setAllTimeoutsToZero() {
+ flushPending();
+ for(ReaperElement reaperElement : elementsOrderedByTimeout) {
+ reaperElement.setAbsoluteTimeout(0);
+ }
+ }
+
+ // Note - mostly unsynchronized for performance.
+ public void remove(ReaperElement reaperElement) {
+ if(pendingInsertions.remove(reaperElement) != null) {
+ return;
+ }
+
+ // we missed finding it in the unsorted set - perhaps it has already been copied to the sorted set...
+ synchronized(this) {
+ removeSorted(reaperElement);
+ }
+ }
+
+ ////////////
+
+ // Private methods and structures are guarded where needed by ReaperElementManager instance locks in the
+ // public methods - see class header doc comments for concurrency/performance info.
+
+ private final ArrayList<ReaperElement> elementsOrderedByTimeout = new ArrayList<ReaperElement>();
+ private final ConcurrentHashMap<ReaperElement, ReaperElement> pendingInsertions = new ConcurrentHashMap<ReaperElement, ReaperElement>();
+
+ private void removeSorted(ReaperElement reaperElement) {
+ int location = Collections.binarySearch(elementsOrderedByTimeout, reaperElement);
+ if(location >= 0) {
+ elementsOrderedByTimeout.remove(location);
+ }
+ }
+
+ private void insertSorted(ReaperElement reaperElement) {
+ int location = Collections.binarySearch(elementsOrderedByTimeout, reaperElement);
+ if(location >= 0) {
+ throw new IllegalStateException();
+ }
+ int insertionPoint = -(location + 1);
+ elementsOrderedByTimeout.add(insertionPoint, reaperElement);
+ }
+
+ private void flushPending() {
+
+ // purge the pending inserts before doing anything else. This is potentially expensive.
+ // Future versions may prefer to insert only a portion of the pending set, or
+ // iterate it each time to determine the smallest (head) element.
+ Set<Map.Entry<ReaperElement,ReaperElement>> entrySet = pendingInsertions.entrySet();
+ if(entrySet != null) {
+ Iterator<Map.Entry<ReaperElement, ReaperElement>> queueIter = entrySet.iterator();
+ // iterator is weakly consistent - will traverse elements present at its time of creation,
+ // may or may not see later updates.
+ while(queueIter.hasNext()) {
+ Map.Entry<ReaperElement,ReaperElement> entry = queueIter.next();
+ ReaperElement element = entry.getValue();
+ // insert/remove not locked, so we are careful to check that we don't insert
+ // an element that has been removed from the pending set by a concurrent thread.
+ if(entrySet.remove(entry)) {
+ insertSorted(element);
+ }
+ }
+ }
+ }
+}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -67,7 +67,6 @@
@Test
public void test()
{
- TransactionReaper.create(100);
TransactionReaper reaper = TransactionReaper.transactionReaper();
DummyMonitor listener = new DummyMonitor();
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -43,7 +43,6 @@
{
// test set+readback of interval
- TransactionReaper.create(100);
TransactionReaper reaper = TransactionReaper.transactionReaper();
// set value is ignored in default DYNAMIC mode, it uses max long instead.
assertEquals(Long.MAX_VALUE, reaper.checkingPeriod());
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -21,10 +21,7 @@
package com.hp.mwtests.ts.arjuna.reaper;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.Reapable;
-import com.arjuna.ats.arjuna.coordinator.ActionStatus;
import com.arjuna.ats.arjuna.common.Uid;
-import com.arjuna.ats.arjuna.logging.tsLogger;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -42,7 +39,6 @@
@Test
public void testReaper() throws Exception
{
- TransactionReaper.create(100);
TransactionReaper reaper = TransactionReaper.transactionReaper();
// create slow reapables some of which will not respond immediately
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -33,7 +33,6 @@
@Test
public void testReaperWait() throws Exception
{
- TransactionReaper.create(500);
TransactionReaper reaper = TransactionReaper.transactionReaper();
// give the reaper worker time to start too
@@ -144,7 +143,6 @@
@Test
public void testReaperForce() throws Exception
{
- TransactionReaper.create(5000);
TransactionReaper reaper = TransactionReaper.transactionReaper();
// give the reaper worker time to start too
Modified: labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -61,7 +61,7 @@
// if it has a non-negative timeout, add it to the reaper.
if (timeout > AtomicAction.NO_TIMEOUT)
- TransactionReaper.transactionReaper(true).insert(this, timeout);
+ TransactionReaper.transactionReaper().insert(this, timeout);
}
/**
Modified: labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -55,7 +55,6 @@
import com.arjuna.ats.jta.exceptions.InactiveTransactionException;
import com.arjuna.ats.jta.exceptions.InvalidTerminationStateException;
import com.arjuna.ats.jta.logging.*;
-import com.arjuna.ats.jts.common.jtsPropertyManager;
import com.arjuna.ats.internal.jta.xa.TxInfo;
Modified: labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -54,7 +54,7 @@
if (timeout > 0)
{
- TransactionReaper reaper = TransactionReaper.transactionReaper(true);
+ TransactionReaper reaper = TransactionReaper.transactionReaper();
reaper.insert(super.getControlWrapper(), timeout);
}
Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -247,9 +247,6 @@
{
TransactionReaper reaper = TransactionReaper.transactionReaper();
- if (reaper == null)
- reaper = TransactionReaper.create();
-
reaper.insert(new ServerControlWrapper((ControlImple) tranControl), time_out);
}
Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -177,9 +177,6 @@
TransactionReaper reaper = TransactionReaper.transactionReaper();
- if (reaper == null)
- reaper = TransactionReaper.create();
-
reaper.insert(new ControlWrapper((ControlImple) tranControl), theTimeout);
}
@@ -659,7 +656,7 @@
TransactionReaper reaper = TransactionReaper.transactionReaper();
- if (reaper == null)
+ if (reaper.checkingPeriod() == Long.MAX_VALUE)
info.reaperTimeout = 0;
else
info.reaperTimeout = (int) reaper.checkingPeriod();
@@ -716,16 +713,8 @@
TransactionReaper reaper = TransactionReaper.transactionReaper();
- /*
- * If the reaper has not been created yet, then all
- * transactions so far must have 0 timeout.
- */
+ info.timeout = reaper.getTimeout(ctx);
- if (reaper == null)
- info.timeout = 0;
- else
- info.timeout = (int) reaper.getTimeout(ctx);
-
info.numberOfThreads = ctx.getImplHandle().activeThreads();
return info;
Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -2133,21 +2133,14 @@
* versions, there's a configurable option.
*/
- if (TransactionReaper.transactionReaper() != null)
- {
- if (_propagateRemainingTimeout)
- {
- long timeInMills = TransactionReaper.transactionReaper().getRemainingTimeoutMills(control);
- context.timeout = (int)(timeInMills/1000L);
- }
- else
- {
- context.timeout = TransactionReaper.transactionReaper().getTimeout(control);
- }
+ if (_propagateRemainingTimeout)
+ {
+ long timeInMills = TransactionReaper.transactionReaper().getRemainingTimeoutMills(control);
+ context.timeout = (int)(timeInMills/1000L);
}
- else
+ else
{
- context.timeout = 0;
+ context.timeout = TransactionReaper.transactionReaper().getTimeout(control);
}
}
Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -115,11 +115,8 @@
* Just in case control is a top-level transaction, and has
* been registered with the reaper, we need to get it removed.
*
- * Don't bother if the reaper has not been created.
*/
-
- if (TransactionReaper.transactionReaper() != null)
- {
+
Coordinator coord = null;
try
@@ -160,7 +157,6 @@
coord = null;
}
- }
/*
* Watch out for conflicts with multiple threads deleting
@@ -207,11 +203,9 @@
* Just in case control is a top-level transaction, and has
* been registered with the reaper, we need to get it removed.
*
- * Don't bother if the reaper has not been created.
*/
- if (TransactionReaper.transactionReaper() != null)
- {
+
Coordinator coord = null;
try
@@ -239,7 +233,7 @@
coord = null;
}
- }
+
/*
* Watch out for conflicts with multiple threads deleting
Modified: labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java
===================================================================
--- labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java 2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java 2009-10-16 13:08:50 UTC (rev 29634)
@@ -35,7 +35,6 @@
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.common.Configuration;
-import com.arjuna.ats.arjuna.logging.tsLogger;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
@@ -68,7 +67,7 @@
log.info("JBossTS Transaction Service ("+mode+" version - tag:"+tag+") - JBoss Inc.");
// Associate transaction reaper with our context classloader.
- TransactionReaper.create();
+ TransactionReaper.transactionReaper();
}
public void destroy()
More information about the jboss-svn-commits
mailing list