[jboss-svn-commits] JBL Code SVN: r19224 - in labs/jbosstm/trunk: ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Mar 25 06:12:55 EDT 2008
Author: adinn
Date: 2008-03-25 06:12:55 -0400 (Tue, 25 Mar 2008)
New Revision: 19224
Added:
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/Implementations.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ParticipantRecordSetup.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/RecoverACCoordinator.java
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
labs/jbosstm/trunk/ArjunaCore/arjuna/services/classes/com/arjuna/ats/arjuna/services/recovery/RecoveryManagerService.java
labs/jbosstm/trunk/ArjunaJTS/trailmap/src/com/arjuna/demo/recovery/xaresource/TestXAResourceRecovery.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/XTSService.java
Log:
Modified ArjunaCore class PeriodicRecovery and related Recovery
Manager Service classes to make recovery fully thread safe and provide
coherent semantics for stopping and resuming recovery scanning. Also
added support for dynamic addition and removal of recovery modules as
required by XTS code
Modified ArjunaJTS recovery test code to provide new required
parameter for RecoveryManager.stop()
Modified XTS code to dynamically register and unregister recovery
module for class ACCoordinator.
Fixes for JBTM-346
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -160,20 +160,30 @@
}
/**
- * Stop the periodic recovery manager.
+ * Stop the periodic recovery manager waiting for any recovery scan in progress to complete
*/
public final void stop ()
{
- _theImple.stop();
+ stop(false);
}
-
+
/**
+ * Stop the periodic recovery manager.
+ * @param async false means wait for any recovery scan in progress to complete
+ */
+
+
+ public final void stop (boolean async)
+ {
+ _theImple.stop(async);
+ }
+ /**
* Suspend the recovery manager. If the recovery manager is in the process of
* doing recovery scans then it will be suspended afterwards, in order to
* preserve data integrity.
*
- * @param async wait for the recovery manager to finish any scans before returning.
+ * @param async false means wait for the recovery manager to finish any scans before returning.
*/
public void suspendScan (boolean async)
@@ -205,6 +215,22 @@
_theImple.addModule(module);
}
+ /**
+ * Remove a recovery module from the system.
+ *
+ * @param module The module to remove.
+ */
+
+ public final void removeModule (RecoveryModule module)
+ {
+ _theImple.removeModule(module);
+ }
+
+ /**
+ * Obtain a snapshot list of available recovery modules.
+ * @return a snapshot list of the currently installed recovery modules
+ */
+
public final Vector getModules ()
{
return _theImple.getModules();
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -55,6 +55,8 @@
* modules. These modules are dynamically loaded. The modules to load
* are specified by properties beginning with "RecoveryExtension"
* <P>
+ * n.b. recovery scans may be performed by this object (it is a thread and may be started as a background task)
+ * and by other ad hoc threads
* @author
* @version $Id: PeriodicRecovery.java 2342 2006-03-30 13:06:17Z $
*
@@ -72,26 +74,69 @@
public class PeriodicRecovery extends Thread
{
+ /***** public API *****/
/*
* TODO uncomment for JDK 1.5.
*
- public static enum State
+ public static enum Status
{
- created, active, terminated, suspended, scanning
+ INACTIVE, SCANNING
}
+
+ public static enum Mode
+ {
+ ENABLED, SUSPENDED, TERMINATED
+ }
*/
- public class State
+ /**
+ * state values indicating whether or not some thread is currently scanning. used to define values of field
+ * {@link PeriodicRecovery#_currentStatus}
+ */
+ public class Status
{
- public static final int created = 0;
- public static final int active = 1;
- public static final int terminated = 2;
- public static final int suspended = 3;
- public static final int scanning = 4;
+ /**
+ * state value indicating that no thread is scanning
+ */
+ public static final int INACTIVE = 0;
+ /**
+ * state value indicating that some thread is scanning.
+ * n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
+ */
+ public static final int SCANNING = 1;
- private State () {}
+ private Status() { }
}
+ /**
+ * state values indicating operating mode of scanning process for ad hoc threads and controlling behaviour of
+ * singleton periodic recovery thread. used to define values of field {@link PeriodicRecovery#_currentMode}
+ *
+ * n.b. {@link PeriodicRecovery#_currentStatus} may not transition to state SCANNING when
+ * {@link PeriodicRecovery#_currentMode} is in state SUSPENDED or TERMINATED. However, if a scan is in
+ * progress when {@link PeriodicRecovery#_currentMode} transitions to state SUSPENDED or TERMINATED
+ * {@link PeriodicRecovery#_currentStatus} may (temporarily) remain in state SCANNING before transitioning
+ * to state INACTIVE.
+ */
+ public class Mode
+ {
+ /**
+ * state value indicating that new scans may proceed
+ */
+ public static final int ENABLED = 0;
+ /**
+ * state value indicating that new scans may not proceed and the periodic recovery thread should suspend
+ */
+ public static final int SUSPENDED = 1;
+ /**
+ * state value indicating that new scans may not proceed and that the singleton
+ * PeriodicRecovery thread instance should exit if it is still running
+ */
+ public static final int TERMINATED = 2;
+
+ private Mode() { }
+ }
+
public PeriodicRecovery (boolean threaded)
{
initialise();
@@ -117,70 +162,134 @@
if (threaded)
{
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: starting background scanner thread" );
+ }
start();
}
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: starting listener worker thread" );
+ }
+
_listener.start();
}
- public int getStatus ()
- {
- synchronized (_stateLock)
- {
- return _currentState;
- }
- }
-
- public void setStatus (int s)
- {
- synchronized (_stateLock)
- {
- _currentState = s;
- }
- }
-
- public void shutdown ()
+ /**
+ * initiate termination of the periodic recovery thread and stop any subsequent scan requests from proceeding.
+ *
+ * this switches the recovery operation mode to TERMINATED. if a scan is in progress when this method is called
+ * and has not yet started phase 2 of its scan it will be forced to return before completing phase 2.
+ *
+ * @param async false if the calling thread should wait for any in-progress scan to complete before returning
+ */
+ public void shutdown (boolean async)
{
- setStatus(State.terminated);
+ synchronized (_stateLock) {
+ if (getMode() != Mode.TERMINATED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: Mode <== TERMINATED" );
+ }
+ setMode(Mode.TERMINATED);
+ _stateLock.notifyAll();
+ }
- this.interrupt();
+ if (!async) {
+ // synchronous, so we keep waiting until the currently active scan stops or scanning
+ // changes to TERMINATED
+ while (getStatus() == Status.SCANNING) {
+ try {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: shutdown waiting for scan to end" );
+ }
+ _stateLock.wait();
+ } catch(InterruptedException ie) {
+ // just ignore and retest condition
+ }
+ }
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: shutdown scan wait complete" );
+ }
+ }
+ }
}
+ /**
+ * make all scanning operations suspend.
+ *
+ * This switches the recovery operation mode to SUSPENDED. Any attempt to start a new scan either by an ad hoc
+ * thread or by the periodic recovery thread will suspend its thread until the mode changes. If a scan is in
+ * progress when this method is called it will complete its scan without suspending.
+ *
+ * @param async false if the calling thread should wait for any in-progress scan to complete before returning
+ */
+
public void suspendScan (boolean async)
{
- synchronized (_signal)
+ synchronized (_stateLock)
{
- setStatus(State.suspended);
+ // only switch and kick everyone if we are currently ENABLED
- this.interrupt();
-
- if (!async)
- {
- try
- {
- _signal.wait();
- }
- catch (InterruptedException ex)
- {
- }
- }
+ if (getMode() == Mode.ENABLED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: Mode <== SUSPENDED" );
+ }
+ setMode(Mode.SUSPENDED);
+ _stateLock.notifyAll();
+ }
+ if (!async) {
+ // synchronous, so we keep waiting until the currently active scan stops
+ while (getStatus() == Status.SCANNING) {
+ try {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: suspendScan waiting for scan to end" );
+ }
+ _stateLock.wait();
+ } catch(InterruptedException ie) {
+ // just ignore and retest condition
+ }
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: suspendScan scan wait compelete" );
+ }
+ }
+ }
}
}
+ /**
+ * resume scanning operations
+ *
+ * This switches the recovery operation mode from SUSPENDED to ENABLED. Any threads which suspended when
+ * they tried to start a scan will be woken up by this transition.
+ */
public void resumeScan ()
{
- /*
- * If it's suspended, then it has to be blocked
- * on the lock.
- */
-
- if (getStatus() == State.suspended)
+ synchronized (_stateLock)
{
- setStatus(State.active);
-
- synchronized (_suspendLock)
- {
- _suspendLock.notify();
+ if (getMode() == Mode.SUSPENDED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: Mode <== ENABLED" );
+ }
+ setMode(Mode.ENABLED);
+ _stateLock.notifyAll();
}
}
}
@@ -191,7 +300,7 @@
* otherwise return a default port.
*/
- public static final ServerSocket getServerSocket () throws IOException
+ public static ServerSocket getServerSocket () throws IOException
{
if (_socket == null)
{
@@ -222,7 +331,7 @@
}
/**
- * Start the background thread to perform the periodic recovery
+ * Implements the background thread which performs the periodic recovery
*/
public void run ()
@@ -231,161 +340,458 @@
do
{
- checkSuspended();
+ boolean workToDo = false;
+ // ok, get to the point where we are ready to start a scan
+ synchronized(_stateLock) {
+ if (getStatus() == Status.SCANNING) {
+ // need to wait for some other scan to finish
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread waiting on other scan" );
+ }
+ doScanningWait();
+ if (getMode() == Mode.ENABLED) {
+ // the last guy just finished scanning so we ought to wait a bit rather than just
+ // pile straight in to do some work
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread backing off" );
+ }
+ doPeriodicWait();
+ // if we got told to stop then do so
+ finished = (getMode() == Mode.TERMINATED);
+ }
+ } else {
+ // status == INACTIVE so we can go ahead and scan if scanning is enabled
+ switch (getMode()) {
+ case Mode.ENABLED:
+ // ok grab our chance to be the scanning thread
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread Status <== SCANNING" );
+ }
+ setStatus(Status.SCANNING);
+ workToDo = true;
+ break;
+ case Mode.SUSPENDED:
+ // we need to wait while we are suspended
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread wait while SUSPENDED" );
+ }
+ doSuspendedWait();
+ // we come out of here with the lock and either ENABLED or TERMINATED
+ finished = (getMode() == Mode.TERMINATED);
+ break;
+ case Mode.TERMINATED:
+ finished = true;
+ break;
+ }
+ }
+ }
- finished = doWork(true);
+ // its ok to start work if requested -- we cannot be stopped now by a mode change to SUSPEND
+ // or TERMINATE until we get through phase 1 and maybe phase 2 if we are lucky
+ if (workToDo) {
+ // we are in state SCANNING so actually do the scan
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread scanning");
+ }
+ doWorkInternal();
+ // clear the SCANNING state now we have done
+ synchronized(_stateLock) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread Status <== INACTIVE");
+ }
+ setStatus(Status.INACTIVE);
+ // check if we need to notify the listener worker that we just finished a scan
+ notifyWorker();
+
+ if (getMode() == Mode.ENABLED) {
+ // we managed a full scan and scanning is still enabled
+ // so wait a bit before the next attempt
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread backing off" );
+ }
+ doPeriodicWait();
+ }
+ finished = (getMode() == Mode.TERMINATED);
+ }
+ }
} while (!finished);
+
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: background thread exiting" );
+ }
}
/**
- * Perform the recovery scans on all registered modules.
+ * Perform a recovery scan on all registered modules.
*
- * @param boolean periodic If <code>true</code> then this is being called
- * as part of the normal periodic running of the manager and we'll sleep
- * after phase 2 work. Otherwise, we're being called directly and there should
- * be no sleep after phase 2.
- *
- * @return <code>true</code> if the manager has been instructed to finish,
- * <code>false</code> otherwise.
+ * @caveats if a scan is already in progress this method will wait for it to complete otherwise it will
+ * perform its own scan before returning. If scanning is suspended this will require waiting for scanning
+ * to resume.
*/
- public final synchronized boolean doWork (boolean periodic)
+ public final void doWork ()
{
- boolean interrupted = false;
+ boolean workToDo = false;
- /*
- * If we're suspended or already scanning, then ignore.
- */
+ synchronized(_stateLock) {
+ if (getMode() == Mode.SUSPENDED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread wait while SUSPENDED" );
+ }
+ doSuspendedWait();
+ }
- synchronized (_stateLock)
- {
- if (getStatus() != State.active)
- {
- if (tsLogger.arjLoggerI18N.isInfoEnabled())
- {
- tsLogger.arjLoggerI18N.info("com.arjuna.ats.internal.arjuna.recovery.PeriodicRecovery_10", new Object[]{new Integer(getStatus())});
- }
+ // no longer SUSPENDED -- retest in case we got TERMINATED
- return false;
- }
+ if (getMode() == Mode.TERMINATED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread scan TERMINATED" );
+ }
+ } else {
- setStatus(State.scanning);
- }
+ // ok scanning must be enabled -- see if we can start a scan or whether we have to wait on another one
- tsLogger.arjLogger.info("Periodic recovery - first pass <" +
- _theTimestamper.format(new Date()) + ">" );
+ if (getStatus() == Status.SCANNING) {
+ // just wait for the other scan to finish
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread waiting on other scan" );
+ }
+ doScanningWait();
+ } else {
- Enumeration modules = _recoveryModules.elements();
+ // ok grab our chance to start a scan
+ setStatus(Status.SCANNING);
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread Status <== SCANNING" );
+ }
+ workToDo = true;
+ }
+ }
+ }
- while (modules.hasMoreElements())
- {
- RecoveryModule m = (RecoveryModule) modules.nextElement();
+ if (workToDo) {
+ // ok to start work -- we cannot be stopped now by a mode change to SUSPEND or TERMINATE
+ // until we get through phase 1 and maybe phase 2 if we are lucky
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread scanning");
+ }
+ doWorkInternal();
- m.periodicWorkFirstPass();
+ // clear the scan for some other thread to have a go
+ synchronized(_stateLock) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: ad hoc thread Status <== INACTIVE");
+ }
+ setStatus(Status.INACTIVE);
+ // check if we need to notify the listener worker that we just finished a scan
+ notifyWorker();
+ }
+ }
+ }
- if (tsLogger.arjLogger.isDebugEnabled())
- {
- tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS,
- VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_CRASH_RECOVERY,
- " " );
- }
- }
+ /**
+ * called by the listener worker to wake the periodic recovery thread and get it to start a scan if one
+ * is not already in progress
+ */
- if (interrupted)
- {
- interrupted = false;
+ public void wakeUp()
+ {
+ synchronized (_stateLock) {
+ _workerScanRequested = true;
+ // wake up the periodic recovery thread if no scan is in progress
+ if (getStatus() != Status.SCANNING) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: listener worker interrupts background thread");
+ }
+ this.interrupt();
+ }
+ }
+ }
- _workerService.signalDone();
- }
+ /**
+ * Add the specified module to the end of the recovery module list.
+ * There is no way to specify relative ordering of recovery modules
+ * with respect to modules loaded via the property file.
+ *
+ * @param module The module to append.
+ */
- // wait for a bit to avoid catching (too many) transactions etc. that
- // are really progressing quite happily
+ public final void addModule (RecoveryModule module)
+ {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: adding module " + module.getClass().getName());
+ }
+ _recoveryModules.add(module);
+ }
- try
- {
- Thread.sleep( _backoffPeriod * 1000 );
- }
- catch ( InterruptedException ie )
- {
- interrupted = true;
- }
+ /**
+ * remove a recovery module from the recovery modules list
+ * @param module the module to be removed
+ */
+ public final void removeModule (RecoveryModule module)
+ {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: removing module " + module.getClass().getName());
+ }
+ _recoveryModules.remove(module);
+ }
- if (getStatus() == State.terminated)
- {
- return true;
- }
- else
- {
- checkSuspended();
+ /**
+ * return a copy of the current recovery modules list
+ *
+ * @return a copy of the recovery modules list.
+ */
- setStatus(State.scanning);
- }
+ public final Vector getModules ()
+ {
+ // return a copy of the modules list so that clients are not affected by dynamic modifications to the list
+ // synchronize so that we don't copy in the middle of an add or remove
- tsLogger.arjLogger.info("Periodic recovery - second pass <"+
- _theTimestamper.format(new Date()) + ">" );
+ synchronized (_recoveryModules) {
+ return new Vector(_recoveryModules);
+ }
+ }
- modules = _recoveryModules.elements();
+ /***** private implementation *****/
- while (modules.hasMoreElements())
- {
- RecoveryModule m = (RecoveryModule) modules.nextElement();
+ /**
+ * fetch the current activity status either INACTIVE or SCANNING
+ *
+ * @caveats must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
+ * @return INACTIVE if no scan is in progress or SCANNING if some thread is performing a scan
+ */
+ private int getStatus ()
+ {
+ return _currentStatus;
+ }
- m.periodicWorkSecondPass();
+ /**
+ * fetch the current recovery operation mode either ENABLED, SUSPENDED or TERMINATED
+ *
+ * @caveats must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
+ * @return the current recovery operation mode
+ */
+ private int getMode ()
+ {
+ return _currentMode;
+ }
- if (tsLogger.arjLogger.isDebugEnabled())
- {
- tsLogger.arjLogger.debug ( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_CRASH_RECOVERY, " " );
- }
- }
+ /**
+ * set the current activity status
+ * @param status the new status to be used
+ */
+ private void setStatus (int status)
+ {
+ _currentStatus = status;
+ }
- try
- {
- if (!interrupted && periodic)
- Thread.sleep( _recoveryPeriod * 1000 );
- }
- catch ( InterruptedException ie )
- {
- interrupted = true;
- }
+ /**
+ * set the current recovery operation mode
+ * @param mode the new mode to be used
+ */
+ private void setMode (int mode)
+ {
+ _currentMode = mode;
+ }
- if (getStatus() == State.terminated)
- {
- return true;
- }
- else
- {
- checkSuspended();
+ /**
+ * wait for the required backoff period or less if the scanning status or scan mode changes
+ *
+ * @caveats this must only be called when synchronized on {@link PeriodicRecovery#_stateLock} and when
+ * _currentStatus is SCANNING and _currentMode is ENABLED
+ */
+ private void doBackoffWait()
+ {
+ try {
+ _stateLock.wait(_backoffPeriod * 1000);
+ } catch (InterruptedException e) {
+ // we can ignore this exception
+ }
+ }
- // make sure we're scanning again.
+ /**
+ * wait for the required recovery period or less if the scanning status or scan mode changes
+ *
+ * @caveats this must only be called when synchronized on {@link PeriodicRecovery#_stateLock} and when
+ * _currentStatus is INACTIVE and _currentMode is ENABLED
+ */
+ private void doPeriodicWait()
+ {
+ try {
+ _stateLock.wait(_recoveryPeriod * 1000);
+ } catch (InterruptedException e) {
+ // we can ignore this exception
+ }
+ }
- setStatus(State.active);
- }
+ /**
+ * wait until we move out of SUSPENDED mode
+ *
+ * @caveats this must only be called when synchronized on {@link PeriodicRecovery#_stateLock}
+ */
+ private void doSuspendedWait()
+ {
+ while (getMode() == Mode.SUSPENDED) {
+ try {
+ _stateLock.wait();
+ } catch (InterruptedException e) {
+ // we can ignore this exception
+ }
+ }
+ }
- return false; // keep going
+ /**
+ * wait until some other thread stops scanning
+ *
+ * @caveats this must only be called when synchronized on {@link PeriodicRecovery#_stateLock} and when
+ * _currentStatus is SCANNING
+ */
+ private void doScanningWait()
+ {
+ while (getStatus() == Status.SCANNING) {
+ try {
+ _stateLock.wait();
+ } catch (InterruptedException e) {
+ // we can ignore this exception
+ }
+ }
}
/**
- * Add the specified module to the end of the recovery module list.
- * There is no way to specify relative ordering of recovery modules
- * with respect to modules loaded via the property file.
+ * start performing a scan continuing to completion unless we are terminating
*
- * @param RecoveryModule module The module to append.
+ * @caveats this must only be called when _currentStatus is SCANNING. on return _currentStatus is always
+ * still SCANNING
*/
- public final void addModule (RecoveryModule module)
+ private void doWorkInternal()
{
- _recoveryModules.add(module);
+ // n.b. we only get here if status is SCANNING
+
+ tsLogger.arjLogger.info("Periodic recovery - first pass <" +
+ _theTimestamper.format(new Date()) + ">" );
+
+ // n.b. this works on a copy of the modules list so it is not affected by
+ // dynamic updates in the middle of a scan
+
+ Enumeration modules = getModules().elements();
+
+ while (modules.hasMoreElements())
+ {
+ RecoveryModule m = (RecoveryModule) modules.nextElement();
+
+ m.periodicWorkFirstPass();
+
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ " " );
+ }
+ }
+
+ // take the lock again so we can do a backoff wait on it
+
+ synchronized (_stateLock) {
+ // we have to wait for a bit to avoid catching (too many)
+ // transactions etc. that are really progressing quite happily
+
+ doBackoffWait();
+
+ // we carry on scanning even if scanning is SUSPENDED because the suspending thread
+ // might be waiting on us to complete and we don't want to risk deadlocking it by waiting
+ // here for a resume.
+ // if we have been TERMINATED we bail out now
+ // n.b. if we give up here the caller is responsible for clearing the active scan
+
+ if (getMode() == Mode.TERMINATED) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: scan TERMINATED at phase 1");
+ }
+ return;
+ }
+ }
+
+ // move on to phase 2
+
+ tsLogger.arjLogger.info("Periodic recovery - second pass <"+
+ _theTimestamper.format(new Date()) + ">" );
+
+ modules = _recoveryModules.elements();
+
+ while (modules.hasMoreElements())
+ {
+ RecoveryModule m = (RecoveryModule) modules.nextElement();
+
+ m.periodicWorkSecondPass();
+
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug ( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_CRASH_RECOVERY, " " );
+ }
+ }
+
+ // n.b. the caller is responsible for clearing the active scan
}
/**
- * @return the recovery modules.
+ * notify the listener worker that a scan has completed
+ *
+ * @caveats this must only be called when synchronized on {@link PeriodicRecovery#_stateLock} at the point
+ * where Status transitions from SCANNING to INACTIVE
*/
- public final Vector getModules ()
+ private void notifyWorker()
{
- return _recoveryModules;
+ // if the listener is still waiting on a wakeup then notify it
+
+ if (_workerScanRequested) {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: scan thread signals listener worker");
+ }
+ _workerService.signalDone();
+ _workerScanRequested = false;
+ }
}
/**
@@ -393,7 +799,7 @@
* name of each module is used to indicate relative ordering.
*/
- private final static void loadModules ()
+ private static void loadModules ()
{
// scan the relevant properties so as to get them into sort order
Properties properties = arjPropertyManager.propertyManager.getProperties();
@@ -432,7 +838,12 @@
}
}
- private final static void loadModule (String className)
+ /**
+ * load a specific recovery module and add it to the recovery modules list
+ *
+ * @param className
+ */
+ private static void loadModule (String className)
{
if (tsLogger.arjLogger.isDebugEnabled())
{
@@ -499,67 +910,85 @@
}
}
- private void checkSuspended ()
- {
- synchronized (_signal)
- {
- _signal.notify();
- }
-
- if (getStatus() == State.suspended)
- {
- while (getStatus() == State.suspended)
- {
- try
- {
- synchronized (_suspendLock)
- {
- _suspendLock.wait();
- }
- }
- catch (InterruptedException ex)
- {
- }
- }
-
- setStatus(State.active);
- }
- }
-
- private final void initialise ()
+ /**
+ * initialise the periodic recovery instance to a suitable initial state
+ */
+ private void initialise ()
{
_recoveryModules = new Vector();
- setStatus(State.active);
+ setStatus(Status.INACTIVE);
+ setMode(Mode.ENABLED);
}
// this refers to the modules specified in the recovery manager
// property file which are dynamically loaded.
+ /**
+ * list of instances of RecoveryModule either loaded during startup as specified in the recovery manager
+ * property file or added dynamically by calls to addModule
+ */
private static Vector _recoveryModules = null;
- // back off period is the time between the first and second pass.
- // recovery period is the time between the second pass and the start
- // of the first pass.
+ /**
+ * time in seconds between the first and second pass in any given scan
+ */
private static int _backoffPeriod = 0;
+
+ /**
+ * time in seconds for which the periodic recovery thread waits between scan attempts
+ */
private static int _recoveryPeriod = 0;
- // default values for the above
- private static final int _defaultBackoffPeriod = 10;
+ /**
+ * default value for _backoffPeriod if not specified via property {@link com.arjuna.ats.arjuna.common.Environment#RECOVERY_BACKOFF_PERIOD}
+ */
+ private static final int _defaultBackoffPeriod = 10;
+
+ /**
+ * default value for _recoveryPeriod if not specified via property {@link com.arjuna.ats.arjuna.common.Environment#PERIODIC_RECOVERY_PERIOD}
+ */
private static final int _defaultRecoveryPeriod = 120;
- // exit thread flag
- private static int _currentState = State.created;
- private static Object _stateLock = new Object();
+ /**
+ * lock controlling access to {@link PeriodicRecovery#_currentStatus}, {@link PeriodicRecovery#_currentMode} and
+ * {@link PeriodicRecovery#_workerScanRequested}
+ */
+ private static final Object _stateLock = new Object();
- private static SimpleDateFormat _theTimestamper = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss");
+ /**
+ * activity status indicating whether we are IDLING or some thread is SCANNING
+ */
+ private static int _currentStatus;
+ /**
+ * operating mode indicating whether scanning is ENABLED, SUSPENDED or TERMINATED
+ */
+ private static int _currentMode;
+
+ /**
+ * flag indicating whether the listener has prodded the recovery thread
+ */
+ private boolean _workerScanRequested = false;
+
+ /**
+ * format for printing dates in log messages
+ */
+ private static SimpleDateFormat _theTimestamper = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss");
+
+ /**
+ * socket used by listener worker thread
+ */
private static ServerSocket _socket = null;
+ /**
+ * listener thread running worker service
+ */
private static Listener _listener = null;
+
+ /**
+ * the worker service which handles requests via the listener socket
+ */
private static WorkerService _workerService = null;
- private Object _suspendLock = new Object();
- private Object _signal = new Object();
-
/*
* Read the system properties to set the configurable options
*
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -159,7 +159,7 @@
public final void scan ()
{
- _periodicRecovery.doWork(false);
+ _periodicRecovery.doWork();
}
public final void addModule (RecoveryModule module)
@@ -167,6 +167,11 @@
_periodicRecovery.addModule(module);
}
+ public final void removeModule (RecoveryModule module)
+ {
+ _periodicRecovery.removeModule(module);
+ }
+
public final Vector getModules ()
{
return _periodicRecovery.getModules();
@@ -180,9 +185,13 @@
}
}
- public void stop ()
+ /**
+ * stop the recovery manager
+ * @param async false means wait for any recovery scan in progress to complete
+ */
+ public void stop (boolean async)
{
- _periodicRecovery.shutdown();
+ _periodicRecovery.shutdown(async);
// TODO why?
@@ -194,7 +203,7 @@
* doing recovery scans then it will be suspended afterwards, in order to
* preserve data integrity.
*
- * @param async wait for the recovery manager to finish any scans before returning.
+ * @param async false means wait for the recovery manager to finish any scans before returning.
*/
public void suspendScan (boolean async)
@@ -209,7 +218,7 @@
public void finalize ()
{
- stop();
+ stop(true);
}
private final boolean activeRecoveryManager ()
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -66,7 +66,7 @@
else
if (request.equals("SCAN") || (request.equals("ASYNC_SCAN")))
{
- _periodicRecovery.interrupt();
+ _periodicRecovery.wakeUp();
tsLogger.arjLogger.info("com.arjuna.ats.internal.arjuna.recovery.WorkerService_3");
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/services/classes/com/arjuna/ats/arjuna/services/recovery/RecoveryManagerService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/services/classes/com/arjuna/ats/arjuna/services/recovery/RecoveryManagerService.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/services/classes/com/arjuna/ats/arjuna/services/recovery/RecoveryManagerService.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -75,7 +75,7 @@
{
if ( _rm != null )
{
- _rm.stop();
+ _rm.stop(false);
}
return exitCode;
Modified: labs/jbosstm/trunk/ArjunaJTS/trailmap/src/com/arjuna/demo/recovery/xaresource/TestXAResourceRecovery.java
===================================================================
--- labs/jbosstm/trunk/ArjunaJTS/trailmap/src/com/arjuna/demo/recovery/xaresource/TestXAResourceRecovery.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/ArjunaJTS/trailmap/src/com/arjuna/demo/recovery/xaresource/TestXAResourceRecovery.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -227,7 +227,7 @@
// If the Recovery manager was ran in this VM
if( rcm != null )
{
- rcm.stop();
+ rcm.stop(true);
}
}
}
Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/XTSService.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/XTSService.java 2008-03-25 06:30:57 UTC (rev 19223)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/XTSService.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -22,6 +22,7 @@
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.logging.Logger;
+import org.jboss.transactions.xts.recovery.ACCoordinatorRecoveryModule;
import com.arjuna.mw.wsas.utils.Configuration;
import com.arjuna.mw.wst.deploy.WSTXInitialisation;
@@ -72,6 +73,7 @@
import com.arjuna.webservices.wsaddr.policy.AddressingPolicy;
import com.arjuna.wst.messaging.*;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
+import com.arjuna.ats.arjuna.recovery.RecoveryModule;
import java.io.InputStream;
@@ -84,6 +86,8 @@
private int taskManagerMinWorkerCount = 0;
private int taskManagerMaxWorkerCount = 10;
+ private ACCoordinatorRecoveryModule acCoordinatorRecoveryModule = null;
+
// TODO: how to use a (per application) remote coordinator?
// does the http servlet param indicate its own location and the
// coordinatorURL indicate the coord??
@@ -143,10 +147,10 @@
WSTXInitialisation(); // com.arjuna.mw.wst.deploy.WSTXInitialisation : Initialise WSTX
- //ACCoordinatorRecoveryModule acCoordinatorRecoveryModule = new ACCoordinatorRecoveryModule();
+ acCoordinatorRecoveryModule = new ACCoordinatorRecoveryModule();
// we assume the tx manager has started, hence initializing the recovery manager.
// to guarantee this our mbean should depend on the tx mgr mbean. (but does that g/tee start or just load?)
- //RecoveryManager.manager().addModule(acCoordinatorRecoveryModule); // TODO thread safety.
+ RecoveryManager.manager().addModule(acCoordinatorRecoveryModule);
}
@@ -154,6 +158,9 @@
{
getLog().info("JBossTS XTS Transaction Service - stopping");
+ if (acCoordinatorRecoveryModule != null) {
+ RecoveryManager.manager().removeModule(acCoordinatorRecoveryModule);
+ }
TaskManager.getManager().shutdown() ; // com.arjuna.services.framework.admin.TaskManagerInitialisation
// HttpClientInitialisation
Added: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ACCoordinatorRecoveryModule.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -0,0 +1,325 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2007,
+ * @author Red Hat Middleware LLC.
+ */
+package org.jboss.transactions.xts.recovery;
+
+import com.arjuna.ats.arjuna.recovery.RecoveryModule;
+import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
+import com.arjuna.ats.arjuna.logging.tsLogger;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.InputObjectState;
+import com.arjuna.ats.arjuna.exceptions.ObjectStoreException;
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.objectstore.ObjectStore;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ACCoordinator;
+
+import java.util.Vector;
+import java.util.Enumeration;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed XTS (ACCoordinator) transactions.
+ *
+ * Responsible for recovering instances of XTS Transaction Coordinators
+ * (com.arjuna.mwlabs.wscf.model.as.coordinator.arjunacore.ACCoordinator)
+ * Modelled on com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule
+ * TODO: refactor this and AtomicActionRecoveryModule to remove duplication?
+ * TODO: move to better package.
+ * TODO: how to register (config file vs. programmatic, given that the module list is fixed once recovery has started)
+ *
+ * @message com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_1 [com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_1] - RecoveryManagerStatusModule: Object store exception: {0}
+ * @message com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_2 [com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_2] - failed to recover Transaction {0} {1}
+ * @message com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_3 [com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_3] - failed to access transaction store {0} {1}
+ *
+ * $Id$
+ */
+public class ACCoordinatorRecoveryModule implements RecoveryModule
+{
+ public ACCoordinatorRecoveryModule()
+ {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug
+ ( DebugLevel.CONSTRUCTORS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "ACCoordinatorRecoveryModule created - default" );
+ }
+
+ if (_transactionStore == null)
+ {
+ _transactionStore = TxControl.getStore() ;
+ }
+
+ _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+
+ Implementations.initialise();
+ }
+
+ /**
+  * First pass of a recovery scan. This is called periodically by the RecoveryManager.
+  *
+  * Asks the transaction store for the uids of all entries of this module's
+  * transaction type and, if the store reports success, records them for use by
+  * the second pass. If the lookup fails or reports nothing, the uid list left by
+  * any earlier scan is not modified.
+  */
+ public void periodicWorkFirstPass()
+ {
+     if (tsLogger.arjLogger.isInfoEnabled())
+     {
+         // name this module in the message, as the second pass message does
+         tsLogger.arjLogger.info( "ACCoordinatorRecoveryModule: first pass " );
+     }
+
+     // uids per transaction type
+     InputObjectState acc_uids = new InputObjectState() ;
+
+     // set by the store lookup; stays false if the lookup throws
+     boolean ACCoordinators = false ;
+
+     try
+     {
+         ACCoordinators = _transactionStore.allObjUids( _transactionType, acc_uids );
+     }
+     catch ( ObjectStoreException ex )
+     {
+         if (tsLogger.arjLoggerI18N.isWarnEnabled())
+         {
+             tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_1",
+                                         new Object[]{ex});
+         }
+     }
+
+     if ( ACCoordinators )
+     {
+         _transactionUidVector = processTransactions( acc_uids ) ;
+     }
+ }
+
+ /**
+  * Second pass of a recovery scan: logs the pass at info level and then
+  * processes the transaction uids recorded by the first pass.
+  */
+ public void periodicWorkSecondPass()
+ {
+     final boolean infoOn = tsLogger.arjLogger.isInfoEnabled();
+
+     if (infoOn)
+     {
+         tsLogger.arjLogger.info( "ACCoordinatorRecoveryModule: Second pass " );
+     }
+
+     processTransactionsStatus() ;
+ }
+
+ protected ACCoordinatorRecoveryModule (String type)
+ {
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug
+ ( DebugLevel.CONSTRUCTORS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "ACCoordinatorRecoveryModule created " + type );
+ }
+
+ if (_transactionStore == null)
+ {
+ _transactionStore = TxControl.getStore() ;
+ }
+
+ _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+ _transactionType = type;
+
+ }
+
+ /**
+  * Attempts recovery of a single transaction.
+  *
+  * The transaction's status is requested through the transaction status
+  * connection manager. If that status does not mark the transaction as still in
+  * flight, a RecoverACCoordinator is created for it and phase 2 is replayed. Any
+  * exception raised while doing so is logged as a warning and not propagated.
+  *
+  * @param recoverUid the uid of the transaction to recover
+  */
+ private void doRecoverTransaction( Uid recoverUid )
+ {
+     // Retrieve the transaction status from its original process.
+     int theStatus = _transactionStatusConnectionMgr.getTransactionStatus( _transactionType, recoverUid ) ;
+
+     boolean inFlight = isTransactionInMidFlight( theStatus ) ;
+
+     String Status = ActionStatus.stringForm( theStatus ) ;
+
+     if (tsLogger.arjLogger.isDebugEnabled())
+     {
+         tsLogger.arjLogger.debug
+             ( DebugLevel.FUNCTIONS,
+               VisibilityLevel.VIS_PUBLIC,
+               FacilityCode.FAC_CRASH_RECOVERY,
+               "transaction type is "+ _transactionType + " uid is " +
+               recoverUid.toString() + "\n ActionStatus is " + Status +
+               " in flight is " + inFlight ) ;
+     }
+
+     if ( ! inFlight )
+     {
+         try
+         {
+             // guarded like every other debug call in this class so the message
+             // is only built when debug logging is on
+             if (tsLogger.arjLogger.isDebugEnabled())
+             {
+                 tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                           FacilityCode.FAC_CRASH_RECOVERY, "jjh doing revovery here for "+recoverUid);
+             }
+             // TODO jjh
+             RecoverACCoordinator rcvACCoordinator =
+                 new RecoverACCoordinator(recoverUid, theStatus);
+
+             rcvACCoordinator.replayPhase2();
+         }
+         catch ( Exception ex )
+         {
+             if (tsLogger.arjLoggerI18N.isWarnEnabled())
+             {
+                 tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_2",
+                                             new Object[]{recoverUid.toString(), ex});
+             }
+         }
+     }
+ }
+
+ /**
+  * Decides whether a reported transaction status means the transaction is still
+  * being progressed by a live process.
+  *
+  * @param status an ActionStatus value
+  * @return true for RUNNING, ABORT_ONLY, PREPARING, COMMITTING, ABORTING and
+  *         PREPARED; false for every other value
+  */
+ private boolean isTransactionInMidFlight( int status )
+ {
+     switch ( status )
+     {
+         // these states can only come from a process that is still alive
+         case ActionStatus.RUNNING :
+         case ActionStatus.ABORT_ONLY :
+         case ActionStatus.PREPARING :
+         case ActionStatus.COMMITTING :
+         case ActionStatus.ABORTING :
+         case ActionStatus.PREPARED :
+             return true ;
+
+         // everything else is treated as not in flight: the completed states
+         // (COMMITTED, H_COMMIT, H_MIXED, H_HAZARD, ABORTED, H_ROLLBACK), where
+         // the transaction is apparently still there but has completed its
+         // phase2 and should be safe to redo, as well as INVALID and any
+         // unrecognised value, which shouldn't happen
+         default :
+             return false ;
+     }
+ }
+
+ private Vector processTransactions( InputObjectState uids )
+ {
+ Vector uidVector = new Vector() ;
+
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "processing " + _transactionType
+ + " transactions" ) ;
+ }
+
+ Uid theUid = new Uid( Uid.nullUid() );
+
+ boolean moreUids = true ;
+
+ while (moreUids)
+ {
+ try
+ {
+ theUid.unpack( uids ) ;
+
+ if (theUid.equals( Uid.nullUid() ))
+ {
+ moreUids = false;
+ }
+ else
+ {
+ Uid newUid = new Uid( theUid ) ;
+
+ if (tsLogger.arjLogger.isDebugEnabled())
+ {
+ tsLogger.arjLogger.debug
+ ( DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "found transaction "+ newUid ) ;
+ }
+
+ uidVector.addElement( newUid ) ;
+ }
+ }
+ catch ( Exception ex )
+ {
+ moreUids = false;
+ }
+ }
+ return uidVector ;
+ }
+
+ /**
+  * Walks the uids recorded by the first pass and attempts recovery of each one
+  * for which the transaction store does not report state OS_UNKNOWN. A store
+  * failure for one uid is logged as a warning and the walk continues with the
+  * next uid.
+  *
+  * Does nothing if no uid list has been recorded yet.
+  */
+ private void processTransactionsStatus()
+ {
+     // the list stays null until a first pass succeeds in reading the store,
+     // so there may be nothing to process
+     if ( _transactionUidVector == null )
+     {
+         return ;
+     }
+
+     // Process the Vector of transaction Uids
+     Enumeration transactionUidEnum = _transactionUidVector.elements() ;
+
+     while ( transactionUidEnum.hasMoreElements() )
+     {
+         Uid currentUid = (Uid) transactionUidEnum.nextElement();
+
+         try
+         {
+             if ( _transactionStore.currentState( currentUid, _transactionType ) != ObjectStore.OS_UNKNOWN )
+             {
+                 doRecoverTransaction( currentUid ) ;
+             }
+         }
+         catch ( ObjectStoreException ex )
+         {
+             // test the same logger that is used to emit the warning
+             if (tsLogger.arjLoggerI18N.isWarnEnabled())
+             {
+                 tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ACCoordinatorRecoveryModule_3",
+                                             new Object[]{currentUid.toString(), ex});
+             }
+         }
+     }
+ }
+
+ // 'type' within the Object Store for ACCoordinator.
+ private String _transactionType = new ACCoordinator().type() ;
+
+ // Array of transactions found in the object store of the
+ // ACCoordinator type.
+ private Vector _transactionUidVector = null ;
+
+ // Reference to the Object Store.
+ private static ObjectStore _transactionStore = null ;
+
+ // This object manages the interface to all TransactionStatusManagers
+ // processes(JVMs) on this system/node.
+ private TransactionStatusConnectionManager _transactionStatusConnectionMgr ;
+
+}
+
Added: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/Implementations.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/Implementations.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/Implementations.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+
+package org.jboss.transactions.xts.recovery;
+
+import com.arjuna.ats.arjuna.gandiva.inventory.Inventory;
+
+/**
+ * Module specific class that is responsible for adding any implementations
+ * to the inventory.
+ *
+ * @author Mark Little (mark at arjuna.com)
+ * @version $Id: Implementations.java 2342 2006-03-30 13:06:17Z $
+ * @since JTS 1.0.
+ */
+public class Implementations {
+
+    // true once the implementations have been added to the inventory;
+    // declared ahead of the static initialiser below, which relies on it
+    private static boolean _added = false;
+
+    // perform the registration as soon as this class is initialised
+    static
+    {
+        initialise();
+    }
+
+    // static methods only, never instantiated
+    private Implementations ()
+    {
+    }
+
+    /**
+     * @return true if the implementations have already been added to the inventory
+     */
+    public static synchronized boolean added ()
+    {
+        return _added;
+    }
+
+    /**
+     * Adds the module specific implementations to the inventory. Only the first
+     * call has any effect; later calls return immediately.
+     */
+    public static synchronized void initialise ()
+    {
+        if (_added)
+        {
+            return;
+        }
+
+        // WS-AT Participant records.
+        Inventory.inventory().addToList(new ParticipantRecordSetup());
+
+        _added = true;
+    }
+
+}
Added: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ParticipantRecordSetup.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ParticipantRecordSetup.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/ParticipantRecordSetup.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006,
+ * @author JBoss Inc.
+ */
+package org.jboss.transactions.xts.recovery;
+
+import com.arjuna.ats.arjuna.gandiva.inventory.InventoryElement;
+import com.arjuna.ats.arjuna.gandiva.ClassName;
+import com.arjuna.ats.arjuna.gandiva.ObjectName;
+import com.arjuna.ats.arjuna.PersistenceRecord;
+import com.arjuna.ats.arjuna.ArjunaNames;
+import com.arjuna.ats.arjuna.coordinator.RecordType;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ParticipantRecord;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: jhalli
+ * Date: Aug 20, 2007
+ * Time: 3:18:23 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class ParticipantRecordSetup implements InventoryElement {
+
+ public synchronized Object createVoid ()
+ {
+ return ParticipantRecord.create();
+ }
+
+ public synchronized Object createClassName (ClassName className)
+ {
+ return null;
+ }
+
+ public synchronized Object createObjectName (ObjectName objectName)
+ {
+ return null;
+ }
+
+ public synchronized Object createResources (Object[] resources)
+ {
+ return null;
+ }
+
+ public synchronized Object createClassNameResources (ClassName className, Object[] resources)
+ {
+ return null;
+ }
+
+ public synchronized Object createObjectNameResources (ObjectName objectName, Object[] resources)
+ {
+ return null;
+ }
+
+ public ClassName className ()
+ {
+ return RecordType.typeToClassName(RecordType.USER_DEF_FIRST0);
+ //ClassName("WSATParticipantRecord"); // TODO remove dupl with ParticipantRecord
+ }
+
+}
Added: labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/RecoverACCoordinator.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/RecoverACCoordinator.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/transactions/xts/recovery/RecoverACCoordinator.java 2008-03-25 10:12:55 UTC (rev 19224)
@@ -0,0 +1,99 @@
+package org.jboss.transactions.xts.recovery;
+
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.logging.tsLogger;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ACCoordinator;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed ACCoordinator transactions.
+ *
+ * @message com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_1 [com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_1] - RecoveryManagerStatusModule: Object store exception: {0}
+ * @message com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_2 [com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_2] - failed to recover Transaction {0} {1}
+ * @message com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_3 [com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule_3] - failed to access transaction store {0} {1}
+*/
+public class RecoverACCoordinator extends ACCoordinator {
+
+ // TODO: refactor RecoverAtomicAction so that this can subclass it to remove dupl?
+
+ /**
+ * Re-creates/activates an AtomicAction for the specified
+ * transaction Uid.
+ */
+ public RecoverACCoordinator ( Uid rcvUid, int theStatus )
+ {
+ super( rcvUid ) ;
+ _theStatus = theStatus ;
+ _activated = activate() ;
+ }
+
+ /**
+ * Replays phase 2 of the commit protocol.
+ */
+ public void replayPhase2()
+ {
+ if (tsLogger.arjLoggerI18N.debugAllowed())
+ {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_1",
+ new Object[]{get_uid(), ActionStatus.stringForm(_theStatus)});
+ }
+
+ if ( _activated )
+ {
+ if ( (_theStatus == ActionStatus.PREPARED) ||
+ (_theStatus == ActionStatus.COMMITTING) ||
+ (_theStatus == ActionStatus.COMMITTED) ||
+ (_theStatus == ActionStatus.H_COMMIT) ||
+ (_theStatus == ActionStatus.H_MIXED) ||
+ (_theStatus == ActionStatus.H_HAZARD) )
+ {
+ super.phase2Commit( _reportHeuristics ) ;
+ }
+ else if ( (_theStatus == ActionStatus.ABORTED) ||
+ (_theStatus == ActionStatus.H_ROLLBACK) ||
+ (_theStatus == ActionStatus.ABORTING) ||
+ (_theStatus == ActionStatus.ABORT_ONLY) )
+ {
+ super.phase2Abort( _reportHeuristics ) ;
+ }
+ else
+ {
+ if (tsLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_2",
+ new Object[]{ActionStatus.stringForm(_theStatus)});
+ }
+ }
+
+ if (tsLogger.arjLoggerI18N.debugAllowed())
+ {
+ tsLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_3",
+ new Object[]{get_uid()});
+ }
+ }
+ else
+ {
+ tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.recovery.RecoverAtomicAction_4");
+ }
+ }
+
+ // Current transaction status
+ // (retrieved from the TransactionStatusManager)
+ private int _theStatus ;
+
+ // Flag to indicate that this transaction has been re-activated
+ // successfully.
+ private boolean _activated = false ;
+
+ // whether heuristic reporting on phase 2 commit is enabled.
+ private boolean _reportHeuristics = true ;
+}
More information about the jboss-svn-commits
mailing list