[jboss-svn-commits] JBL Code SVN: r24227 - in labs/jbosstm/trunk/XTS: WS-T/dev/src10/com/arjuna/wst/stub and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Dec 3 10:23:34 EST 2008
Author: adinn
Date: 2008-12-03 10:23:34 -0500 (Wed, 03 Dec 2008)
New Revision: 24227
Added:
labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java
Modified:
labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java
labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java
labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java
labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java
labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java
Log:
implementation of AT 1.0 and 1.1 subtransaction recovery which still needs proper testing but does not break the WSTX unit tests
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManager.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -133,12 +133,26 @@
public abstract boolean isCoordinatorRecoveryStarted();
/**
+ * test whether the first AT subordinate coordinator recovery scan has completed. this indicates
+ * whether there may or may not still be unknown AT subtransaction records on disk. If the first
+ * scan has not yet completed then a commit for an unknown subtransaction must raise an exception
+ * delaying commit of the parent transaction.
+ */
+ public abstract boolean isSubordinateCoordinatorRecoveryStarted();
+
+ /**
* record the fact thatwhether the first AT coordinator recovery scan has completed.
*/
public abstract void setCoordinatorRecoveryStarted();
/**
+ * record the fact that the first AT subordinate coordinator recovery scan has completed.
+ */
+
+ public abstract void setSubordinateCoordinatorRecoveryStarted();
+
+ /**
* the singleton instance of the recovery manager
*/
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src10/com/arjuna/wst/stub/SubordinateDurable2PCStub.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -3,19 +3,44 @@
import com.arjuna.wst.*;
import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.OutputObjectState;
+import com.arjuna.ats.arjuna.state.InputObjectState;
+import java.io.IOException;
+
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
/**
* A durable participant registered on behalf of an interposed WS-AT coordinator in order to ensure that
* durable participants in the subtransaction prepared, committed and aborted at the right time.
*/
public class SubordinateDurable2PCStub implements Durable2PCParticipant
{
+ /**
+ * normal constructor used when the subordinate coordinator is registered as a durable participant
+ * with its parent coordinator.
+ *
+ * @param coordinator
+ */
public SubordinateDurable2PCStub(SubordinateCoordinator coordinator)
{
this.coordinator = coordinator;
+ this.coordinatorId = coordinator.get_uid().stringForm();
+ this.recovered = false;
}
/**
+ * empty constructor for use only during recovery
+ */
+ public SubordinateDurable2PCStub()
+ {
+ this.coordinator = null;
+ this.coordinatorId = null;
+ this.recovered = true;
+ }
+
+ /**
* This will be called when the parent coordinator is preparing its durable participants and should ensure
* that the interposed cooordinator does the same.
*
@@ -44,7 +69,41 @@
*/
public void commit() throws WrongStateException, SystemException {
- coordinator.commit();
+ if (!isRecovered()) {
+ coordinator.commit();
+ } else {
+ // first check whether crashed coordinators have been recovered
+ XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+ boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+ // now look for a subordinate coordinator with the right id
+ coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+ if (coordinator == null) {
+ if (!isRecoveryScanStarted) {
+ // the subtransaction may still be waiting to be resolved
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ }
+ } else if(!coordinator.isActivated()) {
+ // the transaction was logged but has not yet been recovered successfully
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ } else {
+ int status = coordinator.status();
+
+ if (status == ActionStatus.PREPARED) {
+ // ok, the commit process was not previously initiated so start it now
+ coordinator.commit();
+ status = coordinator.status();
+ }
+
+ // check that we are not still committing because of a comms timeout
+
+ if (status == ActionStatus.COMMITTING) {
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ }
+ }
+ }
}
/**
@@ -55,7 +114,34 @@
*/
public void rollback() throws WrongStateException, SystemException {
- coordinator.rollback();
+ if (!isRecovered()) {
+ coordinator.rollback();
+ } else {
+ // first check whether crashed coordinators have been recovered
+ XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+ boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+ // now look for a subordinate coordinator with the right id
+ coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+ if (coordinator == null) {
+ if (!isRecoveryScanStarted) {
+ // the subtransaction may still be waiting to be resolved
+ // throw an exception causing the rollback to be retried later
+ throw new SystemException();
+ }
+ } else if(!coordinator.isActivated()) {
+ // the transaction was logged but has not yet been recovered successfully
+ // throw an exception causing the rollback to be retried later
+ throw new SystemException();
+ } else {
+ int status = coordinator.status();
+
+ if (status == ActionStatus.PREPARED) {
+ // ok, the rollback process was not previously initiated so start it now
+ coordinator.rollback();
+ status = coordinator.status();
+ }
+ }
+ }
}
/**
@@ -75,7 +161,58 @@
}
/**
+ * Save the state of the participant to the specified output object stream.
+ * @param oos The output object stream.
+ * @return true if persisted, false otherwise.
+ */
+ public boolean saveState(OutputObjectState oos) {
+ // we need to save the id of the subordinate coordinator so we can identify it again
+ // when we are recreated
+ try {
+ oos.packString(coordinatorId);
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Restore the state of the participant from the specified input object stream.
+ * @param ios The input object stream.
+ * @return true if restored, false otherwise.
+ */
+ public boolean restoreState(InputObjectState ios) {
+     // restore the subordinate coordinator id so we can check to ensure it has been committed
+     // n.b. assign the field rather than a shadowing local so commit/rollback can look up the coordinator
+     try {
+         this.coordinatorId = ios.unpackString();
+         return true;
+     } catch (IOException e) {
+         return false;
+     }
+ }
+
+ /**
+ * test if this participant is recovered
+ */
+ public boolean isRecovered()
+ {
+ return recovered;
+ }
+
+ /**
* the interposed coordinator
*/
private SubordinateCoordinator coordinator;
+
+ /**
+ * the interposed coordinator's id
+ */
+ private String coordinatorId;
+
+ /**
+ * a flag indicating whether this participant has been recovered
+ */
+
+ private boolean recovered;
}
\ No newline at end of file
Modified: labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java
===================================================================
--- labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WS-T/dev/src11/com/arjuna/wst11/stub/SubordinateDurable2PCStub.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -3,19 +3,44 @@
import com.arjuna.wst.*;
import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
import com.arjuna.ats.arjuna.coordinator.TwoPhaseOutcome;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.OutputObjectState;
+import com.arjuna.ats.arjuna.state.InputObjectState;
+import java.io.IOException;
+
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
/**
* A durable participant registered on behalf of an interposed WS-AT coordinator in order to ensure that
* durable participants in the subtransaction prepared, committed and aborted at the right time.
*/
-public class SubordinateDurable2PCStub implements Durable2PCParticipant
+public class SubordinateDurable2PCStub implements Durable2PCParticipant, PersistableParticipant
{
+ /**
+ * normal constructor used when the subordinate coordinator is registered as a durable participant
+ * with its parent coordinator.
+ *
+ * @param coordinator
+ */
public SubordinateDurable2PCStub(SubordinateCoordinator coordinator)
{
this.coordinator = coordinator;
+ this.coordinatorId = coordinator.get_uid().stringForm();
+ this.recovered = false;
}
/**
+ * empty constructor for use only during recovery
+ */
+ public SubordinateDurable2PCStub()
+ {
+ this.coordinator = null;
+ this.coordinatorId = null;
+ this.recovered = true;
+ }
+
+ /**
* This will be called when the parent coordinator is preparing its durable participants and should ensure
* that the interposed cooordinator does the same.
*
@@ -44,7 +69,41 @@
*/
public void commit() throws WrongStateException, SystemException {
- coordinator.commit();
+ if (!isRecovered()) {
+ coordinator.commit();
+ } else {
+ // first check whether crashed coordinators have been recovered
+ XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+ boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+ // now look for a subordinate coordinator with the right id
+ coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+ if (coordinator == null) {
+ if (!isRecoveryScanStarted) {
+ // the subtransaction may still be waiting to be resolved
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ }
+ } else if(!coordinator.isActivated()) {
+ // the transaction was logged but has not yet been recovered successfully
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ } else {
+ int status = coordinator.status();
+
+ if (status == ActionStatus.PREPARED) {
+ // ok, the commit process was not previously initiated so start it now
+ coordinator.commit();
+ status = coordinator.status();
+ }
+
+ // check that we are not still committing because of a comms timeout
+
+ if (status == ActionStatus.COMMITTING) {
+ // throw an exception causing the commit to be retried later
+ throw new SystemException();
+ }
+ }
+ }
}
/**
@@ -55,7 +114,34 @@
*/
public void rollback() throws WrongStateException, SystemException {
- coordinator.rollback();
+ if (!isRecovered()) {
+ coordinator.rollback();
+ } else {
+ // first check whether crashed coordinators have been recovered
+ XTSATRecoveryManager recoveryManager = XTSATRecoveryManager.getRecoveryManager();
+ boolean isRecoveryScanStarted = recoveryManager.isSubordinateCoordinatorRecoveryStarted();
+ // now look for a subordinate coordinator with the right id
+ coordinator = SubordinateCoordinator.getRecoveredCoordinator(coordinatorId);
+ if (coordinator == null) {
+ if (!isRecoveryScanStarted) {
+ // the subtransaction may still be waiting to be resolved
+ // throw an exception causing the rollback to be retried later
+ throw new SystemException();
+ }
+ } else if(!coordinator.isActivated()) {
+ // the transaction was logged but has not yet been recovered successfully
+ // throw an exception causing the rollback to be retried later
+ throw new SystemException();
+ } else {
+ int status = coordinator.status();
+
+ if (status == ActionStatus.PREPARED) {
+ // ok, the rollback process was not previously initiated so start it now
+ coordinator.rollback();
+ status = coordinator.status();
+ }
+ }
+ }
}
/**
@@ -75,7 +161,58 @@
}
/**
+ * Save the state of the participant to the specified output object stream.
+ * @param oos The output object stream.
+ * @return true if persisted, false otherwise.
+ */
+ public boolean saveState(OutputObjectState oos) {
+ // we need to save the id of the subordinate coordinator so we can identify it again
+ // when we are recreated
+ try {
+ oos.packString(coordinatorId);
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Restore the state of the participant from the specified input object stream.
+ * @param ios The input object stream.
+ * @return true if restored, false otherwise.
+ */
+ public boolean restoreState(InputObjectState ios) {
+     // restore the subordinate coordinator id so we can check to ensure it has been committed
+     // n.b. assign the field rather than a shadowing local so commit/rollback can look up the coordinator
+     try {
+         this.coordinatorId = ios.unpackString();
+         return true;
+     } catch (IOException e) {
+         return false;
+     }
+ }
+
+ /**
+ * test if this participant is recovered
+ */
+ public boolean isRecovered()
+ {
+ return recovered;
+ }
+
+ /**
* the interposed coordinator
*/
private SubordinateCoordinator coordinator;
+
+ /**
+ * the interposed coordinator's id
+ */
+ private String coordinatorId;
+
+ /**
+ * a flag indicating whether this participant has been recovered
+ */
+
+ private boolean recovered;
}
\ No newline at end of file
Modified: labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java
===================================================================
--- labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/WSCF/classes/com/arjuna/mwlabs/wscf/model/twophase/arjunacore/subordinate/SubordinateCoordinator.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -44,6 +44,8 @@
import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.ACCoordinator;
+import java.util.HashMap;
+
/**
* This class represents a specific coordination instance. It is essentially an
* ArjunaCore TwoPhaseCoordinator, which gives us access to two-phase with
@@ -58,15 +60,24 @@
public class SubordinateCoordinator extends ACCoordinator
{
-
+
+ /**
+ * normal constructor
+ */
public SubordinateCoordinator ()
{
super();
+ activated = true;
}
+ /**
+ * constructor for recovered coordinator
+ * @param recovery
+ */
public SubordinateCoordinator (Uid recovery)
{
super(recovery);
+ activated = false;
}
/**
@@ -233,6 +244,11 @@
{
}
+ public String type ()
+ {
+ return "/StateManager/BasicAction/AtomicAction/TwoPhaseCoordinator/TwoPhase/SubordinateCoordinator";
+ }
+
/**
* return a uid for the volatile participant registered on behalf of this corodinator
*/
@@ -250,9 +266,42 @@
}
+ protected static synchronized void addRecoveredCoordinator(SubordinateCoordinator coordinator)
+ {
+ recoveredCoordinators.put(coordinator.get_uid().stringForm(), coordinator);
+ }
+
+ /**
+ * remove a recovered coordinator from the global table once it no longer needs to be visible
+ * to the participant which was driving it.
+ * @param coordinator the coordinator whose entry, keyed by the string form of its uid, is dropped
+ */
+ protected static synchronized void removeRecoveredCoordinator(SubordinateCoordinator coordinator)
+ {
+     // delete the entry itself; put(key, null) would leave a stale key in the table for every
+     // recovered coordinator while lookups still returned null
+     recoveredCoordinators.remove(coordinator.get_uid().stringForm());
+ }
+
+ protected void setActivated()
+ {
+ activated = true;
+ }
+
+ public boolean isActivated()
+ {
+ return activated;
+ }
+
+ public static synchronized SubordinateCoordinator getRecoveredCoordinator(String coordinatorId)
+ {
+ return recoveredCoordinators.get(coordinatorId);
+ }
+
/**
* this saves the status after the subtransaction commit or rollback so it can be referred to during
* afterCompletion processing.
*/
private int finalStatus = ActionStatus.CREATED;
+
+ /**
+ * flag identifying whether this coordinator is active, set true for normal transactions and false
+ * for recovered transactions until they are activated
+ */
+ private boolean activated;
+
+ private static final HashMap<String, SubordinateCoordinator> recoveredCoordinators = new HashMap<String, SubordinateCoordinator>();
}
Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/XTSService.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -22,6 +22,7 @@
import org.jboss.logging.Logger;
import org.jboss.jbossts.xts.recovery.coordinator.at.ACCoordinatorRecoveryModule;
+import org.jboss.jbossts.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule;
import org.jboss.jbossts.xts.recovery.coordinator.ba.BACoordinatorRecoveryModule;
import org.jboss.jbossts.xts.recovery.participant.at.ATParticipantRecoveryModule;
import org.jboss.jbossts.xts.recovery.participant.ba.BAParticipantRecoveryModule;
@@ -107,6 +108,7 @@
private final Logger log = org.jboss.logging.Logger.getLogger(XTSService.class);
private ACCoordinatorRecoveryModule acCoordinatorRecoveryModule = null;
+ private SubordinateCoordinatorRecoveryModule subordinateCoordinatorRecoveryModule = null;
private ATParticipantRecoveryModule atParticipantRecoveryModule = null;
private BACoordinatorRecoveryModule baCoordinatorRecoveryModule = null;
@@ -225,6 +227,9 @@
TaskManagerInitialisation(); // com.arjuna.services.framework.admin.TaskManagerInitialisation : initialise the Task Manager
+
+ // install the AT recovery modules
+
acCoordinatorRecoveryModule = new ACCoordinatorRecoveryModule();
// ensure Implementations are installed into the inventory before we register the module
@@ -232,6 +237,13 @@
acCoordinatorRecoveryModule.install();
// we don't need to install anything in the Inventory for this recovery module as it
+ // uses the same records as those employed by ACCoordinatorRecoveryModule
+
+ subordinateCoordinatorRecoveryModule = new SubordinateCoordinatorRecoveryModule();
+
+ subordinateCoordinatorRecoveryModule.install();
+
+ // we don't need to install anything in the Inventory for this recovery module as it
// manages its own ObjectStore records but we do need it to create the recovery manager
// singleton.
@@ -267,6 +279,7 @@
RecoveryManager.manager().addModule(baParticipantRecoveryModule);
RecoveryManager.manager().addModule(acCoordinatorRecoveryModule);
+ RecoveryManager.manager().addModule(subordinateCoordinatorRecoveryModule);
RecoveryManager.manager().addModule(baCoordinatorRecoveryModule);
}
@@ -280,6 +293,12 @@
// ok, now it is safe to get the recovery manager to uninstall its Implementations from the inventory
baCoordinatorRecoveryModule.uninstall();
}
+ if (subordinateCoordinatorRecoveryModule != null) {
+ // remove the module, making sure any scan which might be using it has completed
+ RecoveryManager.manager().removeModule(subordinateCoordinatorRecoveryModule, true);
+ // ok, now it is safe to get the recovery manager to uninstall its Implementations from the inventory
+ subordinateCoordinatorRecoveryModule.uninstall();
+ }
if (acCoordinatorRecoveryModule != null) {
// remove the module, making sure any scan which might be using it has completed
RecoveryManager.manager().removeModule(acCoordinatorRecoveryModule, true);
Copied: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java (from rev 23835, labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverACCoordinator.java)
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -0,0 +1,123 @@
+package org.jboss.jbossts.xts.recovery.coordinator.at;
+
+import org.jboss.jbossts.xts.logging.XTSLogger;
+
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed WSAT SubordinateCoordinator transactions.
+ *
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1] - RecoverSubordinateCoordinator.replayPhase2 recovering {0} ActionStatus is {1}
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_2 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_2] - RecoverSubordinateCoordinator.replayPhase2: Unexpected status: {0}
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3] - RecoverSubordinateCoordinator.replayPhase2( {0} ) finished
+ * @message org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4 [org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4] - RecoverSubordinateCoordinator.replayPhase2 transaction {0} not activated, unable to replay phase 2 commit
+*/
+public class RecoverSubordinateCoordinator extends SubordinateCoordinator {
+
+ /**
+ * Re-creates/activates an AtomicAction for the specified
+ * transaction Uid.
+ */
+ public RecoverSubordinateCoordinator( Uid rcvUid )
+ {
+ super( rcvUid ) ;
+ _activated = activate() ;
+ }
+
+ /**
+ * run parent activate and also make this coordinator visible if there might be a durable participant waiting
+ * for it to commit.
+ * @return
+ */
+ public boolean activate()
+ {
+ boolean result = super.activate();
+ // record whether the activation worked
+ if (result) {
+ setActivated();
+ }
+
+ int status = status();
+ if (result == false || (status == ActionStatus.PREPARED || status == ActionStatus.COMMITTING)) {
+ // we need to install this coordinator in a global table so that the participant which
+ // was driving it will know that it has been recovered but not yet committed
+ // n.b. we do this even if the activation failed because we need to ensure the
+ // participant rejects a commit until this transaction has committed
+
+ SubordinateCoordinator.addRecoveredCoordinator(this);
+ }
+
+ return result;
+ }
+
+ /**
+ * Replays phase 2 of the commit protocol.
+ */
+ public void replayPhase2()
+ {
+ final int status = status();
+
+ if (XTSLogger.arjLoggerI18N.debugAllowed())
+ {
+ XTSLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_1",
+ new Object[]{get_uid(), ActionStatus.stringForm(status)});
+ }
+
+ if ( _activated )
+ {
+ // we don't run phase 2 again if status is PREPARED because we need to wait for the
+ // parent coordinator to tell us what to do
+
+ // we automatically rerun phase2 if the action status is COMMITTING, which happens when
+ // we get a comms timeout from one of the participants after sending it a COMMIT message.
+
+ if ((status == ActionStatus.COMMITTING) ||
+ (status == ActionStatus.COMMITTED) ||
+ (status == ActionStatus.H_COMMIT) ||
+ (status == ActionStatus.H_MIXED) ||
+ (status == ActionStatus.H_HAZARD))
+ {
+ super.phase2Commit( _reportHeuristics ) ;
+ } else if ((status == ActionStatus.ABORTED) ||
+ (status == ActionStatus.H_ROLLBACK) ||
+ (status == ActionStatus.ABORTING) ||
+ (status == ActionStatus.ABORT_ONLY))
+ {
+ super.phase2Abort( _reportHeuristics ) ;
+ }
+
+ if (XTSLogger.arjLoggerI18N.debugAllowed())
+ {
+ XTSLogger.arjLoggerI18N.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_3",
+ new Object[]{get_uid()});
+ }
+ }
+ else
+ {
+ XTSLogger.arjLoggerI18N.warn("org.jboss.jbossts.xts.recovery.coordinator.at.RecoverSubordinateCoordinator_4", new Object[]{get_uid()});
+ }
+
+ if ((status == ActionStatus.PREPARED) ||
+ (status == ActionStatus.COMMITTING)) {
+ SubordinateCoordinator.removeRecoveredCoordinator(this);
+ }
+ }
+
+ // Flag to indicate that this transaction has been re-activated
+ // successfully.
+ private boolean _activated = false ;
+
+ // whether heuristic reporting on phase 2 commit is enabled.
+ private boolean _reportHeuristics = true ;
+}
\ No newline at end of file
Property changes on: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/RecoverSubordinateCoordinator.java
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java (from rev 23835, labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/ACCoordinatorRecoveryModule.java)
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java (rev 0)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -0,0 +1,342 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2007,
+ * @author Red Hat Middleware LLC.
+ */
+package org.jboss.jbossts.xts.recovery.coordinator.at;
+
+import org.jboss.jbossts.xts.logging.XTSLogger;
+import org.jboss.jbossts.xts.recovery.participant.at.XTSATRecoveryManager;
+
+import com.arjuna.ats.arjuna.recovery.RecoveryModule;
+import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
+import com.arjuna.ats.arjuna.logging.FacilityCode;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.arjuna.coordinator.ActionStatus;
+import com.arjuna.ats.arjuna.state.InputObjectState;
+import com.arjuna.ats.arjuna.exceptions.ObjectStoreException;
+import com.arjuna.ats.arjuna.common.Uid;
+import com.arjuna.ats.arjuna.objectstore.ObjectStore;
+import com.arjuna.common.util.logging.DebugLevel;
+import com.arjuna.common.util.logging.VisibilityLevel;
+
+import com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator;
+
+import java.util.Vector;
+import java.util.Enumeration;
+
+/**
+ * This class is a plug-in module for the recovery manager.
+ * It is responsible for recovering failed XTS AT subordinate (SubordinateCoordinator) transactions.
+ * (instances of com.arjuna.mwlabs.wscf.model.twophase.arjunacore.subordinate.SubordinateCoordinator)
+ *
+ * $Id$
+ *
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1] - RecoveryManagerStatusModule: Object store exception: {0}
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2] - failed to recover Transaction {0} {1}
+ * @message org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3 [org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3] - failed to access transaction store {0} {1}
+ */
+public class SubordinateCoordinatorRecoveryModule implements RecoveryModule
+{
+ public SubordinateCoordinatorRecoveryModule()
+ {
+ if (XTSLogger.arjLogger.isDebugEnabled())
+ {
+ XTSLogger.arjLogger.debug
+ ( DebugLevel.CONSTRUCTORS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "SubordinateCoordinatorRecoveryModule created - default" );
+ }
+
+ if (_transactionStore == null)
+ {
+ _transactionStore = TxControl.getStore() ;
+ }
+
+ _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+ }
+
+ /**
+ * Called by the service startup code before the recovery module is added to the recovery manager's
+ * module list. This implementation performs no work.
+ */
+ public void install()
+ {
+ // nothing to do here as we share the implementations used by the ACCoordinatorRecoveryModule
+ }
+
+ /**
+ * Counterpart of install(). NOTE(review): the opening of this comment appears to have been lost;
+ * presumably this is called by the service shutdown code after the recovery module is removed from
+ * the recovery manager's module list in order to allow the implementations list to be purged of
+ * this module's implementations -- confirm against the caller. This implementation performs no work.
+ */
+ public void uninstall()
+ {
+ // nothing to do here as we share the implementations used by the ACCoordinatorRecoveryModule
+ }
+
+    /**
+     * This is called periodically by the RecoveryManager.
+     *
+     * Asks the object store for the uids of all records logged under this module's transaction
+     * type and, if the store reports success, unpacks them into the list which is processed by
+     * the second pass. An ObjectStoreException is logged as a warning and leaves the list from
+     * any previous scan unchanged.
+     */
+    public void periodicWorkFirstPass()
+    {
+        // packed buffer which the store fills with the uids found for our transaction type
+        InputObjectState acc_uids = new InputObjectState() ;
+
+        // true only if the store successfully listed the uids
+        boolean foundUids = false ;
+
+        try
+        {
+            if (XTSLogger.arjLogger.isDebugEnabled())
+            {
+                // name this module in the trace, matching the message used by the second pass
+                XTSLogger.arjLogger.debug( "SubordinateCoordinatorRecoveryModule: first pass " );
+            }
+
+            foundUids = _transactionStore.allObjUids( _transactionType, acc_uids );
+        }
+        catch ( ObjectStoreException ex )
+        {
+            if (XTSLogger.arjLoggerI18N.isWarnEnabled())
+            {
+                XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_1",
+                                             new Object[]{ex});
+            }
+        }
+
+        if ( foundUids )
+        {
+            _transactionUidVector = processTransactions( acc_uids ) ;
+        }
+    }
+
+    /**
+     * Second pass of the periodic recovery scan.
+     *
+     * Attempts recovery of the transactions whose uids were collected by the first pass and
+     * ensures the recovery manager is told that a subordinate coordinator scan has completed,
+     * even when the first pass produced no uid list.
+     */
+    public void periodicWorkSecondPass()
+    {
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            XTSLogger.arjLogger.debug( "SubordinateCoordinatorRecoveryModule: Second pass " );
+        }
+
+        if (_transactionUidVector != null)
+        {
+            // processTransactionsStatus notifies the recovery manager once it has worked through the list
+            processTransactionsStatus() ;
+        }
+        else
+        {
+            // ok notify the coordinator processor that recovery processing has completed. without this
+            // the "scan completed" flag would never be set when there is no uid list to process
+            XTSATRecoveryManager.getRecoveryManager().setSubordinateCoordinatorRecoveryStarted();
+        }
+    }
+
+ protected SubordinateCoordinatorRecoveryModule(String type)
+ {
+ if (XTSLogger.arjLogger.isDebugEnabled())
+ {
+ XTSLogger.arjLogger.debug
+ ( DebugLevel.CONSTRUCTORS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "SubordinateCoordinatorRecoveryModule created " + type );
+ }
+
+ if (_transactionStore == null)
+ {
+ _transactionStore = TxControl.getStore() ;
+ }
+
+ _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;
+ _transactionType = type;
+
+ }
+
+    /**
+     * Attempt to recover a single logged transaction.
+     *
+     * The transaction status is obtained from the transaction status connection manager. If that
+     * status indicates the transaction is still in flight nothing is done. Otherwise a
+     * RecoverSubordinateCoordinator is created for the uid and its phase 2 is replayed. Any
+     * exception thrown while doing so is logged as a warning and not propagated.
+     *
+     * @param recoverUid the uid of the logged transaction to recover
+     */
+    private void doRecoverTransaction( Uid recoverUid )
+    {
+        // Retrieve the transaction status from its original process.
+        // n.b. for a non-active XTS TX this status will always be committed even
+        // if it aborted or had a heuristic outcome. in that case we need to use
+        // the logged action status which can only be retrieved after activation
+
+        int theStatus = _transactionStatusConnectionMgr.getTransactionStatus( _transactionType, recoverUid ) ;
+
+        boolean inFlight = isTransactionInMidFlight( theStatus ) ;
+
+        if (XTSLogger.arjLogger.isDebugEnabled())
+        {
+            // the printable status is only needed for tracing so only build it here
+            XTSLogger.arjLogger.debug
+                ( DebugLevel.FUNCTIONS,
+                  VisibilityLevel.VIS_PUBLIC,
+                  FacilityCode.FAC_CRASH_RECOVERY,
+                  "transaction type is "+ _transactionType + " uid is " +
+                  recoverUid.toString() + "\n ActionStatus is " + ActionStatus.stringForm( theStatus ) +
+                  " in flight is " + inFlight ) ;
+        }
+
+        if ( inFlight )
+        {
+            // the reported status says the transaction is still in flight so do not replay it
+            return ;
+        }
+
+        try
+        {
+            // guard the trace like every other debug call in this class so the message
+            // is not built when debug logging is disabled
+            if (XTSLogger.arjLogger.isDebugEnabled())
+            {
+                XTSLogger.arjLogger.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                                           FacilityCode.FAC_CRASH_RECOVERY, "jjh doing revovery here for "+recoverUid);
+            }
+            // TODO jjh
+            RecoverSubordinateCoordinator rcvSubordinateCoordinator =
+                new RecoverSubordinateCoordinator(recoverUid);
+            rcvSubordinateCoordinator.replayPhase2();
+        }
+        catch ( Exception ex )
+        {
+            if (XTSLogger.arjLoggerI18N.isWarnEnabled())
+            {
+                XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_2",
+                                             new Object[]{recoverUid.toString(), ex});
+            }
+        }
+    }
+
+    /**
+     * Classify an action status as in flight or not.
+     *
+     * @param status an ActionStatus value
+     * @return true for RUNNING, ABORT_ONLY, PREPARING, COMMITTING, ABORTING and PREPARED,
+     * false for every other value
+     */
+    private boolean isTransactionInMidFlight( int status )
+    {
+        switch ( status )
+        {
+            // these states can only come from a process that is still alive
+            case ActionStatus.RUNNING :
+            case ActionStatus.ABORT_ONLY :
+            case ActionStatus.PREPARING :
+            case ActionStatus.COMMITTING :
+            case ActionStatus.ABORTING :
+            case ActionStatus.PREPARED :
+                return true ;
+
+            // the transaction is apparently still there, but has completed its
+            // phase2. should be safe to redo it.
+            case ActionStatus.COMMITTED :
+            case ActionStatus.H_COMMIT :
+            case ActionStatus.H_MIXED :
+            case ActionStatus.H_HAZARD :
+            case ActionStatus.ABORTED :
+            case ActionStatus.H_ROLLBACK :
+                return false ;
+
+            // this shouldn't happen
+            case ActionStatus.INVALID :
+            default:
+                return false ;
+        }
+    }
+
+ private Vector processTransactions( InputObjectState uids )
+ {
+ Vector uidVector = new Vector() ;
+
+ if (XTSLogger.arjLogger.isDebugEnabled())
+ {
+ XTSLogger.arjLogger.debug( DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "processing " + _transactionType
+ + " transactions" ) ;
+ }
+
+ Uid theUid = new Uid( Uid.nullUid() );
+
+ boolean moreUids = true ;
+
+ while (moreUids)
+ {
+ try
+ {
+ theUid.unpack( uids ) ;
+
+ if (theUid.equals( Uid.nullUid() ))
+ {
+ moreUids = false;
+ }
+ else
+ {
+ Uid newUid = new Uid( theUid ) ;
+
+ if (XTSLogger.arjLogger.isDebugEnabled())
+ {
+ XTSLogger.arjLogger.debug
+ ( DebugLevel.FUNCTIONS,
+ VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "found transaction "+ newUid ) ;
+ }
+
+ uidVector.addElement( newUid ) ;
+ }
+ }
+ catch ( Exception ex )
+ {
+ moreUids = false;
+ }
+ }
+ return uidVector ;
+ }
+
+    /**
+     * Attempt recovery of each transaction in the uid list built by the first pass, then notify
+     * the recovery manager that a subordinate coordinator recovery scan has been performed.
+     *
+     * A uid is only passed on for recovery if the object store still reports a known state for
+     * it. An ObjectStoreException for one uid is logged as a warning and processing continues
+     * with the next uid. Callers must ensure the uid list is not null.
+     */
+    private void processTransactionsStatus()
+    {
+        // Process the Vector of transaction Uids
+        Enumeration transactionUidEnum = _transactionUidVector.elements() ;
+
+        while ( transactionUidEnum.hasMoreElements() )
+        {
+            Uid currentUid = (Uid) transactionUidEnum.nextElement();
+
+            try
+            {
+                // skip uids whose log record is no longer known to the store
+                if ( _transactionStore.currentState( currentUid, _transactionType ) != ObjectStore.OS_UNKNOWN )
+                {
+                    doRecoverTransaction( currentUid ) ;
+                }
+            }
+            catch ( ObjectStoreException ex )
+            {
+                // test the same logger that is used to emit the warning
+                if (XTSLogger.arjLoggerI18N.isWarnEnabled())
+                {
+                    XTSLogger.arjLoggerI18N.warn("org.jboss.transactions.xts.recovery.coordinator.at.SubordinateCoordinatorRecoveryModule_3",
+                                                 new Object[]{currentUid.toString(), ex});
+                }
+            }
+        }
+
+        // record the fact that a subordinate coordinator recovery scan has been performed
+        XTSATRecoveryManager.getRecoveryManager().setSubordinateCoordinatorRecoveryStarted();
+    }
+
+ // 'type' within the Object Store for SubordinateCoordinator.
+ private String _transactionType = new SubordinateCoordinator().type() ;
+
+ // List of transaction uids found in the object store for the
+ // SubordinateCoordinator type; null until a first pass scan succeeds.
+ private Vector _transactionUidVector = null ;
+
+ // Reference to the Object Store.
+ private static ObjectStore _transactionStore = null ;
+
+ // This object manages the interface to all TransactionStatusManagers
+ // processes(JVMs) on this system/node.
+ private TransactionStatusConnectionManager _transactionStatusConnectionMgr ;
+
+}
\ No newline at end of file
Property changes on: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/coordinator/at/SubordinateCoordinatorRecoveryModule.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java
===================================================================
--- labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java 2008-12-03 14:51:01 UTC (rev 24226)
+++ labs/jbosstm/trunk/XTS/sar/src/org/jboss/jbossts/xts/recovery/participant/at/XTSATRecoveryManagerImple.java 2008-12-03 15:23:34 UTC (rev 24227)
@@ -299,6 +299,16 @@
}
/**
+ * Test whether the first AT subordinate coordinator recovery scan has completed. This indicates
+ * whether there may or may not still be unknown AT subtransaction records on disk. If the first
+ * scan has not yet completed then a commit for an unknown subtransaction must raise an exception
+ * delaying commit of the parent transaction. Returns the subordinateCoordinatorRecoveryStarted flag.
+ */
+ public synchronized boolean isSubordinateCoordinatorRecoveryStarted() {
+ return subordinateCoordinatorRecoveryStarted;
+ }
+
+ /**
* record the fact that the first AT coordinator recovery scan has completed.
*/
@@ -307,6 +317,14 @@
}
/**
+ * Record the fact that the first AT subordinate coordinator recovery scan has completed (sets the flag to true).
+ */
+
+ public synchronized void setSubordinateCoordinatorRecoveryStarted() {
+ subordinateCoordinatorRecoveryStarted = true;
+ }
+
+ /**
* a global flag indicating whether the first AT participant recovery scan has
* been performed.
*/
@@ -319,6 +337,12 @@
private boolean coordinatorRecoveryStarted = false;
/**
+ * a global flag indicating whether the first AT subordinate coordinator recovery scan has
+ * been performed.
+ */
+ private boolean subordinateCoordinatorRecoveryStarted = false;
+
+ /**
* a map from participant ids to participant recovery records
*/
private HashMap<String, ATParticipantRecoveryRecord> recoveryMap = new HashMap<String, ATParticipantRecoveryRecord>();
More information about the jboss-svn-commits
mailing list