[jboss-svn-commits] JBL Code SVN: r29634 - in labs/jbosstm/trunk: ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator and 10 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Oct 16 09:08:51 EDT 2009


Author: jhalliday
Date: 2009-10-16 09:08:50 -0400 (Fri, 16 Oct 2009)
New Revision: 29634

Added:
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java
Modified:
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
   labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
   labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java
   labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java
   labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java
   labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java
   labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java
   labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java
   labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java
Log:
transaction reaper changes. JBTM-624


Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/AtomicAction.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -143,7 +143,7 @@
 				_timeout = TxControl.getDefaultTimeout();
 
 			if (_timeout > 0)
-				TransactionReaper.transactionReaper(true).insert(this, _timeout);
+				TransactionReaper.transactionReaper().insert(this, _timeout);
 		}
 
 		return status;
@@ -181,7 +181,7 @@
 
 		ThreadActionData.popAction();
 
-		TransactionReaper.create().remove(this);
+		TransactionReaper.transactionReaper().remove(this);
 
 		return status;
 	}
@@ -205,7 +205,7 @@
 
 		ThreadActionData.popAction();
 
-		TransactionReaper.create().remove(this);
+		TransactionReaper.transactionReaper().remove(this);
 
 		return status;
 	}
@@ -219,7 +219,7 @@
 		 * the thread-to-tx association though.
 		 */
 
-		TransactionReaper.create().remove(this);
+		TransactionReaper.transactionReaper().remove(this);
 
 		return outcome;
 	}
@@ -233,7 +233,7 @@
 		 * the thread-to-tx association though.
 		 */
 
-		TransactionReaper.create().remove(this);
+		TransactionReaper.transactionReaper().remove(this);
 
 		return outcome;
 	}

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TransactionReaper.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -44,7 +44,6 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -117,13 +116,14 @@
 
 public class TransactionReaper
 {
+
     public static final String NORMAL = "NORMAL";
 
     public static final String DYNAMIC = "DYNAMIC";
 
     public static final String PERIODIC = "PERIODIC"; // the new name for 'NORMAL'
 
-    public TransactionReaper(long checkPeriod)
+    private TransactionReaper(long checkPeriod)
     {
         if (tsLogger.arjLogger.debugAllowed()) {
             tsLogger.arjLogger.debug(DebugLevel.CONSTRUCTORS,
@@ -138,20 +138,15 @@
     public final long checkingPeriod()
     {
         if (_dynamic) {
-            try {
-                return nextDynamicCheckTime.get() - System.currentTimeMillis();
-            }
-            catch (final NoSuchElementException nsee) {
-                return Long.MAX_VALUE; // list is empty, so we can sleep until something is inserted.
-            }
+            return nextDynamicCheckTime.get() - System.currentTimeMillis();
         } else {
             // if we have a cancel in progress which needs
             // checking up on then we have to wake up in time
             // for it whether we are using a static or
             // dynamic model
 
-            try {
-                final ReaperElement head = _transactions.first();
+            final ReaperElement head = _reaperElements.getFirst();
+            if(head != null) {
                 if (head._status != ReaperElement.RUN) {
                     long waitTime = head.getAbsoluteTimeout() - System.currentTimeMillis();
                     if (waitTime < _checkPeriod) {
@@ -159,8 +154,6 @@
                     }
                 }
             }
-            catch (final NoSuchElementException nsee) {
-            }
 
             return _checkPeriod;
         }
@@ -184,7 +177,7 @@
         }
 
         do {
-            final ReaperElement e;
+            final ReaperElement reaperElement;
 
             synchronized (this) {
                 final long now = System.currentTimeMillis();
@@ -202,10 +195,10 @@
                     break;
                 }
 
-                try {
-                    e = _transactions.first();
-                }
-                catch (final NoSuchElementException nsee) {
+                reaperElement = _reaperElements.getFirst();
+                // TODO close window where first can change - maybe record nextDynamicCheckTime before probing first,
+                // then use compareAndSet? Although something will need to check before sleeping anyhow...
+                if(reaperElement == null) {
                     nextDynamicCheckTime.set(Long.MAX_VALUE);
                     return;
                 }
@@ -213,7 +206,7 @@
 
             if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
                 tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_18",
-                                new Object[] {e._control.get_uid(), e.statusName()});
+                                new Object[] {reaperElement._control.get_uid(), reaperElement.statusName()});
             }
 
             // if we have to synchronize on multiple objects we always
@@ -222,8 +215,8 @@
             // ensure we don't deadlock. We never sychronize on the
             // reaper and the cancel queue at the same time.
 
-            synchronized (e) {
-                switch (e._status) {
+            synchronized (reaperElement) {
+                switch (reaperElement._status) {
                     case ReaperElement.RUN: {
                         // this tx has just timed out. remove it from the
                         // TX list, update the timeout to take account of
@@ -231,16 +224,16 @@
                         // TX. this ensures we process it again if it does
                         // not get cancelled in time
 
-                        e._status = ReaperElement.SCHEDULE_CANCEL;
+                        reaperElement._status = ReaperElement.SCHEDULE_CANCEL;
 
-                        reinsertElement(e, _cancelWaitPeriod);
+                        reinsertElement(reaperElement, _cancelWaitPeriod);
 
                         if (tsLogger.arjLogger.debugAllowed()) {
                             tsLogger.arjLogger
                                     .debug(
                                             DebugLevel.FUNCTIONS,
                                             VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-                                            "Reaper scheduling TX for cancellation " + e._control.get_uid());
+                                            "Reaper scheduling TX for cancellation " + reaperElement._control.get_uid());
                         }
 
                         // insert into cancellation queue for a worker
@@ -248,7 +241,7 @@
                         // thread is awake
 
                         synchronized (_workQueue) {
-                            _workQueue.add(e);
+                            _workQueue.add(reaperElement);
                             _workQueue.notifyAll();
                         }
                     }
@@ -266,14 +259,14 @@
                         // ensure the wedged TX entry comes to the
                         // front of the queue.
 
-                        reinsertElement(e, _cancelWaitPeriod);
+                        reinsertElement(reaperElement, _cancelWaitPeriod);
 
                         if (tsLogger.arjLogger.debugAllowed()) {
                             tsLogger.arjLogger
                                     .debug(
                                             DebugLevel.FUNCTIONS,
                                             VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-                                            "Reaper deferring interrupt for TX scheduled for cancel " + e._control.get_uid());
+                                            "Reaper deferring interrupt for TX scheduled for cancel " + reaperElement._control.get_uid());
                         }
                     }
                     break;
@@ -284,11 +277,11 @@
                         // check to ensure the thread responded to
                         // the kick
 
-                        e._status = ReaperElement.CANCEL_INTERRUPTED;
+                        reaperElement._status = ReaperElement.CANCEL_INTERRUPTED;
 
-                        e._worker.interrupt();
+                        reaperElement._worker.interrupt();
 
-                        reinsertElement(e, _cancelFailWaitPeriod);
+                        reinsertElement(reaperElement, _cancelFailWaitPeriod);
 
                         // log that we interrupted cancel()
 
@@ -299,7 +292,7 @@
                                             VisibilityLevel.VIS_PUBLIC,
                                             FacilityCode.FAC_ATOMIC_ACTION,
                                             "com.arjuna.ats.arjuna.coordinator.TransactionReaper_4",
-                                            new Object[]{e._control.get_uid()});
+                                            new Object[]{reaperElement._control.get_uid()});
                         }
                     }
                     break;
@@ -311,7 +304,7 @@
                         // cancellations. then mark the
                         // transaction as rollback only.
 
-                        e._status = ReaperElement.ZOMBIE;
+                        reaperElement._status = ReaperElement.ZOMBIE;
 
                         synchronized (this) {
                             _zombieCount++;
@@ -321,7 +314,7 @@
                                         .debug(
                                                 DebugLevel.FUNCTIONS,
                                                 VisibilityLevel.VIS_PUBLIC,
-                                                FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + e._worker + " (zombie count now " + _zombieCount + ") cancelling " + e._control.get_uid());
+                                                FacilityCode.FAC_ATOMIC_ACTION, "Reaper " + Thread.currentThread() + " got a zombie " + reaperElement._worker + " (zombie count now " + _zombieCount + ") cancelling " + reaperElement._control.get_uid());
                             }
 
                             if (_zombieCount == _zombieMax) {
@@ -345,8 +338,8 @@
                         if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
                             tsLogger.arjLoggerI18N.warn(
                                             "com.arjuna.ats.arjuna.coordinator.TransactionReaper_6",
-                                            new Object[]{e._worker,
-                                                    e._control.get_uid()});
+                                            new Object[]{reaperElement._worker,
+                                                    reaperElement._control.get_uid()});
                         }
 
                         // ok, since the worker was wedged we need to
@@ -355,27 +348,27 @@
                         // rollback only. we have to log a message
                         // whether we succeed, fail or get interrupted
 
-                        removeElementReaper(e);
+                        removeElementReaper(reaperElement);
 
                         try {
-                            if (e._control.preventCommit()) {
+                            if (reaperElement._control.preventCommit()) {
 
                                 // log a successful preventCommit()
 
                                 if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
                                     tsLogger.arjLoggerI18N.warn(
                                                     "com.arjuna.ats.arjuna.coordinator.TransactionReaper_10",
-                                                    new Object[]{e._control.get_uid()});
+                                                    new Object[]{reaperElement._control.get_uid()});
                                 }
 
-                                notifyListeners(e._control, false);
+                                notifyListeners(reaperElement._control, false);
                             } else {
                                 // log a failed preventCommit()
 
                                 if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
                                     tsLogger.arjLoggerI18N.warn(
                                                     "com.arjuna.ats.arjuna.coordinator.TransactionReaper_11",
-                                                    new Object[]{e._control.get_uid()});
+                                                    new Object[]{reaperElement._control.get_uid()});
                                 }
                             }
                         }
@@ -385,7 +378,7 @@
                             if (tsLogger.arjLoggerI18N.isWarnEnabled()) {
                                 tsLogger.arjLoggerI18N
                                         .warn("com.arjuna.ats.arjuna.coordinator.TransactionReaper_12",
-                                                new Object[]{e._control.get_uid()}, e1);
+                                                new Object[]{reaperElement._control.get_uid()}, e1);
                             }
                         }
                     }
@@ -398,7 +391,7 @@
                         // entry so we will steal in and do it
                         // first
 
-                        removeElementReaper(e);
+                        removeElementReaper(reaperElement);
                     }
                     break;
 
@@ -414,13 +407,9 @@
      */
     private void reinsertElement(ReaperElement e, long delay)
     {
-        _transactions.remove(e);
-        e.setAbsoluteTimeout((System.currentTimeMillis() + delay));
-        _transactions.add(e);
-
         synchronized (this) {
-            ReaperElement first = _transactions.first();
-            nextDynamicCheckTime.set(first.getAbsoluteTimeout());
+            long newWakeup = _reaperElements.reorder(e, delay);
+            nextDynamicCheckTime.set(newWakeup); // TODO - set should be atomic with reorder?
         }
     }
 
@@ -629,7 +618,7 @@
      */
     public final long numberOfTransactions()
     {
-        return _transactions.size();
+        return _reaperElements.size();
     }
 
     /**
@@ -679,12 +668,12 @@
         _lifetime.addAndGet(timeout);
 
         // insert the element only if it's not already present. We check _timeouts first, as elements
-        // maybe temporarily removed and reinserted in _transactions, so that is not as good a check.
-        // We use lazy eval to ensure we insert to _transactions only if we inserted to _timeouts.
-        // Note: removal works in reverse order i.e. _transactions then _timeouts.
-        if ((_timeouts.putIfAbsent(reaperElement._control, reaperElement) != null) || (!_transactions.add(reaperElement))) {
-            // we should not get an error here unless the user has coded the hashcode/equals/compareTo test wrong
-            // or they try inserting the same thing twice - is that really an error or expected to fail silent?
+        // maybe temporarily removed and reinserted in _reaperElements, so that is not as good a check.
+        // We use lazy eval to ensure we insert to _reaperElements only if we inserted to _timeouts.
+        // Note: removal works in reverse order i.e. _reaperElements then _timeouts.
+        if ((_timeouts.putIfAbsent(reaperElement._control, reaperElement) == null)) {
+            _reaperElements.add(reaperElement);
+        } else {
             throw new IllegalStateException(tsLogger.log_mesg.getString("com.arjuna.ats.arjuna.coordinator.TransactionReaper_1"));
         }
 
@@ -760,7 +749,7 @@
     {
         // arg is an Object because ArjunaTransactionImple.propagationContext does not have a Reapable
 
-        if ((_transactions.isEmpty()) || (control == null)) {
+        if ((_timeouts.isEmpty()) || (control == null)) {
             if (tsLogger.arjLogger.debugAllowed()) {
                 tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
                         VisibilityLevel.VIS_PUBLIC,
@@ -807,7 +796,7 @@
      */
     public final int getTimeout(Object control)
     {
-        if ((_transactions.isEmpty()) || (control == null)) {
+        if ((_timeouts.isEmpty()) || (control == null)) {
             if (tsLogger.arjLogger.debugAllowed()) {
                 tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
                         VisibilityLevel.VIS_PUBLIC,
@@ -863,22 +852,14 @@
                 */
 
             if (!waitForTransactions) {
-                Iterator iter = _transactions.iterator();
-                ReaperElement e;
-
-                while (iter.hasNext()) {
-                    e = (ReaperElement) iter.next();
-
-                    e.setAbsoluteTimeout(0);
-                }
+                _reaperElements.setAllTimeoutsToZero();
             }
 
             /*
                 * Wait for all of the transactions to
                 * terminate normally.
                 */
-
-            while (!_transactions.isEmpty()) {
+            while (!_reaperElements.isEmpty()) {
                 try {
                     this.wait();
                 }
@@ -886,7 +867,6 @@
                 }
             }
 
-
             _reaperThread.shutdown();
 
             notifyAll();
@@ -920,10 +900,10 @@
     // called (indirectly) by user code doing removals on e.g. commit/rollback
     // does not reset the wakeup time - we prefer leaving an unnecessary wakeup as it's
     // cheaper than locking to recalculate the new time here.
-    private final void removeElementClient(ReaperElement e)
+    private final void removeElementClient(ReaperElement reaperElement)
     {
-        _transactions.remove(e);
-        _timeouts.remove(e._control);
+        _reaperElements.remove(reaperElement);        
+        _timeouts.remove(reaperElement._control);
 
         // don't recalc time, just wake up as planned
 
@@ -942,16 +922,17 @@
     // called internally by the reaper when removing elements - note the different
     // behaviour with regard to check time recalculation. Here we need to ensure the
     // new time is correct.
-    private final void removeElementReaper(ReaperElement e)
+    private final void removeElementReaper(ReaperElement reaperElement)
     {
-        _transactions.remove(e);
-        _timeouts.remove(e._control);
+        _reaperElements.remove(reaperElement);
+        _timeouts.remove(reaperElement._control);
 
         synchronized (this) {
             try {
-                ReaperElement first = _transactions.first();
+                ReaperElement first = _reaperElements.getFirst();
                 nextDynamicCheckTime.set(first.getAbsoluteTimeout());
-            } catch(NoSuchElementException nsee) {
+                // TODO set needs to be atomic with getFirst?
+            } catch(IndexOutOfBoundsException e) {
                 nextDynamicCheckTime.set(Long.MAX_VALUE);
 
                 if(_inShutdown) {
@@ -961,6 +942,8 @@
         }
     }
 
+
+
     private final void notifyListeners(Reapable element, boolean rollback)
     {
         // notify listeners. Ignore errors.
@@ -982,15 +965,16 @@
      * Currently we let the reaper thread run at same priority as other threads.
      * Could get priority from environment.
      */
-    public static synchronized TransactionReaper create(long checkPeriod)
+    public static synchronized void instantiate()
     {
-        if (tsLogger.arjLogger.debugAllowed()) {
-            tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
-                    VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
-                    "TransactionReaper::create ( " + checkPeriod + " )");
-        }
+        if (TransactionReaper._theReaper == null)
+        {
+            if (tsLogger.arjLogger.debugAllowed()) {
+                tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS,
+                        VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_ATOMIC_ACTION,
+                        "TransactionReaper::instantiate()");
+            }
 
-        if (TransactionReaper._theReaper == null) {
             // default to dynamic mode
             TransactionReaper._dynamic = true;
 
@@ -1008,11 +992,10 @@
                 }
             }
 
+            long checkPeriod = Long.MAX_VALUE;
             if (!TransactionReaper._dynamic) {
                 checkPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperTimeout();
-            } else
-                checkPeriod = Long.MAX_VALUE;
-
+            }
             TransactionReaper._theReaper = new TransactionReaper(checkPeriod);
 
             TransactionReaper._theReaper._cancelWaitPeriod = arjPropertyManager.getCoordinatorEnvironmentBean().getTxReaperCancelWaitPeriod();
@@ -1055,31 +1038,22 @@
 
             _reaperWorkerThread.start();
         }
-
-        return TransactionReaper._theReaper;
     }
 
-    public static TransactionReaper create()
-    {
-        return create(TransactionReaper.defaultCheckPeriod);
+    /**
+     * Starting with 4.8, this method will always return an instance, will never return null.
+     * This causes the reaper to be instantiated unnecessarily in some cases, but that's cheaper
+     * than the alternatives.
+     *
+     * @return a TransactionReaper singleton.
+     */
+    public static TransactionReaper transactionReaper() {
+        if(_theReaper == null) {
+            instantiate();
+        }
+        return _theReaper;
     }
 
-    public static TransactionReaper transactionReaper()
-    {
-        return transactionReaper(false);
-    }
-
-    /*
-      * If parameter is true then do a create.
-      */
-    public static synchronized TransactionReaper transactionReaper(boolean createReaper)
-    {
-        if (createReaper)
-            return create();
-        else
-            return _theReaper;
-    }
-
     /**
      * Terminate the transaction reaper. This is a synchronous operation
      * and will only return once the reaper has been shutdown cleanly.
@@ -1117,12 +1091,12 @@
     public static final long defaultCancelFailWaitPeriod = 500; // in milliseconds
     public static final int defaultZombieMax = 8;
 
-    static final void reset()
+    static final synchronized void reset()
     {
         _theReaper = null;
     }
 
-    private final SortedSet<ReaperElement> _transactions = new ConcurrentSkipListSet<ReaperElement>();
+    private final ReaperElementManager _reaperElements = new ReaperElementManager();
 
     // The keys are actually Reapable, as that's what insert takes. However, some functions use get(Object)
     // and rely on clever hashcode/equals behaviour, especially for the JTS. Thus the generics key type is Object.
@@ -1135,7 +1109,7 @@
     private long _checkPeriod = 0;
 
     // Although it is atomic, writes (but not reads) need to by synchronized(this) i.e. on the TransactionReaper instance
-    // in order to ensure proper timing with respect to wait/notify and wakeups on the _transactions queue.
+    // in order to ensure proper timing with respect to wait/notify and wakeups on the _reaperElements queue.
     private final AtomicLong nextDynamicCheckTime = new AtomicLong(Long.MAX_VALUE);
 
     /**
@@ -1158,7 +1132,7 @@
      */
     private int _zombieMax = 0;
 
-    private static TransactionReaper _theReaper = null;
+    private static volatile TransactionReaper _theReaper = null;
 
     private static ReaperThread _reaperThread = null;
 

Added: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java	                        (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/coordinator/ReaperElementManager.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -0,0 +1,175 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2009,
+ * @author JBoss by Red Hat.
+ */
+package com.arjuna.ats.internal.arjuna.coordinator;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/*
+ * Encapsulation of a specialised data structure with API and performance characteristics
+ * designed specifically for use by the transaction reaper.
+ *
+ * ReaperElements represent transactions which need timing out. To do this, the reaper needs
+ * to wake periodically and process any timeouts that are due. New elements are added on transaction
+ * creation and will be removed prior to their timeout if they terminate normally.
+ *
+ * For high concurrency, normal inserts and removes should not block. However, to determine the next element
+ * which needs (will need) processing, the elements must be ordered or at least searched. These requirements
+ * are in conflict, since ordering/searching requires stability i.e. locking.
+ *
+ * To achieve the desired performance characteristics, we combine two data structures: an unsorted, concurrent
+ * collection and a sorted, non-threadsafe one which is guarded by the ReaperElementManager instance lock.
+ *
+ * Inserts are done, potentially concurrently, to the unsorted hash set. Removes likewise check this first
+ * and can return successfully without blocking if the element is found in this collection. Thus the insert/remove
+ * are cheap operations.
+ *
+ * When it is required to know the smallest (i.e. earliest to timeout) element, the contents
+ * of the unsorted set are moved to the sorted set. Since this happens infrequently compared to the insert/delete,
+ * only a fraction of the elements inserted should ever be copied - most will be removed without ever migrating.
+ *
+ * Note that additional external synchronization will be needed to ensure first element does not change
+ * between getFirst and any operation depending on its timeout value. This is the TransactionReaper's problem. 
+ *
+ * The sorted set is maintained manually, rather than using Collections.sort or other comparator based structure.
+ * This is because compareTo on reaper elements is relatively expensive and we wish to avoid linear scans to minimise
+ * the number of such calls. Hence we prefer ArrayList with binary search, despite the higher insert/remove cost
+ * compared to LinkedList.
+ *
+ * Pay careful attention to locking and performance characteristics if altering this class.
+ *
+ *
+ * @author Jonathan Halliday (jonathan.halliday at redhat.com) 2009-10
+ */
+public class ReaperElementManager
+{
+    /**
+     * @return the first (i.e. earliest to time out) element of the collection.
+     */
+    public synchronized ReaperElement getFirst() {
+        flushPending(); // we need to order the elements before we can tell which is first.
+        if(elementsOrderedByTimeout.isEmpty()) {
+            return null;
+        } else {
+            return elementsOrderedByTimeout.get(0);
+        }
+    }
+
+    // note - unsynchronized for performance.
+    public void add(ReaperElement reaperElement) throws IllegalStateException {
+        if(pendingInsertions.putIfAbsent(reaperElement, reaperElement) != null) {
+            // note this is best effort - we'll allow double inserts if the element is also in the ordered set.
+            throw new IllegalStateException();
+        }
+    }
+
+    /**
+     * @param reaperElement the reaper element to reorder in the sorted set.
+     * @param delayMillis the delay, in milliseconds from the current time, used to compute the element's new timeout.
+     * @return the new soonest timeout in the set (not necessarily that of the reordered element)
+     */
+    public synchronized long reorder(ReaperElement reaperElement, long delayMillis) {
+        // assume it must be in the sorted list, as it was likely obtained via getFirst...
+        removeSorted(reaperElement);
+        // we could add delay to the original timeout, but using current time is probably safer.
+        reaperElement.setAbsoluteTimeout((System.currentTimeMillis() + delayMillis));
+        // reinsert into its new position.
+        insertSorted(reaperElement);
+
+        // getFirst takes care of flushing the pending set for us.
+        return getFirst().getAbsoluteTimeout();
+    }
+
+    // use only for testing, it's nasty from a performance perspective.
+    public synchronized int size() {
+        return (elementsOrderedByTimeout.size() + pendingInsertions.size());
+    }
+
+    public synchronized boolean isEmpty() {
+        return (elementsOrderedByTimeout.isEmpty() && pendingInsertions.isEmpty());
+    }
+
+    // strange hack to force instant expire of tx during shutdown.
+    public synchronized void setAllTimeoutsToZero() {
+        flushPending();
+        for(ReaperElement reaperElement : elementsOrderedByTimeout) {
+            reaperElement.setAbsoluteTimeout(0);
+        }
+    }
+
+    // Note - mostly unsynchronized for performance.
+    public void remove(ReaperElement reaperElement) {
+        if(pendingInsertions.remove(reaperElement) != null) {
+            return;
+        }
+
+        // we missed finding it in the unsorted set - perhaps it has already been copied to the sorted set...
+        synchronized(this) {
+            removeSorted(reaperElement);
+        }
+    }
+
+    ////////////
+
+    // Private methods and structures are guarded where needed by ReaperElementManager instance locks in the
+    // public methods - see class header doc comments for concurrency/performance info.
+
+    private final ArrayList<ReaperElement> elementsOrderedByTimeout = new ArrayList<ReaperElement>();
+    private final ConcurrentHashMap<ReaperElement, ReaperElement> pendingInsertions = new ConcurrentHashMap<ReaperElement, ReaperElement>();
+
+    private void removeSorted(ReaperElement reaperElement) {
+        int location = Collections.binarySearch(elementsOrderedByTimeout, reaperElement);
+        if(location >= 0) {
+            elementsOrderedByTimeout.remove(location);
+        }
+    }
+
+    private void insertSorted(ReaperElement reaperElement) {
+        int location = Collections.binarySearch(elementsOrderedByTimeout, reaperElement);
+        if(location >= 0) {
+            throw new IllegalStateException();
+        }
+        int insertionPoint = -(location + 1);
+        elementsOrderedByTimeout.add(insertionPoint, reaperElement);
+    }
+
+    private void flushPending() {
+
+        // purge the pending inserts before doing anything else. This is potentially expensive.
+        // Future versions may prefer to insert only a portion of the pending set, or
+        // iterate it each time to determine the smallest (head) element.
+        Set<Map.Entry<ReaperElement,ReaperElement>> entrySet = pendingInsertions.entrySet();
+        if(entrySet != null) {
+            Iterator<Map.Entry<ReaperElement, ReaperElement>> queueIter = entrySet.iterator();
+            // iterator is weakly consistent - will traverse elements present at its time of creation,
+            // may or may not see later updates.
+            while(queueIter.hasNext()) {
+                Map.Entry<ReaperElement,ReaperElement> entry = queueIter.next();
+                ReaperElement element = entry.getValue();
+                // insert/remove not locked, so we are careful to check that we don't insert
+                // an element that has been removed from the pending set by a concurrent thread.
+                if(entrySet.remove(entry)) {
+                    insertSorted(element);
+                }
+            }
+        }
+    }
+}

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperMonitorTest.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -67,7 +67,6 @@
     @Test
     public void test()
     {
-        TransactionReaper.create(100);
         TransactionReaper reaper = TransactionReaper.transactionReaper();
         DummyMonitor listener = new DummyMonitor();
        

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -43,7 +43,6 @@
     {
 
         // test set+readback of interval
-        TransactionReaper.create(100);
         TransactionReaper reaper = TransactionReaper.transactionReaper();
         // set value is ignored in default DYNAMIC mode, it uses max long instead.
         assertEquals(Long.MAX_VALUE, reaper.checkingPeriod());

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase2.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -21,10 +21,7 @@
 package com.hp.mwtests.ts.arjuna.reaper;
 
 import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
-import com.arjuna.ats.arjuna.coordinator.Reapable;
-import com.arjuna.ats.arjuna.coordinator.ActionStatus;
 import com.arjuna.ats.arjuna.common.Uid;
-import com.arjuna.ats.arjuna.logging.tsLogger;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -42,7 +39,6 @@
     @Test
     public void testReaper() throws Exception
     {
-        TransactionReaper.create(100);
         TransactionReaper reaper = TransactionReaper.transactionReaper();
 
         // create slow reapables some of which will not respond immediately

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/reaper/ReaperTestCase3.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -33,7 +33,6 @@
     @Test
     public void testReaperWait() throws Exception
     {
-        TransactionReaper.create(500);
         TransactionReaper reaper = TransactionReaper.transactionReaper();
 
         // give the reaper worker time to start too
@@ -144,7 +143,6 @@
     @Test
     public void testReaperForce() throws Exception
     {
-        TransactionReaper.create(5000);
         TransactionReaper reaper = TransactionReaper.transactionReaper();
 
         // give the reaper worker time to start too

Modified: labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTA/jta/classes/com/arjuna/ats/internal/jta/transaction/arjunacore/subordinate/SubordinateAtomicAction.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -61,7 +61,7 @@
 		// if it has a non-negative timeout, add it to the reaper.
 
 		if (timeout > AtomicAction.NO_TIMEOUT)
-			TransactionReaper.transactionReaper(true).insert(this, timeout);
+			TransactionReaper.transactionReaper().insert(this, timeout);
 	}
 
 	/**

Modified: labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/TransactionImple.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -55,7 +55,6 @@
 import com.arjuna.ats.jta.exceptions.InactiveTransactionException;
 import com.arjuna.ats.jta.exceptions.InvalidTerminationStateException;
 import com.arjuna.ats.jta.logging.*;
-import com.arjuna.ats.jts.common.jtsPropertyManager;
 
 import com.arjuna.ats.internal.jta.xa.TxInfo;
 

Modified: labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/subordinate/jca/SubordinateAtomicTransaction.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -54,7 +54,7 @@
 		
 		if (timeout > 0)
 		{
-			TransactionReaper reaper = TransactionReaper.transactionReaper(true);
+			TransactionReaper reaper = TransactionReaper.transactionReaper();
 			
 			reaper.insert(super.getControlWrapper(), timeout);
 		}

Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/interposition/ServerFactory.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -247,9 +247,6 @@
 		{
 			TransactionReaper reaper = TransactionReaper.transactionReaper();
 
-			if (reaper == null)
-				reaper = TransactionReaper.create();
-
 			reaper.insert(new ServerControlWrapper((ControlImple) tranControl), time_out);
 		}
 

Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/TransactionFactoryImple.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -177,9 +177,6 @@
 
 				TransactionReaper reaper = TransactionReaper.transactionReaper();
 
-				if (reaper == null)
-					reaper = TransactionReaper.create();
-
 				reaper.insert(new ControlWrapper((ControlImple) tranControl), theTimeout);
 			}
 
@@ -659,7 +656,7 @@
 
 		TransactionReaper reaper = TransactionReaper.transactionReaper();
 
-		if (reaper == null)
+		if (reaper.checkingPeriod() == Long.MAX_VALUE)
 			info.reaperTimeout = 0;
 		else
 			info.reaperTimeout = (int) reaper.checkingPeriod();
@@ -716,16 +713,8 @@
 
 						TransactionReaper reaper = TransactionReaper.transactionReaper();
 
-						/*
-						 * If the reaper has not been created yet, then all
-						 * transactions so far must have 0 timeout.
-						 */
+						info.timeout = reaper.getTimeout(ctx);
 
-						if (reaper == null)
-							info.timeout = 0;
-						else
-							info.timeout = (int) reaper.getTimeout(ctx);
-
 						info.numberOfThreads = ctx.getImplHandle().activeThreads();
 
 						return info;

Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/internal/jts/orbspecific/coordinator/ArjunaTransactionImple.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -2133,21 +2133,14 @@
 						 * versions, there's a configurable option.
 						 */
 
-						if (TransactionReaper.transactionReaper() != null)
-						{
-						    if (_propagateRemainingTimeout)
-                            {
-                                long timeInMills = TransactionReaper.transactionReaper().getRemainingTimeoutMills(control);
-                                context.timeout = (int)(timeInMills/1000L);
-                            }
-                            else
-                            {
-                                context.timeout = TransactionReaper.transactionReaper().getTimeout(control);
-                            }
+                        if (_propagateRemainingTimeout)
+                        {
+                            long timeInMills = TransactionReaper.transactionReaper().getRemainingTimeoutMills(control);
+                            context.timeout = (int)(timeInMills/1000L);
                         }
-						else
+                        else
                         {
-                            context.timeout = 0;
+                            context.timeout = TransactionReaper.transactionReaper().getTimeout(control);
                         }
                     }
 

Modified: labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/ArjunaJTS/jts/classes/com/arjuna/ats/jts/OTSManager.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -115,11 +115,8 @@
 	 * Just in case control is a top-level transaction, and has
 	 * been registered with the reaper, we need to get it removed.
 	 *
-	 * Don't bother if the reaper has not been created.
 	 */
-    
-	if (TransactionReaper.transactionReaper() != null)
-	{
+
 	    Coordinator coord = null;
 	
 	    try
@@ -160,7 +157,6 @@
 
 		coord = null;
 	    }
-	}
 	
 	/*
 	 * Watch out for conflicts with multiple threads deleting
@@ -207,11 +203,9 @@
 	     * Just in case control is a top-level transaction, and has
 	     * been registered with the reaper, we need to get it removed.
 	     *
-	     * Don't bother if the reaper has not been created.
 	     */
     
-	    if (TransactionReaper.transactionReaper() != null)
-	    {
+
 		Coordinator coord = null;
 	
 		try
@@ -239,7 +233,7 @@
 
 		    coord = null;
 		}
-	    }
+
     
 	    /*
 	     * Watch out for conflicts with multiple threads deleting

Modified: labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java
===================================================================
--- labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java	2009-10-16 09:10:21 UTC (rev 29633)
+++ labs/jbosstm/trunk/atsintegration/classes/com/arjuna/ats/jbossatx/jta/TransactionManagerService.java	2009-10-16 13:08:50 UTC (rev 29634)
@@ -35,7 +35,6 @@
 
 import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
 import com.arjuna.ats.arjuna.common.Configuration;
-import com.arjuna.ats.arjuna.logging.tsLogger;
 
 import javax.transaction.TransactionManager;
 import javax.transaction.UserTransaction;
@@ -68,7 +67,7 @@
         log.info("JBossTS Transaction Service ("+mode+" version - tag:"+tag+") - JBoss Inc.");
 
         // Associate transaction reaper with our context classloader.
-        TransactionReaper.create();
+        TransactionReaper.transactionReaper();
 	}
 
     public void destroy()



More information about the jboss-svn-commits mailing list